# -*- coding: utf-8 -*-
"""Implementation of tksn.adb.device.Device class
:copyright: (c) 2016 by tksn
:license: MIT
"""
from __future__ import unicode_literals

import io
import itertools
import os
import re
import shutil
import sys

import tksn.adb
import tksn.adb.call
import tksn.adb.hardwarekey
class DeviceNotFoundError(Exception):
    """Raised when a required Android device cannot be reached.

    Used both when no device is connected at all and when a previously
    selected device is no longer listed.
    """
class AsyncResult(object):
    """Handle on an ADB process that was started in the background

    The handle lets the caller wait for the process to finish, kill it,
    and store whatever it produced once it has finished or been killed.
    """

    def __init__(self, proc, save_as_func=None):
        """Initialization

        Normally created by the accessor classes, not by client code.

        Args:
            proc: process object exposing ``wait(timeout=...)`` and ``kill()``.
            save_as_func (callable): optional ``func(filepath)`` which stores
                the result at the given path. None if nothing can be saved.
        """
        self.__save_as_func = save_as_func
        self.__proc = proc

    def wait(self, timeout=None, kill=False):
        """Block until the process has finished

        The timeout is handed straight to the process object's ``wait``;
        what happens on expiry is decided there (a subprocess.Popen raises
        TimeoutExpired).

        Args:
            timeout (int): timeout in seconds. Infinite if None.
            kill (bool): True to kill the process before waiting.
        Returns:
            AsyncResult: self object
        """
        process = self.__proc
        if kill:
            process.kill()
        process.wait(timeout=timeout)
        return self

    def stop(self):
        """Kill the process right away

        Same as wait(kill=True).

        Returns:
            AsyncResult: self object
        """
        return self.wait(kill=True)

    def save_as(self, filepath):
        """Store the result in a file

        Does nothing when no save function was supplied at construction.

        Args:
            filepath (str): where to save the result data
        """
        saver = self.__save_as_func
        if not saver:
            return
        saver(filepath)
class KeyAccessor(object):
    """Accessor for a single hardware key

    Offers press (key down) and release (key up) for the key it was
    created for.
    """

    def __init__(self, hwkey, code):
        """Initialization

        Normally created through Device.key, not by client code.

        Args:
            hwkey: object exposing ``down(key=...)`` and ``up(key=...)``.
            code (int or str): key code or key name passed to ``hwkey``.
        """
        self.__code = code
        self.__hwkey = hwkey

    def press(self, auto_release=True):
        """Send key down, optionally followed by key up

        Args:
            auto_release (bool): True to do press-and-release at once
        Returns:
            KeyAccessor: self object
        """
        self.__hwkey.down(key=self.__code)
        # release() already returns self, so it can end the call directly.
        return self.release() if auto_release else self

    def release(self):
        """Send key up

        Returns:
            KeyAccessor: self object
        """
        self.__hwkey.up(key=self.__code)
        return self
class LogcatAccessor(object):
    """Logcat accessor class, which can be used to start
    and stop logcat and to retrieve its result."""

    def __init__(self, call_sync, call_async):
        """Initialization

        Usually not called from client code.

        Args:
            call_sync (callable): ``func(args)`` running an adb command to
                completion and returning its result.
            call_async (callable): ``func(args, out=..., err=...)`` starting
                an adb command and returning its process object.
        """
        self.__call_sync = call_sync
        self.__call_async = call_async

    def clear(self):
        """Clear the device's log buffer

        Returns:
            tuple: result of the synchronous ``adb logcat -c`` call; a
                (stdout, stderr) pair when the accessor comes from Device.
        """
        return self.__call_sync(('logcat', '-c'))

    def start(self, out=None, err=None, buffer=None, format=None, filterspec=None):
        """Invoke logcat

        Start logcat process and return immediately.
        You can wait for the completion, and can get the result of the logcat process
        by AsyncResult object which is returned by this method.

        Args:
            out (file): file object to receive stdout of logcat.
                Can be None if you retrieve stdout via AsyncResult.save_as.
            err (file): file object to receive stderr of logcat.
                Can be None if you don't want stderr.
            buffer (tuple): tuple contains name of buffers from which you get logs.
                Default is ('main', 'system', 'events', 'radio').
                Name of a buffer is what you can specify with '-b' option
                of adb logcat command.
            format (str): format specifier.
                Default is 'time'.
                Format is what you can specify with '-v' option of adb logcat command.
            filterspec (tuple):
                FILTERSPEC to adb logcat command, if any.
        Returns:
            AsyncResult: AsyncResult object
                which can be used to wait completion and to obtain result
        Raises:
            Exception: whatever the asynchronous call raises; a temporary
                output file created by this method is removed first.
        """
        buffer = buffer or ('main', 'system', 'events', 'radio')
        format = format or 'time'
        filterspec = filterspec or ()
        args = itertools.chain(
            ('logcat', ),
            ('-v', format),
            itertools.chain.from_iterable(('-b', buf) for buf in buffer),
            filterspec)

        save_as_func = None
        tempfilepath = None
        if out is None:
            # No sink supplied: collect stdout in a temporary file which
            # AsyncResult.save_as later moves to its final destination.
            tempfilepath = tksn.adb.utils.get_tempfilepath()
            out = open(tempfilepath, 'wb')

            def save_as(path):
                out.close()
                shutil.move(tempfilepath, path)
            save_as_func = save_as

        try:
            proc = self.__call_async(args, out=out, err=err)
        except Exception:
            # Only clean up what this method created; a caller-supplied
            # ``out`` stays under the caller's control.
            if tempfilepath is not None:
                out.close()
                try:
                    os.remove(tempfilepath)
                except OSError:
                    pass  # best-effort cleanup; the original error matters more
            raise
        return AsyncResult(proc, save_as_func)
class ScreenrecordAccessor(object):
    """Accessor used to launch screenrecord on the device, stop it,
    and fetch the recorded file.
    """

    def __init__(self, pull, call_async):
        """Initialization

        Normally created through Device.screenrecord, not by client code.

        Args:
            pull (callable): ``func(from_path, to_path)`` copying a file
                from the device.
            call_async (callable): ``func(args)`` starting an adb command
                and returning its process object.
        """
        self.__call_async = call_async
        self.__pull = pull

    def start(self, size=None, bitrate=1000000, rotate=False,
              filepath_in_device='/sdcard/screenrecord.mp4'):
        """Launch screenrecord and return without waiting for it

        The returned AsyncResult can be used to wait for or stop the
        recording; its save_as pulls the recorded file from the device.

        Args:
            size (tuple): (width, height) tuple passed with the '--size'
                option. None (the default) omits the option.
            bitrate (int): bit rate passed with the '--bit-rate' option.
                Default is 1000000.
            rotate (bool): True to add the '--rotate' option.
                Default is False.
            filepath_in_device (str): path of the recorded mp4 file on the device.
        Returns:
            AsyncResult: AsyncResult object
                which can be used to wait completion and to obtain result
        """
        if size:
            size_option = ('--size {}x{}'.format(*size),)
        else:
            size_option = ()
        bitrate_option = ('--bit-rate {}'.format(bitrate), )
        rotate_option = ('--rotate', ) if rotate else ()

        command = itertools.chain(
            ('shell', 'screenrecord'),
            size_option,
            bitrate_option,
            rotate_option,
            (filepath_in_device, ))
        process = self.__call_async(command)
        return AsyncResult(
            process,
            save_as_func=lambda path: self.__pull(filepath_in_device, path))
class ScreencapAccessor(object):
    """Screencap accessor class, which can be used to start and
    stop screencap and to retrieve its result.
    """

    def __init__(self, pull, call_async):
        """Initialization

        Usually not called from client code.

        Args:
            pull (callable): ``func(from_path, to_path)`` copying a file
                from the device.
            call_async (callable): ``func(args)`` starting an adb command
                and returning its process object.
        """
        self.__pull = pull
        self.__call_async = call_async

    def start(self, filepath_in_device='/sdcard/screen.png'):
        """Invoke screencap

        Start screencap process and returns immediately.
        You can wait for the completion, and can get the result of the screencap process
        by AsyncResult object which is returned by this method.

        Args:
            filepath_in_device (str): File path to screen capture (png) file in the device.
        Returns:
            AsyncResult: AsyncResult object
                which can be used to wait completion and to obtain result
        """
        async_result = AsyncResult(
            self.__call_async(('shell', 'screencap', filepath_in_device)),
            save_as_func=lambda p: self.__pull(filepath_in_device, p))
        return async_result

    def save_as(self, filepath):
        """Save screen capture as a file

        Equivalent to start().wait().save_as(filepath)

        Args:
            filepath (str): destination file path
        """
        self.start().wait().save_as(filepath)

    def as_bytes(self):
        """Get screen capture as bytes

        The capture is pulled into a temporary file, read back, and the
        temporary file is removed again before returning.

        Returns:
            bytes: content of the captured file as stored by screencap.
        """
        tempfilepath = tksn.adb.utils.get_tempfilepath()
        try:
            self.save_as(tempfilepath)
            with open(tempfilepath, 'rb') as f:
                return f.read()
        finally:
            # The path is never handed to the caller, so nobody else can
            # clean it up; the file may not exist if the pull failed.
            try:
                os.remove(tempfilepath)
            except OSError:
                pass
class BugreportAccessor(object):
    """Bugreport accessor class, which can be used to start and
    stop bugreport and to retrieve its result.
    """

    def __init__(self, call_async):
        """Initialization

        Usually not called from client code.

        Args:
            call_async (callable): ``func(args, out=..., err=...)`` starting
                an adb command and returning its process object.
        """
        self.__call_async = call_async

    def start(self, out=None, err=None):
        """Invoke bugreport

        Start bugreport process and return immediately.
        You can wait for the completion, and can get the result of the bugreport process
        by AsyncResult object which is returned by this method.

        Args:
            out (file): file object to receive stdout of bugreport.
                Can be None if you retrieve stdout via AsyncResult.save_as.
            err (file): file object to receive stderr of bugreport.
                Can be None if you don't want stderr.
        Returns:
            AsyncResult: AsyncResult object
                which can be used to wait completion and to obtain result
        Raises:
            Exception: whatever the asynchronous call raises; a temporary
                output file created by this method is removed first.
        """
        save_as_func = None
        tempfilepath = None
        if out is None:
            # No sink supplied: collect stdout in a temporary file which
            # AsyncResult.save_as later moves to its final destination.
            tempfilepath = tksn.adb.utils.get_tempfilepath()
            out = open(tempfilepath, 'wb')

            def save_as(path):
                out.close()
                shutil.move(tempfilepath, path)
            save_as_func = save_as

        try:
            proc = self.__call_async(('bugreport', ), out=out, err=err)
        except Exception:
            # Only clean up what this method created; a caller-supplied
            # ``out`` stays under the caller's control.
            if tempfilepath is not None:
                out.close()
                try:
                    os.remove(tempfilepath)
                except OSError:
                    pass  # best-effort cleanup; the original error matters more
            raise
        return AsyncResult(proc, save_as_func=save_as_func)
class Device(object):
    """An Android device reachable through ADB

    Wraps a number of ADB commands (logcat, screencap, screenrecord,
    bugreport, push/pull, key events) behind methods and accessor objects.
    """

    def __init__(self, serialno=None):
        """Initialization

        Args:
            serialno (str): Serial number of the Android device.
                When omitted, the first serial number reported for the
                connected devices is used.
        """
        if not serialno:
            serialno = _get_serialno()[0]
        self.__serialno = serialno

    @property
    def serialno(self):
        """Serial number this object talks to

        Returns:
            str: Serial number
        """
        return self.__serialno

    def __check_device_exists(self):
        # Fail early when the device is no longer among the connected ones.
        if self.serialno in _get_serialno():
            return
        raise DeviceNotFoundError('Device disconnected')

    def __adb_command(self, args):
        # Full command line: adb executable, '-s <serial>', then the arguments.
        executable = tksn.adb.call.get_adb_executable()
        return (executable, '-s', self.__serialno) + tuple(args)

    def __call_adb(self, args):
        # Run adb to completion and return its decoded (stdout, stderr).
        self.__check_device_exists()
        raw_out, raw_err = tksn.adb.call.call_sync(self.__adb_command(args))
        return (
            raw_out.decode(sys.getdefaultencoding()),
            raw_err.decode(sys.getdefaultencoding()))

    def __call_async_adb(self, args, out=None, err=None):
        # Start adb in the background and return the process object.
        self.__check_device_exists()
        return tksn.adb.call.call_async(
            self.__adb_command(args), out=out, err=err)

    def __get_iphonesubinfo(self, code):
        # Query the iphonesubinfo service and decode the text in its reply.
        reply, _ = self.__call_adb(
            ('shell', 'service', 'call', 'iphonesubinfo', str(code)))
        return _parse_iphonesubinfo(reply)

    @property
    def imei(self):
        """IMEI property

        Read with iphonesubinfo service call 1.
        Valid only if the device is capable of connecting to cellular network.

        Returns:
            str: The device's IMEI
        """
        return self.__get_iphonesubinfo(1)

    @property
    def phone_number(self):
        """Phone number property

        Read with iphonesubinfo service call 13.
        Valid only if the device is a phone, and is activated on the cellular network.

        Returns:
            str: The device's phone number
        """
        return self.__get_iphonesubinfo(13)

    def key(self, code):
        """Create an accessor for one hardware key

        Args:
            code (int or str): key event code, or, event name such as 'KEY_VOLUMEDOWN'
        Returns:
            KeyAccessor: Key accessor object which provides press/release methods.
        """
        return KeyAccessor(
            tksn.adb.hardwarekey.HardwareKey(self.__call_adb), code)

    def push(self, from_path, to_path):
        """Copy a file to the Android device

        Args:
            from_path (str): source path
            to_path (str): destination path on the device
        Returns:
            tuple: (stdout, stderr) pair from adb push command
        """
        return self.__call_adb(('push', from_path, to_path))

    def pull(self, from_path, to_path):
        """Copy a file from the Android device

        Args:
            from_path (str): source path on the device
            to_path (str): destination path
        Returns:
            tuple: (stdout, stderr) pair from adb pull command
        """
        return self.__call_adb(('pull', from_path, to_path))

    @property
    def logcat(self):
        """Logcat accessor

        Returns:
            LogcatAccessor: a new logcat accessor bound to this device
        """
        return LogcatAccessor(self.__call_adb, self.__call_async_adb)

    @property
    def screenrecord(self):
        """Screenrecord accessor

        Returns:
            ScreenrecordAccessor: a new screenrecord accessor bound to this device
        """
        return ScreenrecordAccessor(self.pull, self.__call_async_adb)

    @property
    def screencap(self):
        """Screencap accessor

        Returns:
            ScreencapAccessor: a new screencap accessor bound to this device
        """
        return ScreencapAccessor(self.pull, self.__call_async_adb)

    @property
    def bugreport(self):
        """Bugreport accessor

        Returns:
            BugreportAccessor: a new bugreport accessor bound to this device
        """
        return BugreportAccessor(self.__call_async_adb)
def _get_serialno():
    """Return the serial numbers reported by tksn.adb.get_device_serials.

    Raises:
        DeviceNotFoundError: if the reported collection is empty.
    """
    connected = tksn.adb.get_device_serials()
    if len(connected) > 0:
        return connected
    raise DeviceNotFoundError('No device found')
def _parse_iphonesubinfo(subinfo_str):
    """Decode the text contained in a ``service call iphonesubinfo`` dump.

    Every whitespace-separated token consisting of exactly eight lowercase
    hex digits is taken as one 32-bit word and split into two 16-bit values,
    low half first. The first four values (two words) are skipped --
    presumably the parcel's status and string-length header, TODO confirm --
    and each remaining value in the range 1..255 becomes one character.

    Args:
        subinfo_str (str): stdout of the service call command.
    Returns:
        str: the decoded text. Empty when the dump holds no data beyond the
            two skipped words, e.g. for empty or error output.
    """
    # match() anchors at the start and '$' at the end, so only tokens that
    # are exactly eight hex digits qualify ('0x00000000:' offsets do not).
    word_pattern = re.compile(r'[0-9a-f]{8}$')

    def iter_halfwords():
        for token in subinfo_str.split():
            if not word_pattern.match(token):
                continue
            yield int(token[4:], base=16)
            yield int(token[:4], base=16)

    # islice skips the header without raising StopIteration on short input.
    payload = itertools.islice(iter_halfwords(), 4, None)
    return ''.join(chr(value) for value in payload if 0 < value < 256)