diff --git a/adb/__init__.py b/adb/__init__.py deleted file mode 100644 index 6b509c643..000000000 --- a/adb/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -# -# Copyright (C) 2015 The Android Open Source Project -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from __future__ import absolute_import -from .device import * # pylint: disable=wildcard-import diff --git a/adb/device.py b/adb/device.py deleted file mode 100644 index 516e88003..000000000 --- a/adb/device.py +++ /dev/null @@ -1,339 +0,0 @@ -# -# Copyright (C) 2015 The Android Open Source Project -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -import logging -import os -import re -import subprocess -import tempfile - - -class FindDeviceError(RuntimeError): - pass - - -class DeviceNotFoundError(FindDeviceError): - def __init__(self, serial): - self.serial = serial - super(DeviceNotFoundError, self).__init__( - 'No device with serial {}'.format(serial)) - - -class NoUniqueDeviceError(FindDeviceError): - def __init__(self): - super(NoUniqueDeviceError, self).__init__('No unique device') - - -class ShellError(RuntimeError): - def __init__(self, cmd, stdout, stderr, exit_code): - super(ShellError, self).__init__( - '`{0}` exited with code {1}'.format(cmd, exit_code)) - self.cmd = cmd - self.stdout = stdout - self.stderr = stderr - self.exit_code = exit_code - - -def get_devices(): - with open(os.devnull, 'wb') as devnull: - subprocess.check_call(['adb', 'start-server'], stdout=devnull, - stderr=devnull) - out = subprocess.check_output(['adb', 'devices']).splitlines() - - # The first line of `adb devices` just says "List of attached devices", so - # skip that. - devices = [] - for line in out[1:]: - if not line.strip(): - continue - if 'offline' in line: - continue - - serial, _ = re.split(r'\s+', line, maxsplit=1) - devices.append(serial) - return devices - - -def _get_unique_device(product=None): - devices = get_devices() - if len(devices) != 1: - raise NoUniqueDeviceError() - return AndroidDevice(devices[0], product) - - -def _get_device_by_serial(serial, product=None): - for device in get_devices(): - if device == serial: - return AndroidDevice(serial, product) - raise DeviceNotFoundError(serial) - - -def get_device(serial=None, product=None): - """Get a uniquely identified AndroidDevice if one is available. - - Raises: - DeviceNotFoundError: - The serial specified by `serial` or $ANDROID_SERIAL is not - connected. - - NoUniqueDeviceError: - Neither `serial` nor $ANDROID_SERIAL was set, and the number of - devices connected to the system is not 1. Having 0 connected - devices will also result in this error. - - Returns: - An AndroidDevice associated with the first non-None identifier in the - following order of preference: - - 1) The `serial` argument. - 2) The environment variable $ANDROID_SERIAL. - 3) The single device connnected to the system. - """ - if serial is not None: - return _get_device_by_serial(serial, product) - - android_serial = os.getenv('ANDROID_SERIAL') - if android_serial is not None: - return _get_device_by_serial(android_serial, product) - - return _get_unique_device(product) - -# Call this instead of subprocess.check_output() to work-around issue in Python -# 2's subprocess class on Windows where it doesn't support Unicode. This -# writes the command line to a UTF-8 batch file that is properly interpreted -# by cmd.exe. -def _subprocess_check_output(*popenargs, **kwargs): - # Only do this slow work-around if Unicode is in the cmd line. - if (os.name == 'nt' and - any(isinstance(arg, unicode) for arg in popenargs[0])): - # cmd.exe requires a suffix to know that it is running a batch file - tf = tempfile.NamedTemporaryFile('wb', suffix='.cmd', delete=False) - # @ in batch suppresses echo of the current line. - # Change the codepage to 65001, the UTF-8 codepage. - tf.write('@chcp 65001 > nul\r\n') - tf.write('@') - # Properly quote all the arguments and encode in UTF-8. - tf.write(subprocess.list2cmdline(popenargs[0]).encode('utf-8')) - tf.close() - - try: - result = subprocess.check_output(['cmd.exe', '/c', tf.name], - **kwargs) - except subprocess.CalledProcessError as e: - # Show real command line instead of the cmd.exe command line. - raise subprocess.CalledProcessError(e.returncode, popenargs[0], - output=e.output) - finally: - os.remove(tf.name) - return result - else: - return subprocess.check_output(*popenargs, **kwargs) - -class AndroidDevice(object): - # Delimiter string to indicate the start of the exit code. - _RETURN_CODE_DELIMITER = 'x' - - # Follow any shell command with this string to get the exit - # status of a program since this isn't propagated by adb. - # - # The delimiter is needed because `printf 1; echo $?` would print - # "10", and we wouldn't be able to distinguish the exit code. - _RETURN_CODE_PROBE_STRING = 'echo "{0}$?"'.format(_RETURN_CODE_DELIMITER) - - # Maximum search distance from the output end to find the delimiter. - # adb on Windows returns \r\n even if adbd returns \n. - _RETURN_CODE_SEARCH_LENGTH = len('{0}255\r\n'.format(_RETURN_CODE_DELIMITER)) - - # Shell protocol feature string. - SHELL_PROTOCOL_FEATURE = 'shell_2' - - def __init__(self, serial, product=None): - self.serial = serial - self.product = product - self.adb_cmd = ['adb'] - if self.serial is not None: - self.adb_cmd.extend(['-s', serial]) - if self.product is not None: - self.adb_cmd.extend(['-p', product]) - self._linesep = None - self._features = None - - @property - def linesep(self): - if self._linesep is None: - self._linesep = subprocess.check_output(self.adb_cmd + - ['shell', 'echo']) - return self._linesep - - @property - def features(self): - if self._features is None: - try: - self._features = self._simple_call(['features']).splitlines() - except subprocess.CalledProcessError: - self._features = [] - return self._features - - def _make_shell_cmd(self, user_cmd): - command = self.adb_cmd + ['shell'] + user_cmd - if self.SHELL_PROTOCOL_FEATURE not in self.features: - command.append('; ' + self._RETURN_CODE_PROBE_STRING) - return command - - def _parse_shell_output(self, out): - """Finds the exit code string from shell output. - - Args: - out: Shell output string. - - Returns: - An (exit_code, output_string) tuple. The output string is - cleaned of any additional stuff we appended to find the - exit code. - - Raises: - RuntimeError: Could not find the exit code in |out|. - """ - search_text = out - if len(search_text) > self._RETURN_CODE_SEARCH_LENGTH: - # We don't want to search over massive amounts of data when we know - # the part we want is right at the end. - search_text = search_text[-self._RETURN_CODE_SEARCH_LENGTH:] - partition = search_text.rpartition(self._RETURN_CODE_DELIMITER) - if partition[1] == '': - raise RuntimeError('Could not find exit status in shell output.') - result = int(partition[2]) - # partition[0] won't contain the full text if search_text was truncated, - # pull from the original string instead. - out = out[:-len(partition[1]) - len(partition[2])] - return result, out - - def _simple_call(self, cmd): - logging.info(' '.join(self.adb_cmd + cmd)) - return _subprocess_check_output( - self.adb_cmd + cmd, stderr=subprocess.STDOUT) - - def shell(self, cmd): - """Calls `adb shell` - - Args: - cmd: string shell command to execute. - - Returns: - A (stdout, stderr) tuple. Stderr may be combined into stdout - if the device doesn't support separate streams. - - Raises: - ShellError: the exit code was non-zero. - """ - exit_code, stdout, stderr = self.shell_nocheck(cmd) - if exit_code != 0: - raise ShellError(cmd, stdout, stderr, exit_code) - return stdout, stderr - - def shell_nocheck(self, cmd): - """Calls `adb shell` - - Args: - cmd: string shell command to execute. - - Returns: - An (exit_code, stdout, stderr) tuple. Stderr may be combined - into stdout if the device doesn't support separate streams. - """ - cmd = self._make_shell_cmd(cmd) - logging.info(' '.join(cmd)) - p = subprocess.Popen( - cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE) - stdout, stderr = p.communicate() - if self.SHELL_PROTOCOL_FEATURE in self.features: - exit_code = p.returncode - else: - exit_code, stdout = self._parse_shell_output(stdout) - return exit_code, stdout, stderr - - def install(self, filename, replace=False): - cmd = ['install'] - if replace: - cmd.append('-r') - cmd.append(filename) - return self._simple_call(cmd) - - def push(self, local, remote): - return self._simple_call(['push', local, remote]) - - def pull(self, remote, local): - return self._simple_call(['pull', remote, local]) - - def sync(self, directory=None): - cmd = ['sync'] - if directory is not None: - cmd.append(directory) - return self._simple_call(cmd) - - def forward(self, local, remote): - return self._simple_call(['forward', local, remote]) - - def tcpip(self, port): - return self._simple_call(['tcpip', port]) - - def usb(self): - return self._simple_call(['usb']) - - def reboot(self): - return self._simple_call(['reboot']) - - def root(self): - return self._simple_call(['root']) - - def unroot(self): - return self._simple_call(['unroot']) - - def forward_remove(self, local): - return self._simple_call(['forward', '--remove', local]) - - def forward_remove_all(self): - return self._simple_call(['forward', '--remove-all']) - - def connect(self, host): - return self._simple_call(['connect', host]) - - def disconnect(self, host): - return self._simple_call(['disconnect', host]) - - def reverse(self, remote, local): - return self._simple_call(['reverse', remote, local]) - - def reverse_remove_all(self): - return self._simple_call(['reverse', '--remove-all']) - - def reverse_remove(self, remote): - return self._simple_call(['reverse', '--remove', remote]) - - def wait(self): - return self._simple_call(['wait-for-device']) - - def get_prop(self, prop_name): - output = self.shell(['getprop', prop_name])[0].splitlines() - if len(output) != 1: - raise RuntimeError('Too many lines in getprop output:\n' + - '\n'.join(output)) - value = output[0] - if not value.strip(): - return None - return value - - def set_prop(self, prop_name, value): - self.shell(['setprop', prop_name, value]) diff --git a/adb/test_device.py b/adb/test_device.py deleted file mode 100644 index d033a019b..000000000 --- a/adb/test_device.py +++ /dev/null @@ -1,537 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- -# -# Copyright (C) 2015 The Android Open Source Project -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# -from __future__ import print_function - -import hashlib -import os -import posixpath -import random -import shlex -import shutil -import signal -import subprocess -import tempfile -import unittest - -import mock - -import adb - - -def requires_root(func): - def wrapper(self, *args): - if self.device.get_prop('ro.debuggable') != '1': - raise unittest.SkipTest('requires rootable build') - - was_root = self.device.shell(['id', '-un'])[0].strip() == 'root' - if not was_root: - self.device.root() - self.device.wait() - - try: - func(self, *args) - finally: - if not was_root: - self.device.unroot() - self.device.wait() - - return wrapper - - -class GetDeviceTest(unittest.TestCase): - def setUp(self): - self.android_serial = os.getenv('ANDROID_SERIAL') - if 'ANDROID_SERIAL' in os.environ: - del os.environ['ANDROID_SERIAL'] - - def tearDown(self): - if self.android_serial is not None: - os.environ['ANDROID_SERIAL'] = self.android_serial - else: - if 'ANDROID_SERIAL' in os.environ: - del os.environ['ANDROID_SERIAL'] - - @mock.patch('adb.device.get_devices') - def test_explicit(self, mock_get_devices): - mock_get_devices.return_value = ['foo', 'bar'] - device = adb.get_device('foo') - self.assertEqual(device.serial, 'foo') - - @mock.patch('adb.device.get_devices') - def test_from_env(self, mock_get_devices): - mock_get_devices.return_value = ['foo', 'bar'] - os.environ['ANDROID_SERIAL'] = 'foo' - device = adb.get_device() - self.assertEqual(device.serial, 'foo') - - @mock.patch('adb.device.get_devices') - def test_arg_beats_env(self, mock_get_devices): - mock_get_devices.return_value = ['foo', 'bar'] - os.environ['ANDROID_SERIAL'] = 'bar' - device = adb.get_device('foo') - self.assertEqual(device.serial, 'foo') - - @mock.patch('adb.device.get_devices') - def test_no_such_device(self, mock_get_devices): - mock_get_devices.return_value = ['foo', 'bar'] - self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) - - os.environ['ANDROID_SERIAL'] = 'baz' - self.assertRaises(adb.DeviceNotFoundError, adb.get_device) - - @mock.patch('adb.device.get_devices') - def test_unique_device(self, mock_get_devices): - mock_get_devices.return_value = ['foo'] - device = adb.get_device() - self.assertEqual(device.serial, 'foo') - - @mock.patch('adb.device.get_devices') - def test_no_unique_device(self, mock_get_devices): - mock_get_devices.return_value = ['foo', 'bar'] - self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) - - -class DeviceTest(unittest.TestCase): - def setUp(self): - self.device = adb.get_device() - - -class ShellTest(DeviceTest): - def test_cat(self): - """Check that we can at least cat a file.""" - out = self.device.shell(['cat', '/proc/uptime'])[0].strip() - elements = out.split() - self.assertEqual(len(elements), 2) - - uptime, idle = elements - self.assertGreater(float(uptime), 0.0) - self.assertGreater(float(idle), 0.0) - - def test_throws_on_failure(self): - self.assertRaises(adb.ShellError, self.device.shell, ['false']) - - def test_output_not_stripped(self): - out = self.device.shell(['echo', 'foo'])[0] - self.assertEqual(out, 'foo' + self.device.linesep) - - def test_shell_nocheck_failure(self): - rc, out, _ = self.device.shell_nocheck(['false']) - self.assertNotEqual(rc, 0) - self.assertEqual(out, '') - - def test_shell_nocheck_output_not_stripped(self): - rc, out, _ = self.device.shell_nocheck(['echo', 'foo']) - self.assertEqual(rc, 0) - self.assertEqual(out, 'foo' + self.device.linesep) - - def test_can_distinguish_tricky_results(self): - # If result checking on ADB shell is naively implemented as - # `adb shell ; echo $?`, we would be unable to distinguish the - # output from the result for a cmd of `echo -n 1`. - rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1']) - self.assertEqual(rc, 0) - self.assertEqual(out, '1') - - def test_line_endings(self): - """Ensure that line ending translation is not happening in the pty. - - Bug: http://b/19735063 - """ - output = self.device.shell(['uname'])[0] - self.assertEqual(output, 'Linux' + self.device.linesep) - - def test_pty_logic(self): - """Verify PTY logic for shells. - - Interactive shells should use a PTY, non-interactive should not. - - Bug: http://b/21215503 - """ - proc = subprocess.Popen( - self.device.adb_cmd + ['shell'], stdin=subprocess.PIPE, - stdout=subprocess.PIPE, stderr=subprocess.STDOUT) - # [ -t 0 ] is used (rather than `tty`) to provide portability. This - # gives an exit code of 0 iff stdin is connected to a terminal. - # - # Closing host-side stdin doesn't currently trigger the interactive - # shell to exit so we need to explicitly add an exit command to - # close the session from the device side, and append \n to complete - # the interactive command. - result = proc.communicate('[ -t 0 ]; echo x$?; exit 0\n')[0] - partition = result.rpartition('x') - self.assertEqual(partition[1], 'x') - self.assertEqual(int(partition[2]), 0) - - exit_code = self.device.shell_nocheck(['[ -t 0 ]'])[0] - self.assertEqual(exit_code, 1) - - def test_shell_protocol(self): - """Tests the shell protocol on the device. - - If the device supports shell protocol, this gives us the ability - to separate stdout/stderr and return the exit code directly. - - Bug: http://b/19734861 - """ - if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: - raise unittest.SkipTest('shell protocol unsupported on this device') - result = self.device.shell_nocheck( - shlex.split('echo foo; echo bar >&2; exit 17')) - - self.assertEqual(17, result[0]) - self.assertEqual('foo' + self.device.linesep, result[1]) - self.assertEqual('bar' + self.device.linesep, result[2]) - - def test_non_interactive_sigint(self): - """Tests that SIGINT in a non-interactive shell kills the process. - - This requires the shell protocol in order to detect the broken - pipe; raw data transfer mode will only see the break once the - subprocess tries to read or write. - - Bug: http://b/23825725 - """ - if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features: - raise unittest.SkipTest('shell protocol unsupported on this device') - - # Start a long-running process. - sleep_proc = subprocess.Popen( - self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'), - stdin=subprocess.PIPE, stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - remote_pid = sleep_proc.stdout.readline().strip() - self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early') - proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid)) - - # Verify that the process is running, send signal, verify it stopped. - self.device.shell(proc_query) - os.kill(sleep_proc.pid, signal.SIGINT) - sleep_proc.communicate() - self.assertEqual(1, self.device.shell_nocheck(proc_query)[0], - 'subprocess failed to terminate') - - -class ArgumentEscapingTest(DeviceTest): - def test_shell_escaping(self): - """Make sure that argument escaping is somewhat sane.""" - - # http://b/19734868 - # Note that this actually matches ssh(1)'s behavior --- it's - # converted to `sh -c echo hello; echo world` which sh interprets - # as `sh -c echo` (with an argument to that shell of "hello"), - # and then `echo world` back in the first shell. - result = self.device.shell( - shlex.split("sh -c 'echo hello; echo world'"))[0] - result = result.splitlines() - self.assertEqual(['', 'world'], result) - # If you really wanted "hello" and "world", here's what you'd do: - result = self.device.shell( - shlex.split(r'echo hello\;echo world'))[0].splitlines() - self.assertEqual(['hello', 'world'], result) - - # http://b/15479704 - result = self.device.shell(shlex.split("'true && echo t'"))[0].strip() - self.assertEqual('t', result) - result = self.device.shell( - shlex.split("sh -c 'true && echo t'"))[0].strip() - self.assertEqual('t', result) - - # http://b/20564385 - result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip() - self.assertEqual('t', result) - result = self.device.shell( - shlex.split(r'echo -n 123\;uname'))[0].strip() - self.assertEqual('123Linux', result) - - def test_install_argument_escaping(self): - """Make sure that install argument escaping works.""" - # http://b/20323053 - tf = tempfile.NamedTemporaryFile('wb', suffix='-text;ls;1.apk', - delete=False) - tf.close() - self.assertIn("-text;ls;1.apk", self.device.install(tf.name)) - os.remove(tf.name) - - # http://b/3090932 - tf = tempfile.NamedTemporaryFile('wb', suffix="-Live Hold'em.apk", - delete=False) - tf.close() - self.assertIn("-Live Hold'em.apk", self.device.install(tf.name)) - os.remove(tf.name) - - -class RootUnrootTest(DeviceTest): - def _test_root(self): - message = self.device.root() - if 'adbd cannot run as root in production builds' in message: - return - self.device.wait() - self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip()) - - def _test_unroot(self): - self.device.unroot() - self.device.wait() - self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip()) - - def test_root_unroot(self): - """Make sure that adb root and adb unroot work, using id(1).""" - if self.device.get_prop('ro.debuggable') != '1': - raise unittest.SkipTest('requires rootable build') - - original_user = self.device.shell(['id', '-un'])[0].strip() - try: - if original_user == 'root': - self._test_unroot() - self._test_root() - elif original_user == 'shell': - self._test_root() - self._test_unroot() - finally: - if original_user == 'root': - self.device.root() - else: - self.device.unroot() - self.device.wait() - - -class TcpIpTest(DeviceTest): - def test_tcpip_failure_raises(self): - """adb tcpip requires a port. - - Bug: http://b/22636927 - """ - self.assertRaises( - subprocess.CalledProcessError, self.device.tcpip, '') - self.assertRaises( - subprocess.CalledProcessError, self.device.tcpip, 'foo') - - -class SystemPropertiesTest(DeviceTest): - def test_get_prop(self): - self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running') - - @requires_root - def test_set_prop(self): - prop_name = 'foo.bar' - self.device.shell(['setprop', prop_name, '""']) - - self.device.set_prop(prop_name, 'qux') - self.assertEqual( - self.device.shell(['getprop', prop_name])[0].strip(), 'qux') - - -def compute_md5(string): - hsh = hashlib.md5() - hsh.update(string) - return hsh.hexdigest() - - -def get_md5_prog(device): - """Older platforms (pre-L) had the name md5 rather than md5sum.""" - try: - device.shell(['md5sum', '/proc/uptime']) - return 'md5sum' - except subprocess.CalledProcessError: - return 'md5' - - -class HostFile(object): - def __init__(self, handle, checksum): - self.handle = handle - self.checksum = checksum - self.full_path = handle.name - self.base_name = os.path.basename(self.full_path) - - -class DeviceFile(object): - def __init__(self, checksum, full_path): - self.checksum = checksum - self.full_path = full_path - self.base_name = posixpath.basename(self.full_path) - - -def make_random_host_files(in_dir, num_files): - min_size = 1 * (1 << 10) - max_size = 16 * (1 << 10) - - files = [] - for _ in xrange(num_files): - file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) - - size = random.randrange(min_size, max_size, 1024) - rand_str = os.urandom(size) - file_handle.write(rand_str) - file_handle.flush() - file_handle.close() - - md5 = compute_md5(rand_str) - files.append(HostFile(file_handle, md5)) - return files - - -def make_random_device_files(device, in_dir, num_files): - min_size = 1 * (1 << 10) - max_size = 16 * (1 << 10) - - files = [] - for file_num in xrange(num_files): - size = random.randrange(min_size, max_size, 1024) - - base_name = 'device_tmpfile' + str(file_num) - full_path = posixpath.join(in_dir, base_name) - - device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), - 'bs={}'.format(size), 'count=1']) - dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split() - - files.append(DeviceFile(dev_md5, full_path)) - return files - - -class FileOperationsTest(DeviceTest): - SCRATCH_DIR = '/data/local/tmp' - DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' - DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' - - def _test_push(self, local_file, checksum): - self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) - self.device.push(local=local_file, remote=self.DEVICE_TEMP_FILE) - dev_md5, _ = self.device.shell([get_md5_prog(self.device), - self.DEVICE_TEMP_FILE])[0].split() - self.assertEqual(checksum, dev_md5) - self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) - - def test_push(self): - """Push a randomly generated file to specified device.""" - kbytes = 512 - tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) - rand_str = os.urandom(1024 * kbytes) - tmp.write(rand_str) - tmp.close() - self._test_push(tmp.name, compute_md5(rand_str)) - os.remove(tmp.name) - - # TODO: write push directory test. - - def _test_pull(self, remote_file, checksum): - tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) - tmp_write.close() - self.device.pull(remote=remote_file, local=tmp_write.name) - with open(tmp_write.name, 'rb') as tmp_read: - host_contents = tmp_read.read() - host_md5 = compute_md5(host_contents) - self.assertEqual(checksum, host_md5) - os.remove(tmp_write.name) - - def test_pull(self): - """Pull a randomly generated file from specified device.""" - kbytes = 512 - self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) - cmd = ['dd', 'if=/dev/urandom', - 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', - 'count={}'.format(kbytes)] - self.device.shell(cmd) - dev_md5, _ = self.device.shell( - [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split() - self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) - self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) - - def test_pull_dir(self): - """Pull a randomly generated directory of files from the device.""" - host_dir = tempfile.mkdtemp() - self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) - self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) - - # Populate device directory with random files. - temp_files = make_random_device_files( - self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) - - self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) - - for temp_file in temp_files: - host_path = os.path.join(host_dir, temp_file.base_name) - with open(host_path, 'rb') as host_file: - host_md5 = compute_md5(host_file.read()) - self.assertEqual(host_md5, temp_file.checksum) - - self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) - if host_dir is not None: - shutil.rmtree(host_dir) - - def test_sync(self): - """Sync a randomly generated directory of files to specified device.""" - base_dir = tempfile.mkdtemp() - - # Create mirror device directory hierarchy within base_dir. - full_dir_path = base_dir + self.DEVICE_TEMP_DIR - os.makedirs(full_dir_path) - - # Create 32 random files within the host mirror. - temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32) - - # Clean up any trash on the device. - device = adb.get_device(product=base_dir) - device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) - - device.sync('data') - - # Confirm that every file on the device mirrors that on the host. - for temp_file in temp_files: - device_full_path = posixpath.join(self.DEVICE_TEMP_DIR, - temp_file.base_name) - dev_md5, _ = device.shell( - [get_md5_prog(self.device), device_full_path])[0].split() - self.assertEqual(temp_file.checksum, dev_md5) - - self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) - if base_dir is not None: - shutil.rmtree(base_dir) - - def test_unicode_paths(self): - """Ensure that we can support non-ASCII paths, even on Windows.""" - name = u'로보카 폴리' - - ## push. - tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) - tf.close() - self.device.push(tf.name, u'/data/local/tmp/adb-test-{}'.format(name)) - os.remove(tf.name) - self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) - - # pull. - cmd = ['touch', u'"/data/local/tmp/adb-test-{}"'.format(name)] - self.device.shell(cmd) - - tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False) - tf.close() - self.device.pull(u'/data/local/tmp/adb-test-{}'.format(name), tf.name) - os.remove(tf.name) - self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) - - -def main(): - random.seed(0) - if len(adb.get_devices()) > 0: - suite = unittest.TestLoader().loadTestsFromName(__name__) - unittest.TextTestRunner(verbosity=3).run(suite) - else: - print('Test suite must be run with attached devices') - - -if __name__ == '__main__': - main()