android_system_core/adb/device.py
Yasuhiro Matsuda ab3798399d Add a script to record Android boot time.
perfboot.py repeats the recording of each event log during Android
boot a specified number of times. By default, the interval between measurements
is adjusted in such a way that CPUs are cooled down sufficiently
to avoid boot time slowdown caused by CPU thermal throttling.
This script also works around the issue of dropbox slowing down
boot time on userdebug build (http://b/20890386) by limiting
the number of files to be created by dropbox.
The result is output in a tab-separated value format.

BUG: 22207911
Change-Id: I0ddbac5d1c941efda87bc6db6388d8194d4bb3dd
2015-07-30 14:16:15 +09:00

241 lines
7.7 KiB
Python

#
# Copyright (C) 2015 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import logging
import os
import re
import subprocess
class FindDeviceError(RuntimeError):
    """Base class for the errors raised while looking up a device."""
class DeviceNotFoundError(FindDeviceError):
    """Raised when no device with the requested serial is found.

    Attributes:
        serial: The serial number that was searched for.
    """

    def __init__(self, serial):
        message = 'No device with serial {}'.format(serial)
        super(DeviceNotFoundError, self).__init__(message)
        self.serial = serial
class NoUniqueDeviceError(FindDeviceError):
    """Raised when a single device was expected but not found."""

    def __init__(self):
        message = 'No unique device'
        super(NoUniqueDeviceError, self).__init__(message)
def get_devices():
    """Return the serials of every attached device that is not offline.

    Starts the adb server (discarding its output) and then parses the output
    of `adb devices`.

    Returns:
        A list of serial number strings, in the order adb listed them.

    Raises:
        subprocess.CalledProcessError: `adb start-server` or `adb devices`
            exited with a non-zero status.
    """
    with open(os.devnull, 'wb') as devnull:
        subprocess.check_call(['adb', 'start-server'], stdout=devnull,
                              stderr=devnull)
    out = subprocess.check_output(['adb', 'devices'])
    if not isinstance(out, str):
        # Python 3 hands back bytes; work on text so the string comparisons
        # below behave the same on Python 2 and Python 3.
        out = out.decode('utf-8', 'replace')

    devices = []
    # The first line of `adb devices` just says "List of attached devices", so
    # skip that.
    for line in out.splitlines()[1:]:
        fields = line.split()
        if not fields:
            continue
        # Compare the state column exactly rather than searching the whole
        # line, so a serial that merely contains "offline" is not dropped.
        if len(fields) > 1 and fields[1] == 'offline':
            continue
        devices.append(fields[0])
    return devices
def _get_unique_device(product=None):
    """Return an AndroidDevice for the only attached device.

    Raises:
        NoUniqueDeviceError: get_devices() did not return exactly one serial.
    """
    serials = get_devices()
    if len(serials) == 1:
        return AndroidDevice(serials[0], product)
    raise NoUniqueDeviceError()
def _get_device_by_serial(serial, product=None):
    """Return an AndroidDevice for `serial` if get_devices() lists it.

    Raises:
        DeviceNotFoundError: `serial` is not among the listed devices.
    """
    if serial in get_devices():
        return AndroidDevice(serial, product)
    raise DeviceNotFoundError(serial)
def get_device(serial=None, product=None):
    """Get a uniquely identified AndroidDevice if one is available.

    The device is chosen by the first identifier that is not None, in this
    order of preference:
        1) The `serial` argument.
        2) The environment variable $ANDROID_SERIAL.
        3) The single device connected to the system.

    Raises:
        DeviceNotFoundError:
            The serial given by `serial` or $ANDROID_SERIAL is not connected.
        NoUniqueDeviceError:
            Neither `serial` nor $ANDROID_SERIAL was set, and the number of
            devices connected to the system is not 1 (this includes having
            no connected devices at all).

    Returns:
        The AndroidDevice selected as described above.
    """
    wanted = serial
    if wanted is None:
        # Fall back to the environment when the caller gave no serial.
        wanted = os.getenv('ANDROID_SERIAL')
    if wanted is None:
        return _get_unique_device(product)
    return _get_device_by_serial(wanted, product)
class AndroidDevice(object):
def __init__(self, serial, product=None):
self.serial = serial
self.product = product
self.adb_cmd = ['adb']
if self.serial is not None:
self.adb_cmd.extend(['-s', serial])
if self.product is not None:
self.adb_cmd.extend(['-p', product])
self._linesep = None
self._shell_result_pattern = None
@property
def linesep(self):
if self._linesep is None:
self._linesep = subprocess.check_output(['adb', 'shell', 'echo'])
return self._linesep
def _make_shell_cmd(self, user_cmd):
# Follow any shell command with `; echo; echo $?` to get the exit
# status of a program since this isn't propagated by adb.
#
# The leading newline is needed because `printf 1; echo $?` would print
# "10", and we wouldn't be able to distinguish the exit code.
rc_probe = '; echo "\n$?"'
return self.adb_cmd + ['shell'] + user_cmd + [rc_probe]
def _parse_shell_output(self, out): # pylint: disable=no-self-use
search_text = out
max_result_len = len('{0}255{0}'.format(self.linesep))
if len(search_text) > max_result_len:
# We don't want to regex match over massive amounts of data when we
# know the part we want is right at the end.
search_text = search_text[-max_result_len:]
if self._shell_result_pattern is None:
self._shell_result_pattern = re.compile(
r'({0}\d+{0})$'.format(self.linesep), re.MULTILINE)
m = self._shell_result_pattern.search(search_text)
if m is None:
raise RuntimeError('Could not find exit status in shell output.')
result_text = m.group(1)
result = int(result_text.strip())
out = out[:-len(result_text)] # Trim the result text from the output.
return result, out
def _simple_call(self, cmd):
logging.info(' '.join(self.adb_cmd + cmd))
return subprocess.check_output(
self.adb_cmd + cmd, stderr=subprocess.STDOUT)
def shell(self, cmd):
logging.info(' '.join(self.adb_cmd + ['shell'] + cmd))
cmd = self._make_shell_cmd(cmd)
out = subprocess.check_output(cmd)
rc, out = self._parse_shell_output(out)
if rc != 0:
error = subprocess.CalledProcessError(rc, cmd)
error.out = out
raise error
return out
def shell_nocheck(self, cmd):
cmd = self._make_shell_cmd(cmd)
logging.info(' '.join(cmd))
p = subprocess.Popen(
cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
out, _ = p.communicate()
return self._parse_shell_output(out)
def install(self, filename):
return self._simple_call(['install', filename])
def push(self, local, remote):
return self._simple_call(['push', local, remote])
def pull(self, remote, local):
return self._simple_call(['pull', remote, local])
def sync(self, directory=None):
cmd = ['sync']
if directory is not None:
cmd.append(directory)
return self._simple_call(cmd)
def forward(self, local, remote):
return self._simple_call(['forward', local, remote])
def tcpip(self, port):
return self._simple_call(['tcpip', port])
def usb(self):
return self._simple_call(['usb'])
def reboot(self):
return self._simple_call(['reboot'])
def root(self):
return self._simple_call(['root'])
def unroot(self):
return self._simple_call(['unroot'])
def forward_remove(self, local):
return self._simple_call(['forward', '--remove', local])
def forward_remove_all(self):
return self._simple_call(['forward', '--remove-all'])
def connect(self, host):
return self._simple_call(['connect', host])
def disconnect(self, host):
return self._simple_call(['disconnect', host])
def reverse(self, remote, local):
return self._simple_call(['reverse', remote, local])
def reverse_remove_all(self):
return self._simple_call(['reverse', '--remove-all'])
def reverse_remove(self, remote):
return self._simple_call(['reverse', '--remove', remote])
def wait(self):
return self._simple_call(['wait-for-device'])
def get_prop(self, prop_name):
output = self.shell(['getprop', prop_name]).splitlines()
if len(output) != 1:
raise RuntimeError('Too many lines in getprop output:\n' +
'\n'.join(output))
value = output[0]
if not value.strip():
return None
return value
def set_prop(self, prop_name, value):
self.shell(['setprop', prop_name, value])