diff --git a/adb/__init__.py b/adb/__init__.py new file mode 100644 index 000000000..6b509c643 --- /dev/null +++ b/adb/__init__.py @@ -0,0 +1,17 @@ +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import absolute_import +from .device import * # pylint: disable=wildcard-import diff --git a/adb/device.py b/adb/device.py new file mode 100644 index 000000000..601989b99 --- /dev/null +++ b/adb/device.py @@ -0,0 +1,233 @@ +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +import os +import re +import subprocess + + +class FindDeviceError(RuntimeError): + pass + + +class DeviceNotFoundError(FindDeviceError): + def __init__(self, serial): + self.serial = serial + super(DeviceNotFoundError, self).__init__( + 'No device with serial {}'.format(serial)) + + +class NoUniqueDeviceError(FindDeviceError): + def __init__(self): + super(NoUniqueDeviceError, self).__init__('No unique device') + + +def get_devices(): + with open(os.devnull, 'wb') as devnull: + subprocess.check_call(['adb', 'start-server'], stdout=devnull, + stderr=devnull) + out = subprocess.check_output(['adb', 'devices']).splitlines() + + # The first line of `adb devices` just says "List of attached devices", so + # skip that. + devices = [] + for line in out[1:]: + if not line.strip(): + continue + if 'offline' in line: + continue + + serial, _ = re.split(r'\s+', line, maxsplit=1) + devices.append(serial) + return devices + + +def _get_unique_device(product=None): + devices = get_devices() + if len(devices) != 1: + raise NoUniqueDeviceError() + return AndroidDevice(devices[0], product) + +def _get_device_by_serial(serial, product=None): + for device in get_devices(): + if device == serial: + return AndroidDevice(serial, product) + raise DeviceNotFoundError(serial) + + +def get_device(serial=None, product=None): + """Get a uniquely identified AndroidDevice if one is available. + + Raises: + DeviceNotFoundError: + The serial specified by `serial` or $ANDROID_SERIAL is not + connected. + + NoUniqueDeviceError: + Neither `serial` nor $ANDROID_SERIAL was set, and the number of + devices connected to the system is not 1. Having 0 connected + devices will also result in this error. + + Returns: + An AndroidDevice associated with the first non-None identifier in the + following order of preference: + + 1) The `serial` argument. + 2) The environment variable $ANDROID_SERIAL. + 3) The single device connnected to the system. + """ + if serial is not None: + return _get_device_by_serial(serial, product) + + android_serial = os.getenv('ANDROID_SERIAL') + if android_serial is not None: + return _get_device_by_serial(android_serial, product) + + return _get_unique_device(product) + + +class AndroidDevice(object): + def __init__(self, serial, product=None): + self.serial = serial + self.product = product + self.adb_cmd = ['adb'] + if self.serial is not None: + self.adb_cmd.extend(['-s', serial]) + if self.product is not None: + self.adb_cmd.extend(['-p', product]) + self._linesep = None + self._shell_result_pattern = None + + @property + def linesep(self): + if self._linesep is None: + self._linesep = subprocess.check_output(['adb', 'shell', 'echo']) + return self._linesep + + def _make_shell_cmd(self, user_cmd): + # Follow any shell command with `; echo; echo $?` to get the exit + # status of a program since this isn't propagated by adb. + # + # The leading newline is needed because `printf 1; echo $?` would print + # "10", and we wouldn't be able to distinguish the exit code. + rc_probe = '; echo "\n$?"' + return self.adb_cmd + ['shell'] + user_cmd + [rc_probe] + + def _parse_shell_output(self, out): # pylint: disable=no-self-use + search_text = out + max_result_len = len('{0}255{0}'.format(self.linesep)) + if len(search_text) > max_result_len: + # We don't want to regex match over massive amounts of data when we + # know the part we want is right at the end. + search_text = search_text[-max_result_len:] + if self._shell_result_pattern is None: + self._shell_result_pattern = re.compile( + r'({0}\d+{0})$'.format(self.linesep), re.MULTILINE) + m = self._shell_result_pattern.search(search_text) + if m is None: + raise RuntimeError('Could not find exit status in shell output.') + + result_text = m.group(1) + result = int(result_text.strip()) + out = out[:-len(result_text)] # Trim the result text from the output. + return result, out + + def _simple_call(self, cmd): + return subprocess.check_output( + self.adb_cmd + cmd, stderr=subprocess.STDOUT) + + def shell(self, cmd): + cmd = self._make_shell_cmd(cmd) + out = subprocess.check_output(cmd) + rc, out = self._parse_shell_output(out) + if rc != 0: + error = subprocess.CalledProcessError(rc, cmd) + error.out = out + raise error + return out + + def shell_nocheck(self, cmd): + cmd = self._make_shell_cmd(cmd) + p = subprocess.Popen( + cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT) + out, _ = p.communicate() + return self._parse_shell_output(out) + + def install(self, filename): + return self._simple_call(['install', filename]) + + def push(self, local, remote): + return self._simple_call(['push', local, remote]) + + def pull(self, remote, local): + return self._simple_call(['pull', remote, local]) + + def sync(self, directory=None): + cmd = ['sync'] + if directory is not None: + cmd.append(directory) + return self._simple_call(cmd) + + def forward(self, local, remote): + return self._simple_call(['forward', local, remote]) + + def tcpip(self, port): + return self._simple_call(['tcpip', port]) + + def usb(self): + return self._simple_call(['usb']) + + def root(self): + return self._simple_call(['root']) + + def unroot(self): + return self._simple_call(['unroot']) + + def forward_remove(self, local): + return self._simple_call(['forward', '--remove', local]) + + def forward_remove_all(self): + return self._simple_call(['forward', '--remove-all']) + + def connect(self, host): + return self._simple_call(['connect', host]) + + def disconnect(self, host): + return self._simple_call(['disconnect', host]) + + def reverse(self, remote, local): + return self._simple_call(['reverse', remote, local]) + + def reverse_remove_all(self): + return self._simple_call(['reverse', '--remove-all']) + + def reverse_remove(self, remote): + return self._simple_call(['reverse', '--remove', remote]) + + def wait(self): + return self._simple_call(['wait-for-device']) + + def get_prop(self, prop_name): + output = self.shell(['getprop', prop_name]) + if len(output) != 1: + raise RuntimeError('Too many lines in getprop output:\n' + + '\n'.join(output)) + value = output[0] + if not value.strip(): + return None + return value + + def set_prop(self, prop_name, value): + self.shell(['setprop', prop_name, value]) diff --git a/adb/test_adb.py b/adb/test_adb.py new file mode 100644 index 000000000..59aa14deb --- /dev/null +++ b/adb/test_adb.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +"""Tests for the adb program itself. + +This differs from things in test_device.py in that there is no API for these +things. Most of these tests involve specific error messages or the help text. +""" +from __future__ import print_function + +import random +import subprocess +import unittest + +import adb + + +class NonApiTest(unittest.TestCase): + """Tests for ADB that aren't a part of the AndroidDevice API.""" + + def test_help(self): + """Make sure we get _something_ out of help.""" + out = subprocess.check_output( + ['adb', 'help'], stderr=subprocess.STDOUT) + self.assertGreater(len(out), 0) + + def test_version(self): + """Get a version number out of the output of adb.""" + lines = subprocess.check_output(['adb', 'version']).splitlines() + version_line = lines[0] + self.assertRegexpMatches( + version_line, r'^Android Debug Bridge version \d+\.\d+\.\d+$') + if len(lines) == 2: + # Newer versions of ADB have a second line of output for the + # version that includes a specific revision (git SHA). + revision_line = lines[1] + self.assertRegexpMatches( + revision_line, r'^Revision [0-9a-f]{12}-android$') + + def test_tcpip_error_messages(self): + p = subprocess.Popen(['adb', 'tcpip'], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, _ = p.communicate() + self.assertEqual(1, p.returncode) + self.assertIn('help message', out) + + p = subprocess.Popen(['adb', 'tcpip', 'foo'], stdout=subprocess.PIPE, + stderr=subprocess.STDOUT) + out, _ = p.communicate() + self.assertEqual(1, p.returncode) + self.assertIn('error', out) + + +def main(): + random.seed(0) + if len(adb.get_devices()) > 0: + suite = unittest.TestLoader().loadTestsFromName(__name__) + unittest.TextTestRunner(verbosity=3).run(suite) + else: + print('Test suite must be run with attached devices') + + +if __name__ == '__main__': + main() diff --git a/adb/test_device.py b/adb/test_device.py new file mode 100644 index 000000000..6c20b6e0b --- /dev/null +++ b/adb/test_device.py @@ -0,0 +1,424 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2015 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +from __future__ import print_function + +import hashlib +import os +import posixpath +import random +import shlex +import shutil +import subprocess +import tempfile +import unittest + +import mock + +import adb + + +class GetDeviceTest(unittest.TestCase): + def setUp(self): + self.android_serial = os.getenv('ANDROID_SERIAL') + del os.environ['ANDROID_SERIAL'] + + def tearDown(self): + os.environ['ANDROID_SERIAL'] = self.android_serial + + @mock.patch('adb.device.get_devices') + def test_explicit(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_from_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'foo' + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_arg_beats_env(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + os.environ['ANDROID_SERIAL'] = 'bar' + device = adb.get_device('foo') + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_such_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz']) + + os.environ['ANDROID_SERIAL'] = 'baz' + self.assertRaises(adb.DeviceNotFoundError, adb.get_device) + + @mock.patch('adb.device.get_devices') + def test_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo'] + device = adb.get_device() + self.assertEqual(device.serial, 'foo') + + @mock.patch('adb.device.get_devices') + def test_no_unique_device(self, mock_get_devices): + mock_get_devices.return_value = ['foo', 'bar'] + self.assertRaises(adb.NoUniqueDeviceError, adb.get_device) + + +class DeviceTest(unittest.TestCase): + def setUp(self): + self.device = adb.get_device() + + +class ShellTest(DeviceTest): + def test_cat(self): + """Check that we can at least cat a file.""" + out = self.device.shell(['cat', '/proc/uptime']).strip() + elements = out.split() + self.assertEqual(len(elements), 2) + + uptime, idle = elements + self.assertGreater(float(uptime), 0.0) + self.assertGreater(float(idle), 0.0) + + def test_throws_on_failure(self): + self.assertRaises(subprocess.CalledProcessError, + self.device.shell, ['false']) + + def test_output_not_stripped(self): + out = self.device.shell(['echo', 'foo']) + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_shell_nocheck_failure(self): + rc, out = self.device.shell_nocheck(['false']) + self.assertNotEqual(rc, 0) + self.assertEqual(out, '') + + def test_shell_nocheck_output_not_stripped(self): + rc, out = self.device.shell_nocheck(['echo', 'foo']) + self.assertEqual(rc, 0) + self.assertEqual(out, 'foo' + self.device.linesep) + + def test_can_distinguish_tricky_results(self): + # If result checking on ADB shell is naively implemented as + # `adb shell ; echo $?`, we would be unable to distinguish the + # output from the result for a cmd of `echo -n 1`. + rc, out = self.device.shell_nocheck(['echo', '-n', '1']) + self.assertEqual(rc, 0) + self.assertEqual(out, '1') + + def test_line_endings(self): + """Ensure that line ending translation is not happening in the pty. + + Bug: http://b/19735063 + """ + output = self.device.shell(['uname']) + self.assertEqual(output, 'Linux' + self.device.linesep) + + +class ArgumentEscapingTest(DeviceTest): + def test_shell_escaping(self): + """Make sure that argument escaping is somewhat sane.""" + + # http://b/19734868 + # Note that this actually matches ssh(1)'s behavior --- it's + # converted to `sh -c echo hello; echo world` which sh interprets + # as `sh -c echo` (with an argument to that shell of "hello"), + # and then `echo world` back in the first shell. + result = self.device.shell( + shlex.split("sh -c 'echo hello; echo world'")) + result = result.splitlines() + self.assertEqual(['', 'world'], result) + # If you really wanted "hello" and "world", here's what you'd do: + result = self.device.shell( + shlex.split(r'echo hello\;echo world')).splitlines() + self.assertEqual(['hello', 'world'], result) + + # http://b/15479704 + result = self.device.shell(shlex.split("'true && echo t'")).strip() + self.assertEqual('t', result) + result = self.device.shell( + shlex.split("sh -c 'true && echo t'")).strip() + self.assertEqual('t', result) + + # http://b/20564385 + result = self.device.shell(shlex.split('FOO=a BAR=b echo t')).strip() + self.assertEqual('t', result) + result = self.device.shell(shlex.split(r'echo -n 123\;uname')).strip() + self.assertEqual('123Linux', result) + + def test_install_argument_escaping(self): + """Make sure that install argument escaping works.""" + # http://b/20323053 + tf = tempfile.NamedTemporaryFile('wb', suffix='-text;ls;1.apk') + self.assertIn("-text;ls;1.apk", self.device.install(tf.name)) + + # http://b/3090932 + tf = tempfile.NamedTemporaryFile('wb', suffix="-Live Hold'em.apk") + self.assertIn("-Live Hold'em.apk", self.device.install(tf.name)) + + +class RootUnrootTest(DeviceTest): + def _test_root(self): + message = self.device.root() + if 'adbd cannot run as root in production builds' in message: + return + self.device.wait() + self.assertEqual('root', self.device.shell(['id', '-un']).strip()) + + def _test_unroot(self): + self.device.unroot() + self.device.wait() + self.assertEqual('shell', self.device.shell(['id', '-un']).strip()) + + def test_root_unroot(self): + """Make sure that adb root and adb unroot work, using id(1).""" + original_user = self.device.shell(['id', '-un']).strip() + try: + if original_user == 'root': + self._test_unroot() + self._test_root() + elif original_user == 'shell': + self._test_root() + self._test_unroot() + finally: + if original_user == 'root': + self.device.root() + else: + self.device.unroot() + self.device.wait() + + +class TcpIpTest(DeviceTest): + def test_tcpip_failure_raises(self): + """adb tcpip requires a port. + + Bug: http://b/22636927 + """ + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, '') + self.assertRaises( + subprocess.CalledProcessError, self.device.tcpip, 'foo') + + +def compute_md5(string): + hsh = hashlib.md5() + hsh.update(string) + return hsh.hexdigest() + + +def get_md5_prog(device): + """Older platforms (pre-L) had the name md5 rather than md5sum.""" + try: + device.shell(['md5sum', '/proc/uptime']) + return 'md5sum' + except subprocess.CalledProcessError: + return 'md5' + + +class HostFile(object): + def __init__(self, handle, checksum): + self.handle = handle + self.checksum = checksum + self.full_path = handle.name + self.base_name = os.path.basename(self.full_path) + + +class DeviceFile(object): + def __init__(self, checksum, full_path): + self.checksum = checksum + self.full_path = full_path + self.base_name = posixpath.basename(self.full_path) + + +def make_random_host_files(in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for _ in xrange(num_files): + file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) + + size = random.randrange(min_size, max_size, 1024) + rand_str = os.urandom(size) + file_handle.write(rand_str) + file_handle.flush() + file_handle.close() + + md5 = compute_md5(rand_str) + files.append(HostFile(file_handle, md5)) + return files + + +def make_random_device_files(device, in_dir, num_files): + min_size = 1 * (1 << 10) + max_size = 16 * (1 << 10) + + files = [] + for file_num in xrange(num_files): + size = random.randrange(min_size, max_size, 1024) + + base_name = 'device_tmpfile' + str(file_num) + full_path = os.path.join(in_dir, base_name) + + device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path), + 'bs={}'.format(size), 'count=1']) + dev_md5, _ = device.shell([get_md5_prog(device), full_path]).split() + + files.append(DeviceFile(dev_md5, full_path)) + return files + + +class FileOperationsTest(DeviceTest): + SCRATCH_DIR = '/data/local/tmp' + DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file' + DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir' + + def _test_push(self, local_file, checksum): + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + try: + self.device.push( + local=local_file, remote=self.DEVICE_TEMP_FILE) + dev_md5, _ = self.device.shell( + [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]).split() + self.assertEqual(checksum, dev_md5) + finally: + self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE]) + + def test_push(self): + """Push a randomly generated file to specified device.""" + kbytes = 512 + tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False) + try: + rand_str = os.urandom(1024 * kbytes) + tmp.write(rand_str) + tmp.close() + self._test_push(tmp.name, compute_md5(rand_str)) + finally: + os.remove(tmp.name) + + # TODO: write push directory test. + + def _test_pull(self, remote_file, checksum): + tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False) + try: + tmp_write.close() + self.device.pull(remote=remote_file, local=tmp_write.name) + with open(tmp_write.name, 'rb') as tmp_read: + host_contents = tmp_read.read() + host_md5 = compute_md5(host_contents) + self.assertEqual(checksum, host_md5) + finally: + os.remove(tmp_write.name) + + def test_pull(self): + """Pull a randomly generated file from specified device.""" + kbytes = 512 + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE]) + try: + cmd = ['dd', 'if=/dev/urandom', + 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024', + 'count={}'.format(kbytes)] + self.device.shell(cmd) + dev_md5, _ = self.device.shell( + [get_md5_prog(self.device), self.DEVICE_TEMP_FILE]).split() + self._test_pull(self.DEVICE_TEMP_FILE, dev_md5) + finally: + self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE]) + + def test_pull_dir(self): + """Pull a randomly generated directory of files from the device.""" + host_dir = tempfile.mkdtemp() + try: + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR]) + + # Populate device directory with random files. + temp_files = make_random_device_files( + self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32) + + self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir) + + for temp_file in temp_files: + host_path = os.path.join(host_dir, temp_file.base_name) + with open(host_path, 'rb') as host_file: + host_md5 = compute_md5(host_file.read()) + self.assertEqual(host_md5, temp_file.checksum) + finally: + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + if host_dir is not None: + shutil.rmtree(host_dir) + + def test_sync(self): + """Sync a randomly generated directory of files to specified device.""" + base_dir = tempfile.mkdtemp() + try: + # Create mirror device directory hierarchy within base_dir. + full_dir_path = base_dir + self.DEVICE_TEMP_DIR + os.makedirs(full_dir_path) + + # Create 32 random files within the host mirror. + temp_files = make_random_host_files(in_dir=full_dir_path, + num_files=32) + + # Clean up any trash on the device. + device = adb.get_device(product=base_dir) + device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + + device.sync('data') + + # Confirm that every file on the device mirrors that on the host. + for temp_file in temp_files: + device_full_path = posixpath.join( + self.DEVICE_TEMP_DIR, temp_file.base_name) + dev_md5, _ = device.shell( + [get_md5_prog(self.device), device_full_path]).split() + self.assertEqual(temp_file.checksum, dev_md5) + finally: + self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR]) + shutil.rmtree(base_dir + self.DEVICE_TEMP_DIR) + + + def test_unicode_paths(self): + """Ensure that we can support non-ASCII paths, even on Windows.""" + name = u'로보카 폴리'.encode('utf-8') + + ## push. + tf = tempfile.NamedTemporaryFile('wb', suffix=name) + self.device.push(tf.name, '/data/local/tmp/adb-test-{}'.format(name)) + self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*']) + + # pull. + cmd = ['touch', '"/data/local/tmp/adb-test-{}"'.format(name)] + self.device.shell(cmd) + + tf = tempfile.NamedTemporaryFile('wb', suffix=name) + self.device.pull('/data/local/tmp/adb-test-{}'.format(name), tf.name) + + +def main(): + random.seed(0) + if len(adb.get_devices()) > 0: + suite = unittest.TestLoader().loadTestsFromName(__name__) + unittest.TextTestRunner(verbosity=3).run(suite) + else: + print('Test suite must be run with attached devices') + + +if __name__ == '__main__': + main() diff --git a/adb/tests/test_adb.py b/adb/tests/test_adb.py deleted file mode 100755 index 773963336..000000000 --- a/adb/tests/test_adb.py +++ /dev/null @@ -1,496 +0,0 @@ -#!/usr/bin/env python2 -# -*- coding: utf-8 -*- - -"""Simple conformance test for adb. - -This script will use the available adb in path and run simple -tests that attempt to touch all accessible attached devices. -""" -import hashlib -import os -import pipes -import posixpath -import random -import re -import shlex -import subprocess -import sys -import tempfile -import unittest - - -def trace(cmd): - """Print debug message if tracing enabled.""" - if False: - print >> sys.stderr, cmd - - -def call(cmd_str): - """Run process and return output tuple (stdout, stderr, ret code).""" - trace(cmd_str) - process = subprocess.Popen(shlex.split(cmd_str), - stdout=subprocess.PIPE, - stderr=subprocess.PIPE) - stdout, stderr = process.communicate() - return stdout, stderr, process.returncode - - -def call_combined(cmd_str): - """Run process and return output tuple (stdout+stderr, ret code).""" - trace(cmd_str) - process = subprocess.Popen(shlex.split(cmd_str), - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - stdout, _ = process.communicate() - return stdout, process.returncode - - -def call_checked(cmd_str): - """Run process and get stdout+stderr, raise an exception on trouble.""" - trace(cmd_str) - return subprocess.check_output(shlex.split(cmd_str), - stderr=subprocess.STDOUT) - - -def call_checked_list(cmd_str): - return call_checked(cmd_str).split('\n') - - -def call_checked_list_skip(cmd_str): - out_list = call_checked_list(cmd_str) - - def is_init_line(line): - if (len(line) >= 3) and (line[0] == "*") and (line[-2] == "*"): - return True - else: - return False - - return [line for line in out_list if not is_init_line(line)] - - -def get_device_list(): - output = call_checked_list_skip("adb devices") - dev_list = [] - for line in output[1:]: - if line.strip() == "": - continue - device, _ = line.split() - dev_list.append(device) - return dev_list - - -def get_attached_device_count(): - return len(get_device_list()) - - -def compute_md5(string): - hsh = hashlib.md5() - hsh.update(string) - return hsh.hexdigest() - - -class HostFile(object): - def __init__(self, handle, md5): - self.handle = handle - self.md5 = md5 - self.full_path = handle.name - self.base_name = os.path.basename(self.full_path) - - -class DeviceFile(object): - def __init__(self, md5, full_path): - self.md5 = md5 - self.full_path = full_path - self.base_name = posixpath.basename(self.full_path) - - -def make_random_host_files(in_dir, num_files, rand_size=True): - files = {} - min_size = 1 * (1 << 10) - max_size = 16 * (1 << 10) - fixed_size = min_size - - for _ in range(num_files): - file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False) - - if rand_size: - size = random.randrange(min_size, max_size, 1024) - else: - size = fixed_size - rand_str = os.urandom(size) - file_handle.write(rand_str) - file_handle.flush() - file_handle.close() - - md5 = compute_md5(rand_str) - files[file_handle.name] = HostFile(file_handle, md5) - return files - - -def make_random_device_files(adb, in_dir, num_files, rand_size=True): - files = {} - min_size = 1 * (1 << 10) - max_size = 16 * (1 << 10) - fixed_size = min_size - - for i in range(num_files): - if rand_size: - size = random.randrange(min_size, max_size, 1024) - else: - size = fixed_size - - base_name = "device_tmpfile" + str(i) - full_path = in_dir + "/" + base_name - - adb.shell("dd if=/dev/urandom of={} bs={} count=1".format(full_path, - size)) - dev_md5, _ = adb.shell("md5sum {}".format(full_path)).split() - - files[full_path] = DeviceFile(dev_md5, full_path) - return files - - -class AdbWrapper(object): - """Convenience wrapper object for the adb command.""" - def __init__(self, device=None, out_dir=None): - self.device = device - self.out_dir = out_dir - self.adb_cmd = "adb " - if self.device: - self.adb_cmd += "-s {} ".format(pipes.quote(device)) - if self.out_dir: - self.adb_cmd += "-p {} ".format(pipes.quote(out_dir)) - - def shell(self, cmd): - return call_checked(self.adb_cmd + "shell " + cmd) - - def shell_nocheck(self, cmd): - return call_combined(self.adb_cmd + "shell " + cmd) - - def install(self, filename): - return call_checked( - self.adb_cmd + "install {}".format(pipes.quote(filename))) - - def push(self, local, remote): - return call_checked(self.adb_cmd + "push {} {}".format( - pipes.quote(local), pipes.quote(remote))) - - def pull(self, remote, local): - return call_checked(self.adb_cmd + "pull {} {}".format( - pipes.quote(remote), pipes.quote(local))) - - def sync(self, directory=""): - return call_checked(self.adb_cmd + "sync {}".format( - pipes.quote(directory) if directory else directory)) - - def forward(self, local, remote): - return call_checked(self.adb_cmd + "forward {} {}".format(local, - remote)) - - def tcpip(self, port): - return call_combined(self.adb_cmd + "tcpip {}".format(port)) - - def usb(self): - return call_checked(self.adb_cmd + "usb") - - def root(self): - return call_checked(self.adb_cmd + "root") - - def unroot(self): - return call_checked(self.adb_cmd + "unroot") - - def forward_remove(self, local): - return call_checked(self.adb_cmd + "forward --remove {}".format(local)) - - def forward_remove_all(self): - return call_checked(self.adb_cmd + "forward --remove-all") - - def connect(self, host): - return call_checked(self.adb_cmd + "connect {}".format(host)) - - def disconnect(self, host): - return call_checked(self.adb_cmd + "disconnect {}".format(host)) - - def reverse(self, remote, local): - return call_checked(self.adb_cmd + "reverse {} {}".format(remote, - local)) - - def reverse_remove_all(self): - return call_checked(self.adb_cmd + "reverse --remove-all") - - def reverse_remove(self, remote): - return call_checked( - self.adb_cmd + "reverse --remove {}".format(remote)) - - def wait(self): - return call_checked(self.adb_cmd + "wait-for-device") - - -class AdbBasic(unittest.TestCase): - def test_shell(self): - """Check that we can at least cat a file.""" - adb = AdbWrapper() - out = adb.shell("cat /proc/uptime") - self.assertEqual(len(out.split()), 2) - self.assertGreater(float(out.split()[0]), 0.0) - self.assertGreater(float(out.split()[1]), 0.0) - - def test_help(self): - """Make sure we get _something_ out of help.""" - out = call_checked("adb help") - self.assertTrue(len(out) > 0) - - def test_version(self): - """Get a version number out of the output of adb.""" - out = call_checked("adb version").split() - version_num = False - for item in out: - if re.match(r"[\d+\.]*\d", item): - version_num = True - self.assertTrue(version_num) - - def _test_root(self): - adb = AdbWrapper() - if "adbd cannot run as root in production builds" in adb.root(): - return - adb.wait() - self.assertEqual("root", adb.shell("id -un").strip()) - - def _test_unroot(self): - adb = AdbWrapper() - adb.unroot() - adb.wait() - self.assertEqual("shell", adb.shell("id -un").strip()) - - def test_root_unroot(self): - """Make sure that adb root and adb unroot work, using id(1).""" - adb = AdbWrapper() - original_user = adb.shell("id -un").strip() - try: - if original_user == "root": - self._test_unroot() - self._test_root() - elif original_user == "shell": - self._test_root() - self._test_unroot() - finally: - if original_user == "root": - adb.root() - else: - adb.unroot() - adb.wait() - - def test_argument_escaping(self): - """Make sure that argument escaping is somewhat sane.""" - adb = AdbWrapper() - - # http://b/19734868 - # Note that this actually matches ssh(1)'s behavior --- it's - # converted to "sh -c echo hello; echo world" which sh interprets - # as "sh -c echo" (with an argument to that shell of "hello"), - # and then "echo world" back in the first shell. - result = adb.shell("sh -c 'echo hello; echo world'").splitlines() - self.assertEqual(["", "world"], result) - # If you really wanted "hello" and "world", here's what you'd do: - result = adb.shell(r"echo hello\;echo world").splitlines() - self.assertEqual(["hello", "world"], result) - - # http://b/15479704 - self.assertEqual('t', adb.shell("'true && echo t'").strip()) - self.assertEqual('t', adb.shell("sh -c 'true && echo t'").strip()) - - # http://b/20564385 - self.assertEqual('t', adb.shell("FOO=a BAR=b echo t").strip()) - self.assertEqual('123Linux', adb.shell(r"echo -n 123\;uname").strip()) - - def test_install_argument_escaping(self): - """Make sure that install argument escaping works.""" - adb = AdbWrapper() - - # http://b/20323053 - tf = tempfile.NamedTemporaryFile("wb", suffix="-text;ls;1.apk") - self.assertIn("-text;ls;1.apk", adb.install(tf.name)) - - # http://b/3090932 - tf = tempfile.NamedTemporaryFile("wb", suffix="-Live Hold'em.apk") - self.assertIn("-Live Hold'em.apk", adb.install(tf.name)) - - def test_line_endings(self): - """Ensure that line ending translation is not happening in the pty. - - Bug: http://b/19735063 - """ - output = AdbWrapper().shell("uname") - if sys.platform == 'win32': - # adb.exe running on Windows does translation to the Windows \r\n - # convention, so we should expect those chars. - self.assertEqual(output, "Linux\r\n") - else: - self.assertEqual(output, "Linux\n") - - def test_tcpip(self): - """adb tcpip requires a port. http://b/22636927""" - output, status_code = AdbWrapper().tcpip("") - self.assertEqual(1, status_code) - self.assertIn("help message", output) - - output, status_code = AdbWrapper().tcpip("blah") - self.assertEqual(1, status_code) - self.assertIn("error", output) - - def test_unicode_paths(self): - """Ensure that we can support non-ASCII paths, even on Windows.""" - adb = AdbWrapper() - name = u'로보카 폴리'.encode('utf-8') - - # push. - tf = tempfile.NamedTemporaryFile("wb", suffix=name) - adb.push(tf.name, "/data/local/tmp/adb-test-{}".format(name)) - - # pull. - adb.shell("rm \"'/data/local/tmp/adb-test-*'\"".format(name)) - adb.shell("touch \"'/data/local/tmp/adb-test-{}'\"".format(name)) - tf = tempfile.NamedTemporaryFile("wb", suffix=name) - adb.pull("/data/local/tmp/adb-test-{}".format(name), tf.name) - - -class AdbFile(unittest.TestCase): - SCRATCH_DIR = "/data/local/tmp" - DEVICE_TEMP_FILE = SCRATCH_DIR + "/adb_test_file" - DEVICE_TEMP_DIR = SCRATCH_DIR + "/adb_test_dir" - - def test_push(self): - """Push a randomly generated file to specified device.""" - kbytes = 512 - adb = AdbWrapper() - with tempfile.NamedTemporaryFile(mode="wb", delete=False) as tmp: - try: - rand_str = os.urandom(1024 * kbytes) - tmp.write(rand_str) - tmp.flush() - tmp.close() - - host_md5 = compute_md5(rand_str) - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) - try: - adb.push(local=tmp.name, remote=AdbFile.DEVICE_TEMP_FILE) - dev_md5, _ = adb.shell( - "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() - self.assertEqual(host_md5, dev_md5) - finally: - adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) - finally: - os.remove(tmp.name) - - # TODO: write push directory test. - - def test_pull(self): - """Pull a randomly generated file from specified device.""" - kbytes = 512 - adb = AdbWrapper() - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_FILE)) - try: - adb.shell("dd if=/dev/urandom of={} bs=1024 count={}".format( - AdbFile.DEVICE_TEMP_FILE, kbytes)) - dev_md5, _ = adb.shell( - "md5sum {}".format(AdbFile.DEVICE_TEMP_FILE)).split() - - with tempfile.NamedTemporaryFile(mode="wb", delete=False) \ - as tmp_write: - try: - tmp_write.close() - adb.pull(remote=AdbFile.DEVICE_TEMP_FILE, - local=tmp_write.name) - with open(tmp_write.name, "rb") as tmp_read: - host_contents = tmp_read.read() - host_md5 = compute_md5(host_contents) - self.assertEqual(dev_md5, host_md5) - finally: - os.remove(tmp_write.name) - finally: - adb.shell_nocheck("rm {}".format(AdbFile.DEVICE_TEMP_FILE)) - - def test_pull_dir(self): - """Pull a randomly generated directory of files from the device.""" - adb = AdbWrapper() - temp_files = {} - host_dir = None - try: - # create temporary host directory - host_dir = tempfile.mkdtemp() - - # create temporary dir on device - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) - adb.shell("mkdir -p {}".format(AdbFile.DEVICE_TEMP_DIR)) - - # populate device dir with random files - temp_files = make_random_device_files( - adb, in_dir=AdbFile.DEVICE_TEMP_DIR, num_files=32) - - adb.pull(remote=AdbFile.DEVICE_TEMP_DIR, local=host_dir) - - for device_full_path in temp_files: - host_path = os.path.join( - host_dir, temp_files[device_full_path].base_name) - with open(host_path, "rb") as host_file: - host_md5 = compute_md5(host_file.read()) - self.assertEqual(host_md5, - temp_files[device_full_path].md5) - finally: - for dev_file in temp_files.values(): - host_path = os.path.join(host_dir, dev_file.base_name) - os.remove(host_path) - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) - if host_dir: - os.removedirs(host_dir) - - def test_sync(self): - """Sync a randomly generated directory of files to specified device.""" - try: - adb = AdbWrapper() - temp_files = {} - - # create temporary host directory - base_dir = tempfile.mkdtemp() - - # create mirror device directory hierarchy within base_dir - full_dir_path = base_dir + AdbFile.DEVICE_TEMP_DIR - os.makedirs(full_dir_path) - - # create 32 random files within the host mirror - temp_files = make_random_host_files(in_dir=full_dir_path, - num_files=32) - - # clean up any trash on the device - adb = AdbWrapper(out_dir=base_dir) - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) - - # issue the sync - adb.sync("data") - - # confirm that every file on the device mirrors that on the host - for host_full_path in temp_files.keys(): - device_full_path = posixpath.join( - AdbFile.DEVICE_TEMP_DIR, - temp_files[host_full_path].base_name) - dev_md5, _ = adb.shell( - "md5sum {}".format(device_full_path)).split() - self.assertEqual(temp_files[host_full_path].md5, dev_md5) - - finally: - adb.shell_nocheck("rm -r {}".format(AdbFile.DEVICE_TEMP_DIR)) - if temp_files: - for tf in temp_files.values(): - os.remove(tf.full_path) - if base_dir: - os.removedirs(base_dir + AdbFile.DEVICE_TEMP_DIR) - - -if __name__ == '__main__': - random.seed(0) - dev_count = get_attached_device_count() - if dev_count: - suite = unittest.TestLoader().loadTestsFromName(__name__) - unittest.TextTestRunner(verbosity=3).run(suite) - else: - print "Test suite must be run with attached devices"