summaryrefslogtreecommitdiffstats
path: root/python-packages
diff options
context:
space:
mode:
authorJosh Gao <jmgao@google.com>2015-12-09 11:25:57 -0800
committerJosh Gao <jmgao@google.com>2015-12-09 11:25:57 -0800
commit5cd0f10ebc8582a3d404bb44b2fdc20bdf349295 (patch)
tree1849e70fef98ba412caac8b113a4b32f0c8f7758 /python-packages
parenta3a48fc60311d2d85139e328e7e4435eebe76046 (diff)
downloadandroid_development-5cd0f10ebc8582a3d404bb44b2fdc20bdf349295.tar.gz
android_development-5cd0f10ebc8582a3d404bb44b2fdc20bdf349295.tar.bz2
android_development-5cd0f10ebc8582a3d404bb44b2fdc20bdf349295.zip
Move adb tests to system/core/adb.
Change-Id: Ieeb0e1122a7b13a784ec8c19cf360c7699dcd9ff
Diffstat (limited to 'python-packages')
-rw-r--r--python-packages/adb/test_device.py948
1 files changed, 0 insertions, 948 deletions
diff --git a/python-packages/adb/test_device.py b/python-packages/adb/test_device.py
deleted file mode 100644
index 8d754687d..000000000
--- a/python-packages/adb/test_device.py
+++ /dev/null
@@ -1,948 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-#
-# Copyright (C) 2015 The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-from __future__ import print_function
-
-import contextlib
-import hashlib
-import os
-import posixpath
-import random
-import re
-import shlex
-import shutil
-import signal
-import socket
-import string
-import subprocess
-import sys
-import tempfile
-import unittest
-
-import mock
-
-import adb
-
-
-def requires_root(func):
- def wrapper(self, *args):
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
- if not was_root:
- self.device.root()
- self.device.wait()
-
- try:
- func(self, *args)
- finally:
- if not was_root:
- self.device.unroot()
- self.device.wait()
-
- return wrapper
-
-
-def requires_non_root(func):
- def wrapper(self, *args):
- was_root = self.device.shell(['id', '-un'])[0].strip() == 'root'
- if was_root:
- self.device.unroot()
- self.device.wait()
-
- try:
- func(self, *args)
- finally:
- if was_root:
- self.device.root()
- self.device.wait()
-
- return wrapper
-
-
-class GetDeviceTest(unittest.TestCase):
- def setUp(self):
- self.android_serial = os.getenv('ANDROID_SERIAL')
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- def tearDown(self):
- if self.android_serial is not None:
- os.environ['ANDROID_SERIAL'] = self.android_serial
- else:
- if 'ANDROID_SERIAL' in os.environ:
- del os.environ['ANDROID_SERIAL']
-
- @mock.patch('adb.device.get_devices')
- def test_explicit(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_from_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'foo'
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_arg_beats_env(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- os.environ['ANDROID_SERIAL'] = 'bar'
- device = adb.get_device('foo')
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_such_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device, ['baz'])
-
- os.environ['ANDROID_SERIAL'] = 'baz'
- self.assertRaises(adb.DeviceNotFoundError, adb.get_device)
-
- @mock.patch('adb.device.get_devices')
- def test_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo']
- device = adb.get_device()
- self.assertEqual(device.serial, 'foo')
-
- @mock.patch('adb.device.get_devices')
- def test_no_unique_device(self, mock_get_devices):
- mock_get_devices.return_value = ['foo', 'bar']
- self.assertRaises(adb.NoUniqueDeviceError, adb.get_device)
-
-
-class DeviceTest(unittest.TestCase):
- def setUp(self):
- self.device = adb.get_device()
-
-
-class ForwardReverseTest(DeviceTest):
- def _test_no_rebind(self, description, direction_list, direction,
- direction_no_rebind, direction_remove_all):
- msg = direction_list()
- self.assertEqual('', msg.strip(),
- description + ' list must be empty to run this test.')
-
- # Use --no-rebind with no existing binding
- direction_no_rebind('tcp:5566', 'tcp:6655')
- msg = direction_list()
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
-
- # Use --no-rebind with existing binding
- with self.assertRaises(subprocess.CalledProcessError):
- direction_no_rebind('tcp:5566', 'tcp:6677')
- msg = direction_list()
- self.assertFalse(re.search(r'tcp:5566.+tcp:6677', msg))
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
-
- # Use the absence of --no-rebind with existing binding
- direction('tcp:5566', 'tcp:6677')
- msg = direction_list()
- self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
- self.assertTrue(re.search(r'tcp:5566.+tcp:6677', msg))
-
- direction_remove_all()
- msg = direction_list()
- self.assertEqual('', msg.strip())
-
- def test_forward_no_rebind(self):
- self._test_no_rebind('forward', self.device.forward_list,
- self.device.forward, self.device.forward_no_rebind,
- self.device.forward_remove_all)
-
- def test_reverse_no_rebind(self):
- self._test_no_rebind('reverse', self.device.reverse_list,
- self.device.reverse, self.device.reverse_no_rebind,
- self.device.reverse_remove_all)
-
- def test_forward(self):
- msg = self.device.forward_list()
- self.assertEqual('', msg.strip(),
- 'Forwarding list must be empty to run this test.')
- self.device.forward('tcp:5566', 'tcp:6655')
- msg = self.device.forward_list()
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
- self.device.forward('tcp:7788', 'tcp:8877')
- msg = self.device.forward_list()
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
- self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
- self.device.forward_remove('tcp:5566')
- msg = self.device.forward_list()
- self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
- self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
- self.device.forward_remove_all()
- msg = self.device.forward_list()
- self.assertEqual('', msg.strip())
-
- def test_reverse(self):
- msg = self.device.reverse_list()
- self.assertEqual('', msg.strip(),
- 'Reverse forwarding list must be empty to run this test.')
- self.device.reverse('tcp:5566', 'tcp:6655')
- msg = self.device.reverse_list()
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
- self.device.reverse('tcp:7788', 'tcp:8877')
- msg = self.device.reverse_list()
- self.assertTrue(re.search(r'tcp:5566.+tcp:6655', msg))
- self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
- self.device.reverse_remove('tcp:5566')
- msg = self.device.reverse_list()
- self.assertFalse(re.search(r'tcp:5566.+tcp:6655', msg))
- self.assertTrue(re.search(r'tcp:7788.+tcp:8877', msg))
- self.device.reverse_remove_all()
- msg = self.device.reverse_list()
- self.assertEqual('', msg.strip())
-
- # Note: If you run this test when adb connect'd to a physical device over
- # TCP, it will fail in adb reverse due to https://code.google.com/p/android/issues/detail?id=189821
- def test_forward_reverse_echo(self):
- """Send data through adb forward and read it back via adb reverse"""
- forward_port = 12345
- reverse_port = forward_port + 1
- forward_spec = "tcp:" + str(forward_port)
- reverse_spec = "tcp:" + str(reverse_port)
- forward_setup = False
- reverse_setup = False
-
- try:
- # listen on localhost:forward_port, connect to remote:forward_port
- self.device.forward(forward_spec, forward_spec)
- forward_setup = True
- # listen on remote:forward_port, connect to localhost:reverse_port
- self.device.reverse(forward_spec, reverse_spec)
- reverse_setup = True
-
- listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- with contextlib.closing(listener):
- # Use SO_REUSEADDR so that subsequent runs of the test can grab
- # the port even if it is in TIME_WAIT.
- listener.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
-
- # Listen on localhost:reverse_port before connecting to
- # localhost:forward_port because that will cause adb to connect
- # back to localhost:reverse_port.
- listener.bind(('127.0.0.1', reverse_port))
- listener.listen(4)
-
- client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- with contextlib.closing(client):
- # Connect to the listener.
- client.connect(('127.0.0.1', forward_port))
-
- # Accept the client connection.
- accepted_connection, addr = listener.accept()
- with contextlib.closing(accepted_connection) as server:
- data = 'hello'
-
- # Send data into the port setup by adb forward.
- client.sendall(data)
- # Explicitly close() so that server gets EOF.
- client.close()
-
- # Verify that the data came back via adb reverse.
- self.assertEqual(data, server.makefile().read())
- finally:
- if reverse_setup:
- self.device.reverse_remove(forward_spec)
- if forward_setup:
- self.device.forward_remove(forward_spec)
-
-
-class ShellTest(DeviceTest):
- def _interactive_shell(self, shell_args, input):
- """Runs an interactive adb shell.
-
- Args:
- shell_args: List of string arguments to `adb shell`.
- input: String input to send to the interactive shell.
-
- Returns:
- The remote exit code.
-
- Raises:
- unittest.SkipTest: The device doesn't support exit codes.
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('exit codes are unavailable on this device')
-
- proc = subprocess.Popen(
- self.device.adb_cmd + ['shell'] + shell_args,
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- # Closing host-side stdin doesn't trigger a PTY shell to exit so we need
- # to explicitly add an exit command to close the session from the device
- # side, plus the necessary newline to complete the interactive command.
- proc.communicate(input + '; exit\n')
- return proc.returncode
-
- def test_cat(self):
- """Check that we can at least cat a file."""
- out = self.device.shell(['cat', '/proc/uptime'])[0].strip()
- elements = out.split()
- self.assertEqual(len(elements), 2)
-
- uptime, idle = elements
- self.assertGreater(float(uptime), 0.0)
- self.assertGreater(float(idle), 0.0)
-
- def test_throws_on_failure(self):
- self.assertRaises(adb.ShellError, self.device.shell, ['false'])
-
- def test_output_not_stripped(self):
- out = self.device.shell(['echo', 'foo'])[0]
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_shell_nocheck_failure(self):
- rc, out, _ = self.device.shell_nocheck(['false'])
- self.assertNotEqual(rc, 0)
- self.assertEqual(out, '')
-
- def test_shell_nocheck_output_not_stripped(self):
- rc, out, _ = self.device.shell_nocheck(['echo', 'foo'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, 'foo' + self.device.linesep)
-
- def test_can_distinguish_tricky_results(self):
- # If result checking on ADB shell is naively implemented as
- # `adb shell <cmd>; echo $?`, we would be unable to distinguish the
- # output from the result for a cmd of `echo -n 1`.
- rc, out, _ = self.device.shell_nocheck(['echo', '-n', '1'])
- self.assertEqual(rc, 0)
- self.assertEqual(out, '1')
-
- def test_line_endings(self):
- """Ensure that line ending translation is not happening in the pty.
-
- Bug: http://b/19735063
- """
- output = self.device.shell(['uname'])[0]
- self.assertEqual(output, 'Linux' + self.device.linesep)
-
- def test_pty_logic(self):
- """Tests that a PTY is allocated when it should be.
-
- PTY allocation behavior should match ssh; some behavior requires
- a terminal stdin to test so this test will be skipped if stdin
- is not a terminal.
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('PTY arguments unsupported on this device')
- if not os.isatty(sys.stdin.fileno()):
- raise unittest.SkipTest('PTY tests require stdin terminal')
-
- def check_pty(args):
- """Checks adb shell PTY allocation.
-
- Tests |args| for terminal and non-terminal stdin.
-
- Args:
- args: -Tt args in a list (e.g. ['-t', '-t']).
-
- Returns:
- A tuple (<terminal>, <non-terminal>). True indicates
- the corresponding shell allocated a remote PTY.
- """
- test_cmd = self.device.adb_cmd + ['shell'] + args + ['[ -t 0 ]']
-
- terminal = subprocess.Popen(
- test_cmd, stdin=None,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- terminal.communicate()
-
- non_terminal = subprocess.Popen(
- test_cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE)
- non_terminal.communicate()
-
- return (terminal.returncode == 0, non_terminal.returncode == 0)
-
- # -T: never allocate PTY.
- self.assertEqual((False, False), check_pty(['-T']))
-
- # No args: PTY only if stdin is a terminal and shell is interactive,
- # which is difficult to reliably test from a script.
- self.assertEqual((False, False), check_pty([]))
-
- # -t: PTY if stdin is a terminal.
- self.assertEqual((True, False), check_pty(['-t']))
-
- # -t -t: always allocate PTY.
- self.assertEqual((True, True), check_pty(['-t', '-t']))
-
- def test_shell_protocol(self):
- """Tests the shell protocol on the device.
-
- If the device supports shell protocol, this gives us the ability
- to separate stdout/stderr and return the exit code directly.
-
- Bug: http://b/19734861
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
-
- # Shell protocol should be used by default.
- result = self.device.shell_nocheck(
- shlex.split('echo foo; echo bar >&2; exit 17'))
- self.assertEqual(17, result[0])
- self.assertEqual('foo' + self.device.linesep, result[1])
- self.assertEqual('bar' + self.device.linesep, result[2])
-
- self.assertEqual(17, self._interactive_shell([], 'exit 17'))
-
- # -x flag should disable shell protocol.
- result = self.device.shell_nocheck(
- shlex.split('-x echo foo; echo bar >&2; exit 17'))
- self.assertEqual(0, result[0])
- self.assertEqual('foo{0}bar{0}'.format(self.device.linesep), result[1])
- self.assertEqual('', result[2])
-
- self.assertEqual(0, self._interactive_shell(['-x'], 'exit 17'))
-
- def test_non_interactive_sigint(self):
- """Tests that SIGINT in a non-interactive shell kills the process.
-
- This requires the shell protocol in order to detect the broken
- pipe; raw data transfer mode will only see the break once the
- subprocess tries to read or write.
-
- Bug: http://b/23825725
- """
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('shell protocol unsupported on this device')
-
- # Start a long-running process.
- sleep_proc = subprocess.Popen(
- self.device.adb_cmd + shlex.split('shell echo $$; sleep 60'),
- stdin=subprocess.PIPE, stdout=subprocess.PIPE,
- stderr=subprocess.STDOUT)
- remote_pid = sleep_proc.stdout.readline().strip()
- self.assertIsNone(sleep_proc.returncode, 'subprocess terminated early')
- proc_query = shlex.split('ps {0} | grep {0}'.format(remote_pid))
-
- # Verify that the process is running, send signal, verify it stopped.
- self.device.shell(proc_query)
- os.kill(sleep_proc.pid, signal.SIGINT)
- sleep_proc.communicate()
- self.assertEqual(1, self.device.shell_nocheck(proc_query)[0],
- 'subprocess failed to terminate')
-
- def test_non_interactive_stdin(self):
- """Tests that non-interactive shells send stdin."""
- if self.device.SHELL_PROTOCOL_FEATURE not in self.device.features:
- raise unittest.SkipTest('non-interactive stdin unsupported '
- 'on this device')
-
- # Test both small and large inputs.
- small_input = 'foo'
- large_input = '\n'.join(c * 100 for c in (string.ascii_letters +
- string.digits))
-
- for input in (small_input, large_input):
- proc = subprocess.Popen(self.device.adb_cmd + ['shell', 'cat'],
- stdin=subprocess.PIPE,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- stdout, stderr = proc.communicate(input)
- self.assertEqual(input.splitlines(), stdout.splitlines())
- self.assertEqual('', stderr)
-
-
-class ArgumentEscapingTest(DeviceTest):
- def test_shell_escaping(self):
- """Make sure that argument escaping is somewhat sane."""
-
- # http://b/19734868
- # Note that this actually matches ssh(1)'s behavior --- it's
- # converted to `sh -c echo hello; echo world` which sh interprets
- # as `sh -c echo` (with an argument to that shell of "hello"),
- # and then `echo world` back in the first shell.
- result = self.device.shell(
- shlex.split("sh -c 'echo hello; echo world'"))[0]
- result = result.splitlines()
- self.assertEqual(['', 'world'], result)
- # If you really wanted "hello" and "world", here's what you'd do:
- result = self.device.shell(
- shlex.split(r'echo hello\;echo world'))[0].splitlines()
- self.assertEqual(['hello', 'world'], result)
-
- # http://b/15479704
- result = self.device.shell(shlex.split("'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split("sh -c 'true && echo t'"))[0].strip()
- self.assertEqual('t', result)
-
- # http://b/20564385
- result = self.device.shell(shlex.split('FOO=a BAR=b echo t'))[0].strip()
- self.assertEqual('t', result)
- result = self.device.shell(
- shlex.split(r'echo -n 123\;uname'))[0].strip()
- self.assertEqual('123Linux', result)
-
- def test_install_argument_escaping(self):
- """Make sure that install argument escaping works."""
- # http://b/20323053, http://b/3090932.
- for file_suffix in ('-text;ls;1.apk', "-Live Hold'em.apk"):
- tf = tempfile.NamedTemporaryFile('wb', suffix=file_suffix,
- delete=False)
- tf.close()
-
- # Installing bogus .apks fails if the device supports exit codes.
- try:
- output = self.device.install(tf.name)
- except subprocess.CalledProcessError as e:
- output = e.output
-
- self.assertIn(file_suffix, output)
- os.remove(tf.name)
-
-
-class RootUnrootTest(DeviceTest):
- def _test_root(self):
- message = self.device.root()
- if 'adbd cannot run as root in production builds' in message:
- return
- self.device.wait()
- self.assertEqual('root', self.device.shell(['id', '-un'])[0].strip())
-
- def _test_unroot(self):
- self.device.unroot()
- self.device.wait()
- self.assertEqual('shell', self.device.shell(['id', '-un'])[0].strip())
-
- def test_root_unroot(self):
- """Make sure that adb root and adb unroot work, using id(1)."""
- if self.device.get_prop('ro.debuggable') != '1':
- raise unittest.SkipTest('requires rootable build')
-
- original_user = self.device.shell(['id', '-un'])[0].strip()
- try:
- if original_user == 'root':
- self._test_unroot()
- self._test_root()
- elif original_user == 'shell':
- self._test_root()
- self._test_unroot()
- finally:
- if original_user == 'root':
- self.device.root()
- else:
- self.device.unroot()
- self.device.wait()
-
-
-class TcpIpTest(DeviceTest):
- def test_tcpip_failure_raises(self):
- """adb tcpip requires a port.
-
- Bug: http://b/22636927
- """
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, '')
- self.assertRaises(
- subprocess.CalledProcessError, self.device.tcpip, 'foo')
-
-
-class SystemPropertiesTest(DeviceTest):
- def test_get_prop(self):
- self.assertEqual(self.device.get_prop('init.svc.adbd'), 'running')
-
- @requires_root
- def test_set_prop(self):
- prop_name = 'foo.bar'
- self.device.shell(['setprop', prop_name, '""'])
-
- self.device.set_prop(prop_name, 'qux')
- self.assertEqual(
- self.device.shell(['getprop', prop_name])[0].strip(), 'qux')
-
-
-def compute_md5(string):
- hsh = hashlib.md5()
- hsh.update(string)
- return hsh.hexdigest()
-
-
-def get_md5_prog(device):
- """Older platforms (pre-L) had the name md5 rather than md5sum."""
- try:
- device.shell(['md5sum', '/proc/uptime'])
- return 'md5sum'
- except adb.ShellError:
- return 'md5'
-
-
-class HostFile(object):
- def __init__(self, handle, checksum):
- self.handle = handle
- self.checksum = checksum
- self.full_path = handle.name
- self.base_name = os.path.basename(self.full_path)
-
-
-class DeviceFile(object):
- def __init__(self, checksum, full_path):
- self.checksum = checksum
- self.full_path = full_path
- self.base_name = posixpath.basename(self.full_path)
-
-
-def make_random_host_files(in_dir, num_files):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for _ in xrange(num_files):
- file_handle = tempfile.NamedTemporaryFile(dir=in_dir, delete=False)
-
- size = random.randrange(min_size, max_size, 1024)
- rand_str = os.urandom(size)
- file_handle.write(rand_str)
- file_handle.flush()
- file_handle.close()
-
- md5 = compute_md5(rand_str)
- files.append(HostFile(file_handle, md5))
- return files
-
-
-def make_random_device_files(device, in_dir, num_files, prefix='device_tmpfile'):
- min_size = 1 * (1 << 10)
- max_size = 16 * (1 << 10)
-
- files = []
- for file_num in xrange(num_files):
- size = random.randrange(min_size, max_size, 1024)
-
- base_name = prefix + str(file_num)
- full_path = posixpath.join(in_dir, base_name)
-
- device.shell(['dd', 'if=/dev/urandom', 'of={}'.format(full_path),
- 'bs={}'.format(size), 'count=1'])
- dev_md5, _ = device.shell([get_md5_prog(device), full_path])[0].split()
-
- files.append(DeviceFile(dev_md5, full_path))
- return files
-
-
-class FileOperationsTest(DeviceTest):
- SCRATCH_DIR = '/data/local/tmp'
- DEVICE_TEMP_FILE = SCRATCH_DIR + '/adb_test_file'
- DEVICE_TEMP_DIR = SCRATCH_DIR + '/adb_test_dir'
-
- def _verify_remote(self, checksum, remote_path):
- dev_md5, _ = self.device.shell([get_md5_prog(self.device),
- remote_path])[0].split()
- self.assertEqual(checksum, dev_md5)
-
- def _verify_local(self, checksum, local_path):
- with open(local_path, 'rb') as host_file:
- host_md5 = compute_md5(host_file.read())
- self.assertEqual(host_md5, checksum)
-
- def test_push(self):
- """Push a randomly generated file to specified device."""
- kbytes = 512
- tmp = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- rand_str = os.urandom(1024 * kbytes)
- tmp.write(rand_str)
- tmp.close()
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- self.device.push(local=tmp.name, remote=self.DEVICE_TEMP_FILE)
-
- self._verify_remote(compute_md5(rand_str), self.DEVICE_TEMP_FILE)
- self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
-
- os.remove(tmp.name)
-
- def test_push_dir(self):
- """Push a randomly generated directory of files to the device."""
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
-
- try:
- host_dir = tempfile.mkdtemp()
-
- # Make sure the temp directory isn't setuid, or else adb will complain.
- os.chmod(host_dir, 0o700)
-
- # Create 32 random files.
- temp_files = make_random_host_files(in_dir=host_dir, num_files=32)
- self.device.push(host_dir, self.DEVICE_TEMP_DIR)
-
- for temp_file in temp_files:
- remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
- os.path.basename(host_dir),
- temp_file.base_name)
- self._verify_remote(temp_file.checksum, remote_path)
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- @unittest.expectedFailure # b/25566053
- def test_push_empty(self):
- """Push a directory containing an empty directory to the device."""
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
-
- try:
- host_dir = tempfile.mkdtemp()
-
- # Make sure the temp directory isn't setuid, or else adb will complain.
- os.chmod(host_dir, 0o700)
-
- # Create an empty directory.
- os.mkdir(os.path.join(host_dir, 'empty'))
-
- self.device.push(host_dir, self.DEVICE_TEMP_DIR)
-
- test_empty_cmd = ['[', '-d',
- os.path.join(self.DEVICE_TEMP_DIR, 'empty')]
- rc, _, _ = self.device.shell_nocheck(test_empty_cmd)
- self.assertEqual(rc, 0)
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_multiple_push(self):
- """Push multiple files to the device in one adb push command.
-
- Bug: http://b/25324823
- """
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', self.DEVICE_TEMP_DIR])
-
- try:
- host_dir = tempfile.mkdtemp()
-
- # Create some random files and a subdirectory containing more files.
- temp_files = make_random_host_files(in_dir=host_dir, num_files=4)
-
- subdir = os.path.join(host_dir, "subdir")
- os.mkdir(subdir)
- subdir_temp_files = make_random_host_files(in_dir=subdir,
- num_files=4)
-
- paths = map(lambda temp_file: temp_file.full_path, temp_files)
- paths.append(subdir)
- self.device._simple_call(['push'] + paths + [self.DEVICE_TEMP_DIR])
-
- for temp_file in temp_files:
- remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
- temp_file.base_name)
- self._verify_remote(temp_file.checksum, remote_path)
-
- for subdir_temp_file in subdir_temp_files:
- remote_path = posixpath.join(self.DEVICE_TEMP_DIR,
- # BROKEN: http://b/25394682
- # "subdir",
- temp_file.base_name)
- self._verify_remote(temp_file.checksum, remote_path)
-
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
-
- def _test_pull(self, remote_file, checksum):
- tmp_write = tempfile.NamedTemporaryFile(mode='wb', delete=False)
- tmp_write.close()
- self.device.pull(remote=remote_file, local=tmp_write.name)
- with open(tmp_write.name, 'rb') as tmp_read:
- host_contents = tmp_read.read()
- host_md5 = compute_md5(host_contents)
- self.assertEqual(checksum, host_md5)
- os.remove(tmp_write.name)
-
- @requires_non_root
- def test_pull_error_reporting(self):
- self.device.shell(['touch', self.DEVICE_TEMP_FILE])
- self.device.shell(['chmod', 'a-rwx', self.DEVICE_TEMP_FILE])
-
- try:
- output = self.device.pull(remote=self.DEVICE_TEMP_FILE, local='x')
- except subprocess.CalledProcessError as e:
- output = e.output
-
- self.assertIn('Permission denied', output)
-
- self.device.shell(['rm', '-f', self.DEVICE_TEMP_FILE])
-
- def test_pull(self):
- """Pull a randomly generated file from specified device."""
- kbytes = 512
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_FILE])
- cmd = ['dd', 'if=/dev/urandom',
- 'of={}'.format(self.DEVICE_TEMP_FILE), 'bs=1024',
- 'count={}'.format(kbytes)]
- self.device.shell(cmd)
- dev_md5, _ = self.device.shell(
- [get_md5_prog(self.device), self.DEVICE_TEMP_FILE])[0].split()
- self._test_pull(self.DEVICE_TEMP_FILE, dev_md5)
- self.device.shell_nocheck(['rm', self.DEVICE_TEMP_FILE])
-
- def test_pull_dir(self):
- """Pull a randomly generated directory of files from the device."""
- try:
- host_dir = tempfile.mkdtemp()
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', '-p', self.DEVICE_TEMP_DIR])
-
- # Populate device directory with random files.
- temp_files = make_random_device_files(
- self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=32)
-
- self.device.pull(remote=self.DEVICE_TEMP_DIR, local=host_dir)
-
- for temp_file in temp_files:
- host_path = os.path.join(host_dir, temp_file.base_name)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_pull_empty(self):
- """Pull a directory containing an empty directory from the device."""
- try:
- host_dir = tempfile.mkdtemp()
-
- remote_empty_path = posixpath.join(self.DEVICE_TEMP_DIR, 'empty')
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', '-p', remote_empty_path])
-
- self.device.pull(remote=remote_empty_path, local=host_dir)
- self.assertTrue(os.path.isdir(os.path.join(host_dir, 'empty')))
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_multiple_pull(self):
- """Pull a randomly generated directory of files from the device."""
-
- try:
- host_dir = tempfile.mkdtemp()
-
- subdir = posixpath.join(self.DEVICE_TEMP_DIR, "subdir")
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- self.device.shell(['mkdir', '-p', subdir])
-
- # Create some random files and a subdirectory containing more files.
- temp_files = make_random_device_files(
- self.device, in_dir=self.DEVICE_TEMP_DIR, num_files=4)
-
- subdir_temp_files = make_random_device_files(
- self.device, in_dir=subdir, num_files=4, prefix='subdir_')
-
- paths = map(lambda temp_file: temp_file.full_path, temp_files)
- paths.append(subdir)
- self.device._simple_call(['pull'] + paths + [host_dir])
-
- for temp_file in temp_files:
- local_path = os.path.join(host_dir, temp_file.base_name)
- self._verify_local(temp_file.checksum, local_path)
-
- for subdir_temp_file in subdir_temp_files:
- local_path = os.path.join(host_dir,
- "subdir",
- subdir_temp_file.base_name)
- self._verify_local(subdir_temp_file.checksum, local_path)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if host_dir is not None:
- shutil.rmtree(host_dir)
-
- def test_sync(self):
- """Sync a randomly generated directory of files to specified device."""
-
- try:
- base_dir = tempfile.mkdtemp()
-
- # Create mirror device directory hierarchy within base_dir.
- full_dir_path = base_dir + self.DEVICE_TEMP_DIR
- os.makedirs(full_dir_path)
-
- # Create 32 random files within the host mirror.
- temp_files = make_random_host_files(in_dir=full_dir_path, num_files=32)
-
- # Clean up any trash on the device.
- device = adb.get_device(product=base_dir)
- device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
-
- device.sync('data')
-
- # Confirm that every file on the device mirrors that on the host.
- for temp_file in temp_files:
- device_full_path = posixpath.join(self.DEVICE_TEMP_DIR,
- temp_file.base_name)
- dev_md5, _ = device.shell(
- [get_md5_prog(self.device), device_full_path])[0].split()
- self.assertEqual(temp_file.checksum, dev_md5)
-
- self.device.shell(['rm', '-rf', self.DEVICE_TEMP_DIR])
- finally:
- if base_dir is not None:
- shutil.rmtree(base_dir)
-
- def test_unicode_paths(self):
- """Ensure that we can support non-ASCII paths, even on Windows."""
- name = u'로보카 폴리'
-
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
- remote_path = u'/data/local/tmp/adb-test-{}'.format(name)
-
- ## push.
- tf = tempfile.NamedTemporaryFile('wb', suffix=name, delete=False)
- tf.close()
- self.device.push(tf.name, remote_path)
- os.remove(tf.name)
- self.assertFalse(os.path.exists(tf.name))
-
- # Verify that the device ended up with the expected UTF-8 path
- output = self.device.shell(
- ['ls', '/data/local/tmp/adb-test-*'])[0].strip()
- self.assertEqual(remote_path.encode('utf-8'), output)
-
- # pull.
- self.device.pull(remote_path, tf.name)
- self.assertTrue(os.path.exists(tf.name))
- os.remove(tf.name)
- self.device.shell(['rm', '-f', '/data/local/tmp/adb-test-*'])
-
-
-def main():
- random.seed(0)
- if len(adb.get_devices()) > 0:
- suite = unittest.TestLoader().loadTestsFromName(__name__)
- unittest.TextTestRunner(verbosity=3).run(suite)
- else:
- print('Test suite must be run with attached devices')
-
-
-if __name__ == '__main__':
- main()