Source code for tksn.adb.device

# -*- coding: utf-8 -*-
"""Implementation of tksn.adb.device.Device class

:copyright: (c) 2016 by tksn
:license: MIT
"""

from __future__ import unicode_literals
import io
import itertools
import re
import shutil
import sys
import tksn.adb
import tksn.adb.call
import tksn.adb.hardwarekey


[docs]class DeviceNotFoundError(Exception): """Device not found error Thrown when the device is not reachable. """
[docs]class AsyncResult(object): """Accessor to an asynchronous device call result AsyncResult provides an access to a previously started ADB process and its result. It includes, a way to wait for completion of the process, a way to kill the process, and a way to get the result after the process is completed or killed. """ def __init__(self, proc, save_as_func=None): """Initialization Usually not called from client code. """ self.__proc = proc self.__save_as_func = save_as_func
[docs] def wait(self, timeout=None, kill=False): """Wait for completion of the process If the process does not terminate after timeout seconds, raise a TimeoutExpired exception. Args: timeout (int): timeout in seconds. Infinite if None. kill (bool): True if kill the process immediately. Returns: AsyncResult: self object """ if kill: self.__proc.kill() self.__proc.wait(timeout=timeout) return self
[docs] def stop(self): """Stop the process immediately Equivalent to wait(kill=True) Returns: AsyncResult: self object """ return self.wait(kill=True)
[docs] def save_as(self, filepath): """Save result as a file Args: filepath (str): where to save the result data """ if self.__save_as_func: self.__save_as_func(filepath)
[docs]class KeyAccessor(object): """Hardware key accessor class KeyAccessor provides key down(press) and up(release) method. """ def __init__(self, hwkey, code): """Initialization Usually not called from client code. """ self.__hwkey = hwkey self.__code = code
[docs] def press(self, auto_release=True): """Press key Args: auto_release (bool): True to do press-and-release at once Returns: KeyAccessor: self object """ self.__hwkey.down(key=self.__code) if auto_release: self.release() return self
[docs] def release(self): """Release key Returns: KeyAccessor: self object """ self.__hwkey.up(key=self.__code) return self
[docs]class LogcatAccessor(object): """Logcat accessor class, which can be used to start and stop logcat and to retrieve its result.""" def __init__(self, call_sync, call_async): """Initialization Usually not called from client code. """ self.__call_sync = call_sync self.__call_async = call_async
[docs] def clear(self): """Clear the device's log buffer Returns: tuple: (stdout, stderr) pair from adb logcat command """ self.__call_sync(('logcat', '-c'))
[docs] def start(self, out=None, err=None, buffer=None, format=None, filterspec=None): """Invoke logcat Start logcat process and return immediately. You can wait for the completion, and can get the result of the logcat process by AsyncResult object which is returned by this method. Args: out (file): file object to receive stdout of logcat. Can be None if you retrieve stdout via AsyncResult.save_as. err (file): file object to receive stderr of logcat. Can be None if you don't want stderr. buffer (tuple): tuple contains name of buffers from which you get logs. Default is ('main', 'system', 'events', 'radio'). Name of a buffer is what you can secify with '-b' option of adb logcat command. format (str): format specifier. Default is 'time'. Format is what you can secify with '-v' option of adb logcat command. filterspec (tuple): FILTERSPEC to adb logcat command, if any. Returns: AsyncResult: AsyncResult object which can be used to wait completion and to obtain result """ buffer = buffer or ('main', 'system', 'events', 'radio') format = format or 'time' filterspec = filterspec or () args = itertools.chain( ('logcat', ), ('-v', format), itertools.chain.from_iterable(('-b', buf) for buf in buffer), filterspec) save_as_func = None if out is None: tempfilepath = tksn.adb.utils.get_tempfilepath() out = open(tempfilepath, 'wb') def save_as(path): out.close() shutil.move(tempfilepath, path) save_as_func = save_as proc = self.__call_async(args, out=out, err=err) async_result = AsyncResult(proc, save_as_func) return async_result
[docs]class ScreenrecordAccessor(object): """Screenrecord accessor class, which can be used to start and stop screenrecord and to retrieve its result. """ def __init__(self, pull, call_async): """Initialization Usually not called from client code. """ self.__pull = pull self.__call_async = call_async
[docs] def start(self, size=None, bitrate=1000000, rotate=False, filepath_in_device='/sdcard/screenrecord.mp4'): """Invoke screenrecord. Start screenrecord process and returns immediately. You can wait for the completion, and can get the result of the screenrecord process by AsyncResult object which is returned by this method. Args: size (tuple): (width, height) tuple which is passed to screenrecord with '--size' option. None if you don't want to specify '--size' option. Default is None. bitrate (int): bit rate which is passed to screenrecord with --bit-rate option. Default is 1000000. rotate (bool): True if use '--rotate' option of screenrecord. Default is False. filepath_in_device (str): File path to screen record (mp4) file in the device. Returns: AsyncResult: AsyncResult object which can be used to wait completion and to obtain result """ size = ('--size {}x{}'.format(*size),) if size else () bitrate = ('--bit-rate {}'.format(bitrate), ) rotate = ('--rotate', ) if rotate else () args = itertools.chain( ('shell', 'screenrecord'), size, bitrate, rotate, (filepath_in_device, )) def save_as(path): self.__pull(filepath_in_device, path) async_result = AsyncResult(self.__call_async(args), save_as_func=save_as) return async_result
[docs]class ScreencapAccessor(object): """Screencap accessor class, which can be used to start and stop screencap and to retrieve its result. """ def __init__(self, pull, call_async): """Initialization Usually not called from client code. """ self.__pull = pull self.__call_async = call_async
[docs] def start(self, filepath_in_device='/sdcard/screen.png'): """Invoke screencap Start screenrecord process and returns immediately. You can wait for the completion, and can get the result of the screencap process by AsyncResult object which is returned by this method. Args: filepath_in_device (str): File path to screen capture (png) file in the device. Returns: AsyncResult: AsyncResult object which can be used to wait completion and to obtain result """ async_result = AsyncResult( self.__call_async(('shell', 'screencap', filepath_in_device)), save_as_func=lambda p: self.__pull(filepath_in_device, p)) return async_result
[docs] def save_as(self, filepath): """Save screen capture as a file Equivalent to start().wait().save_as(filepath) Args: filepath (str): destination file path """ getattr(self.start().wait(), 'save_as')(filepath)
[docs] def as_bytes(self): """Get screen capture as bytes Returns: bytes: screencapture data. Useful when you want to create PIL.Image object via PIL.Image.frombytes() """ tempfilepath = tksn.adb.utils.get_tempfilepath() self.save_as(tempfilepath) with open(tempfilepath, 'rb') as f: return f.read()
[docs]class BugreportAccessor(object): """Bugreport accessor class, which can be used to start and stop bugreport and to retrieve its result. """ def __init__(self, call_async): """Initialization Usually not called from client code. """ self.__call_async = call_async
[docs] def start(self, out=None, err=None): """Invoke bugreport Start bugreport process and return immediately. You can wait for the completion, and can get the result of the bugreport process by AsyncResult object which is returned by this method. Args: out (file): file object to receive stdout of bugreport. Can be None if you retrieve stdout via AsyncResult.save_as. err (file): file object to receive stderr of bugreport. Can be None if you don't want stderr. Returns: AsyncResult: AsyncResult object which can be used to wait completion and to obtain result """ save_as_func = None if out is None: tempfilepath = tksn.adb.utils.get_tempfilepath() out = open(tempfilepath, 'wb') def save_as(path): out.close() shutil.move(tempfilepath, path) save_as_func = save_as proc = self.__call_async(('bugreport', ), out=out, err=err) async_result = AsyncResult(proc, save_as_func=save_as_func) return async_result
[docs]class Device(object): """Android device class Device class provides various methods which wrap ADB commands, such as logcat, screencap, screenrecord, etc. """ def __init__(self, serialno=None): """Initialization Args: serialno (str): Serial number of the Android device. The first device found by 'adb devices' is used if serialno=None. """ self.__serialno = serialno or _get_serialno()[0] @property def serialno(self): """Serial number of the Android device Returns: str: Serial number """ return self.__serialno def __check_device_exists(self): if self.serialno not in _get_serialno(): raise DeviceNotFoundError('Device disconnected') def __call_adb(self, args): self.__check_device_exists() adb = tksn.adb.call.get_adb_executable() command = (adb, '-s', self.__serialno) + tuple(args) out, err = tksn.adb.call.call_sync(command) return ( out.decode(sys.getdefaultencoding()), err.decode(sys.getdefaultencoding())) def __call_async_adb(self, args, out=None, err=None): self.__check_device_exists() adb = tksn.adb.call.get_adb_executable() command = (adb, '-s', self.__serialno) + tuple(args) return tksn.adb.call.call_async(command, out=out, err=err) def __get_iphonesubinfo(self, code): out, _ = self.__call_adb(('shell', 'service', 'call', 'iphonesubinfo', str(code))) return _parse_iphonesubinfo(out) @property def imei(self): """IMEI property Valid only if the device is capable of connecting to cellular network. Returns: str: The devices's IMEI """ return self.__get_iphonesubinfo(1) @property def phone_number(self): """Phone number property Valid only if the device is a phone, and is activated on the cellular network. Returns: str: The devices's phone number """ return self.__get_iphonesubinfo(13)
[docs] def key(self, code): """Get hardware key accessor Args: code (int or str): key event code, or, event name such as 'KEY_VOLUMEDOWN' Returns: KeyAccessor: Key accessor object which provides up/down method. """ hwkey = tksn.adb.hardwarekey.HardwareKey(self.__call_adb) return KeyAccessor(hwkey, code)
[docs] def push(self, from_path, to_path): """Push a file to the Android device Args: from_path (str): source path to_path (str): destination path on the device Returns: tuple: (stdout, stderr) pair from adb push command """ args = ('push', from_path, to_path) return self.__call_adb(args)
[docs] def pull(self, from_path, to_path): """Pull a file from the Android device Args: from_path (str): source path on the deivce to_path (str): destination path Returns: tuple: (stdout, stderr) pair from adb pull command """ args = ('pull', from_path, to_path) return self.__call_adb(args)
@property def logcat(self): """Logcat accessor Returns: LogcatAccessor: logcat accessor object """ return LogcatAccessor(self.__call_adb, self.__call_async_adb) @property def screenrecord(self): """Screenrecord accessor Returns: ScreenrecordAccessor: screenrecord accessor object """ return ScreenrecordAccessor(self.pull, self.__call_async_adb) @property def screencap(self): """Screencap accessor Returns: ScreencapAccessor: screencap accessor object """ return ScreencapAccessor(self.pull, self.__call_async_adb) @property def bugreport(self): """Bugreport accessor Returns: BugreportAccessor: bugreport accessor object """ return BugreportAccessor(self.__call_async_adb)
def _get_serialno(): serial = tksn.adb.get_device_serials() if len(serial) == 0: raise DeviceNotFoundError('No device found') return serial def _parse_iphonesubinfo(subinfo_str): def get_data_iter(): pattern = re.compile(r'[0-9a-f]{8}$') for line in io.StringIO(subinfo_str).readlines(): for data in line.split(): if not pattern.match(data): continue yield int(data[4:], base=16) yield int(data[:4], base=16) data_iter = get_data_iter() for i in range(4): next(data_iter) return ''.join((chr(data) for data in data_iter if 0 < data < 256))