diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2017-11-28 23:26:15 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2017-11-28 23:26:15 +0000 |
commit | fbf75255e864517a82e4a3a0c15a511d396f4db8 (patch) | |
tree | fe7f6f6171d4fdd6ad6b93a78248dcd4ac638525 | |
parent | 060146a4e403ebe7dc0109c5795753b3656f26cd (diff) | |
parent | 91b8da9d0822fc0610deadc9d6e9e0a9482e3e14 (diff) | |
download | platform_tools_test_connectivity-oreo-m5-release.tar.gz platform_tools_test_connectivity-oreo-m5-release.tar.bz2 platform_tools_test_connectivity-oreo-m5-release.zip |
Snap for 4448085 from 91b8da9d0822fc0610deadc9d6e9e0a9482e3e14 to oc-m3-releaseandroid-8.1.0_r9android-8.1.0_r7android-8.1.0_r22android-8.1.0_r21android-8.1.0_r18android-8.1.0_r17android-8.1.0_r14android-8.1.0_r13oreo-m5-releaseoreo-m3-release
Change-Id: I5b74e00606a155163bf98c601fe32cedac88879d
70 files changed, 5453 insertions, 552 deletions
diff --git a/acts/framework/acts/config_parser.py b/acts/framework/acts/config_parser.py index 54d15f56c9..57c3a5c7ae 100755 --- a/acts/framework/acts/config_parser.py +++ b/acts/framework/acts/config_parser.py @@ -68,39 +68,11 @@ def _validate_testbed_name(name): "Char '%s' is not allowed in test bed names." % l) -def _update_file_paths(config, config_path): - """ Checks if the path entries are valid. - - If the file path is invaild, assume it is a relative path and append - that to the config file path. - - Args: - config : the config object to verify. - config_path : The path to the config file, which can be used to - generate absolute paths from relative paths in configs. - - Raises: - If the file path is invalid, ActsConfigError is raised. - """ - # Check the file_path_keys and update if it is a relative path. - for file_path_key in keys.Config.file_path_keys.value: - if file_path_key in config: - config_file = config[file_path_key] - if not os.path.isfile(config_file): - config_file = os.path.join(config_path, config_file) - if not os.path.isfile(config_file): - raise ActsConfigError("Unable to load config %s from test " - "config file.", config_file) - config[file_path_key] = config_file - - -def _validate_testbed_configs(testbed_configs, config_path): +def _validate_testbed_configs(testbed_configs): """Validates the testbed configurations. Args: testbed_configs: A list of testbed configuration json objects. - config_path : The path to the config file, which can be used to - generate absolute paths from relative paths in configs. Raises: If any part of the configuration is invalid, ActsConfigError is raised. @@ -108,7 +80,6 @@ def _validate_testbed_configs(testbed_configs, config_path): seen_names = set() # Cross checks testbed configs for resource conflicts. for config in testbed_configs: - _update_file_paths(config, config_path) # Check for conflicts between multiple concurrent testbed configs. # No need to call it if there's only one testbed config. name = config[keys.Config.key_testbed_name.value] @@ -288,20 +259,19 @@ def load_test_config_file(test_config_path, configs[keys.Config.key_test_paths.value] = os.environ[ _ENV_ACTS_TESTPATHS].split(_PATH_SEPARATOR) - # Add the global paths to the global config. + _validate_test_config(configs) + _validate_testbed_configs(configs[keys.Config.key_testbed.value]) k_log_path = keys.Config.key_log_path.value configs[k_log_path] = utils.abs_path(configs[k_log_path]) - - # TODO: See if there is a better way to do this: b/29836695 config_path, _ = os.path.split(utils.abs_path(test_config_path)) configs[keys.Config.key_config_path] = config_path - _validate_test_config(configs) - _validate_testbed_configs(configs[keys.Config.key_testbed.value], - config_path) + tps = configs[keys.Config.key_test_paths.value] # Unpack testbeds into separate json objects. beds = configs.pop(keys.Config.key_testbed.value) config_jsons = [] - + # TODO: See if there is a better way to do this: b/29836695 + config_path, _ = os.path.split(utils.abs_path(test_config_path)) + configs[keys.Config.key_config_path] = config_path for original_bed_config in beds: new_test_config = dict(configs) new_test_config[keys.Config.key_testbed.value] = original_bed_config diff --git a/acts/framework/acts/controllers/__init__.py b/acts/framework/acts/controllers/__init__.py index d0e411c536..2cc6f43fa1 100644 --- a/acts/framework/acts/controllers/__init__.py +++ b/acts/framework/acts/controllers/__init__.py @@ -24,5 +24,6 @@ def destroy(objs): """ """This is a list of all the top level controller modules""" __all__ = [ - "android_device", "attenuator", "monsoon", "access_point", "iperf_server" + "android_device", "attenuator", "monsoon", "access_point", "iperf_server", + "packet_sender" ] diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py index fabfd404e7..a8ea506161 100755 --- a/acts/framework/acts/controllers/access_point.py +++ b/acts/framework/acts/controllers/access_point.py @@ -16,8 +16,8 @@ import collections import ipaddress -import logging +from acts.controllers.ap_lib import bridge_interface from acts.controllers.ap_lib import dhcp_config from acts.controllers.ap_lib import dhcp_server from acts.controllers.ap_lib import hostapd @@ -90,6 +90,9 @@ _AP_2GHZ_SUBNET_STR = '192.168.1.0/24' _AP_5GHZ_SUBNET_STR = '192.168.9.0/24' _AP_2GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_2GHZ_SUBNET_STR)) _AP_5GHZ_SUBNET = dhcp_config.Subnet(ipaddress.ip_network(_AP_5GHZ_SUBNET_STR)) +LAN_INTERFACE = 'eth1' +# The last digit of the ip for the bridge interface +BRIDGE_IP_LAST = '100' class AccessPoint(object): @@ -116,6 +119,7 @@ class AccessPoint(object): # A map from network interface name to _ApInstance objects representing # the hostapd instance running against the interface. self._aps = dict() + self.bridge = bridge_interface.BridgeInterface(self.ssh) def start_ap(self, hostapd_config, additional_parameters=None): """Starts as an ap using a set of configurations. @@ -269,7 +273,7 @@ class AccessPoint(object): identifier: The identify of the ap that should be taken down. """ - if identifier not in self._aps: + if identifier not in list(self._aps.keys()): raise ValueError('Invalid identifer %s given' % identifier) instance = self._aps.get(identifier) @@ -283,13 +287,14 @@ class AccessPoint(object): # then an exception gets thrown. We need to catch this exception and # check that all interfaces should actually be down. configured_subnets = [x.subnet for x in self._aps.values()] + del self._aps[identifier] if configured_subnets: self._dhcp.start(dhcp_config.DhcpConfig(configured_subnets)) def stop_all_aps(self): """Stops all running aps on this device.""" - for ap in self._aps.keys(): + for ap in list(self._aps.keys()): try: self.stop_ap(ap) except dhcp_server.NoInterfaceError as e: @@ -305,3 +310,29 @@ class AccessPoint(object): if self._aps: self.stop_all_aps() self.ssh.close() + + def generate_bridge_configs(self, channel, iface_lan=LAN_INTERFACE): + """Generate a list of configs for a bridge between LAN and WLAN. + + Args: + channel: the channel WLAN interface is brought up on + iface_lan: the LAN interface to bridge + Returns: + configs: tuple containing iface_wlan, iface_lan and bridge_ip + """ + + if channel < 15: + iface_wlan = _AP_2GHZ_INTERFACE + subnet_str = _AP_2GHZ_SUBNET_STR + else: + iface_wlan = _AP_5GHZ_INTERFACE + subnet_str = _AP_5GHZ_SUBNET_STR + + iface_lan = iface_lan + + a, b, c, d = subnet_str.strip('/24').split('.') + bridge_ip = "%s.%s.%s.%s" % (a, b, c, BRIDGE_IP_LAST) + + configs = (iface_wlan, iface_lan, bridge_ip) + + return configs diff --git a/acts/framework/acts/controllers/adb.py b/acts/framework/acts/controllers/adb.py index c8aef7e0e9..0c753cb095 100644 --- a/acts/framework/acts/controllers/adb.py +++ b/acts/framework/acts/controllers/adb.py @@ -139,6 +139,19 @@ class AdbProxy(object): return self._exec_cmd(' '.join((self.adb_str, name, arg_str)), **kwargs) + def _exec_cmd_nb(self, cmd): + """Executes adb commands in a new shell, non blocking. + + Args: + cmds: A string that is the adb command to execute. + + """ + job.run_async(cmd) + + def _exec_adb_cmd_nb(self, name, arg_str, **kwargs): + return self._exec_cmd_nb(' '.join((self.adb_str, name, arg_str)), + **kwargs) + def tcp_forward(self, host_port, device_port): """Starts tcp forwarding from localhost to this android device. @@ -196,6 +209,9 @@ class AdbProxy(object): ignore_status=ignore_status, timeout=timeout) + def shell_nb(self, command): + return self._exec_adb_cmd_nb('shell', shellescape.quote(command)) + def pull(self, command, ignore_status=False, diff --git a/acts/framework/acts/controllers/ap_lib/bridge_interface.py b/acts/framework/acts/controllers/ap_lib/bridge_interface.py new file mode 100644 index 0000000000..af3d072cd3 --- /dev/null +++ b/acts/framework/acts/controllers/ap_lib/bridge_interface.py @@ -0,0 +1,123 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time +from acts.libs.proc import job + +# TODO(@qijiang): will change to brctl when it's built in image +_BRCTL = '/home/root/bridge-utils/sbin/brctl' +BRIDGE_NAME = 'br0' +CREATE_BRIDGE = '%s addbr %s' % (_BRCTL, BRIDGE_NAME) +DELETE_BRIDGE = '%s delbr %s' % (_BRCTL, BRIDGE_NAME) +BRING_DOWN_BRIDGE = 'ifconfig %s down' % BRIDGE_NAME + + +class BridgeInterfaceConfigs(object): + """Configs needed for creating bridge interface between LAN and WLAN. + + """ + + def __init__(self, iface_wlan, iface_lan, bridge_ip): + """Set bridge interface configs based on the channel info. + + Args: + iface_wlan: the wlan interface as part of the bridge + iface_lan: the ethernet LAN interface as part of the bridge + bridge_ip: the ip address assigned to the bridge interface + """ + self.iface_wlan = iface_wlan + self.iface_lan = iface_lan + self.bridge_ip = bridge_ip + + +class BridgeInterface(object): + """Class object for bridge interface betwen WLAN and LAN + + """ + + def __init__(self, ssh_session): + """Initialize the BridgeInterface class. + + Bridge interface will be added between ethernet LAN port and WLAN port. + Args: + ssh_session: ssh session to the AP + """ + self.ssh = ssh_session + self.log = logging.getLogger() + + def startup(self, brconfigs): + """Start up the bridge interface. + + Args: + brconfigs: the bridge interface config, type BridgeInterfaceConfigs + """ + + self.log.info('Create bridge interface between LAN and WLAN') + # Create the bridge + try: + self.ssh.run(CREATE_BRIDGE) + except job.Error: + self.log.warning( + 'Bridge interface {} already exists, no action needed'.format( + BRIDGE_NAME)) + + # Enable 4addr mode on for the wlan interface + ENABLE_4ADDR = 'iw dev %s set 4addr on' % (brconfigs.iface_wlan) + try: + self.ssh.run(ENABLE_4ADDR) + except job.Error: + self.log.warning( + '4addr is already enabled on {}'.format(brconfigs.iface_wlan)) + + # Add both LAN and WLAN interfaces to the bridge interface + for interface in [brconfigs.iface_lan, brconfigs.iface_wlan]: + ADD_INTERFACE = '%s addif %s %s' % (_BRCTL, BRIDGE_NAME, interface) + try: + self.ssh.run(ADD_INTERFACE) + except job.Error: + self.log.warning('{} has alrady been added to {}'.format( + interface, BRIDGE_NAME)) + time.sleep(5) + + # Set IP address on the bridge interface to bring it up + SET_BRIDGE_IP = 'ifconfig %s %s' % (BRIDGE_NAME, brconfigs.bridge_ip) + self.ssh.run(SET_BRIDGE_IP) + time.sleep(2) + + # Bridge interface is up + self.log.info('Bridge interface is up and running') + + def teardown(self, brconfigs): + """Tear down the bridge interface. + + Args: + brconfigs: the bridge interface config, type BridgeInterfaceConfigs + """ + self.log.info('Bringing down the bridge interface') + # Delete the bridge interface + self.ssh.run(BRING_DOWN_BRIDGE) + time.sleep(1) + self.ssh.run(DELETE_BRIDGE) + + # Bring down wlan interface and disable 4addr mode + BRING_DOWN_WLAN = 'ifconfig %s down' % brconfigs.iface_wlan + self.ssh.run(BRING_DOWN_WLAN) + time.sleep(2) + DISABLE_4ADDR = 'iw dev %s set 4addr off' % (brconfigs.iface_wlan) + self.ssh.run(DISABLE_4ADDR) + time.sleep(1) + self.log.info('Bridge interface is down') diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_security.py b/acts/framework/acts/controllers/ap_lib/hostapd_security.py index 4e1ae3a2fc..9733e99913 100644 --- a/acts/framework/acts/controllers/ap_lib/hostapd_security.py +++ b/acts/framework/acts/controllers/ap_lib/hostapd_security.py @@ -64,14 +64,15 @@ class Security(object): else: security_mode = None self.security_mode = security_mode - if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len( - password) > hostapd_constants.MAX_WPA_PSK_LENGTH: - raise ValueError( - 'Password must be a minumum of %s characters and a maximum of %s' - % (hostapd_constants.MIN_WPA_PSK_LENGTH, - hostapd_constants.MAX_WPA_PSK_LENGTH)) - else: - self.password = password + if password: + if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len( + password) > hostapd_constants.MAX_WPA_PSK_LENGTH: + raise ValueError( + 'Password must be a minumum of %s characters and a maximum of %s' + % (hostapd_constants.MIN_WPA_PSK_LENGTH, + hostapd_constants.MAX_WPA_PSK_LENGTH)) + else: + self.password = password def generate_dict(self): """Returns: an ordered dictionary of settings""" diff --git a/acts/framework/acts/controllers/monsoon.py b/acts/framework/acts/controllers/monsoon.py index d573afc66a..e31261af1b 100644 --- a/acts/framework/acts/controllers/monsoon.py +++ b/acts/framework/acts/controllers/monsoon.py @@ -196,8 +196,8 @@ class MonsoonProxy(object): logging.warning("Wanted status, dropped type=0x%02x, len=%d", read_bytes[0], len(read_bytes)) continue - status = dict(zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, - read_bytes))) + status = dict( + zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, read_bytes))) p_type = status["packetType"] if p_type != 0x10: raise MonsoonError("Package type %s is not 0x10." % p_type) @@ -300,8 +300,10 @@ class MonsoonProxy(object): continue seq, _type, x, y = struct.unpack("BBBB", _bytes[:4]) - data = [struct.unpack(">hhhh", _bytes[x:x + 8]) - for x in range(4, len(_bytes) - 8, 8)] + data = [ + struct.unpack(">hhhh", _bytes[x:x + 8]) + for x in range(4, len(_bytes) - 8, 8) + ] if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF: logging.warning("Data sequence skipped, lost packet?") @@ -377,8 +379,8 @@ class MonsoonProxy(object): self.ser.flush() flushed = 0 while True: - ready_r, ready_w, ready_x = select.select( - [self.ser], [], [self.ser], 0) + ready_r, ready_w, ready_x = select.select([self.ser], [], + [self.ser], 0) if len(ready_x) > 0: logging.error("Exception from serial port.") return None @@ -420,9 +422,9 @@ class MonsoonData(object): self.offset = offset num_of_data_pt = len(self._data_points) if self.offset >= num_of_data_pt: - raise MonsoonError(("Offset number (%d) must be smaller than the " - "number of data points (%d).") % - (offset, num_of_data_pt)) + raise MonsoonError( + ("Offset number (%d) must be smaller than the " + "number of data points (%d).") % (offset, num_of_data_pt)) self.data_points = self._data_points[self.offset:] self.timestamps = self._timestamps[self.offset:] self.hz = hz @@ -468,17 +470,26 @@ class MonsoonData(object): lines = data_str.strip().split('\n') err_msg = ("Invalid input string format. Is this string generated by " "MonsoonData class?") - conditions = [len(lines) <= 4, "Average Current:" not in lines[1], - "Voltage: " not in lines[2], - "Total Power: " not in lines[3], - "samples taken at " not in lines[4], - lines[5] != "Time" + ' ' * 7 + "Amp"] + conditions = [ + len(lines) <= 4, "Average Current:" not in lines[1], + "Voltage: " not in lines[2], "Total Power: " not in lines[3], + "samples taken at " not in lines[4], + lines[5] != "Time" + ' ' * 7 + "Amp" + ] if any(conditions): raise MonsoonError(err_msg) - hz_str = lines[4].split()[2] - hz = int(hz_str[:-2]) + """Example string from Monsoon output file, first line is empty. + Line1: + Line2: test_2g_screenoff_dtimx2_marlin_OPD1.170706.006 + Line3: Average Current: 51.87984mA. + Line4: Voltage: 4.2V. + Line5: Total Power: 217.895328mW. + Line6: 150000 samples taken at 500Hz, with an offset of 0 samples. + """ + hz_str = lines[4].split()[4] + hz = int(hz_str[:-3]) voltage_str = lines[2].split()[1] - voltage = int(voltage[:-1]) + voltage = float(voltage_str[:-2]) lines = lines[6:] t = [] v = [] @@ -505,7 +516,7 @@ class MonsoonData(object): raise MonsoonError("Attempting to write empty Monsoon data to " "file, abort") utils.create_dir(os.path.dirname(file_path)) - with open(file_path, 'w') as f: + with open(file_path, 'a') as f: for md in monsoon_data: f.write(str(md)) f.write(MonsoonData.delimiter) @@ -525,6 +536,7 @@ class MonsoonData(object): results = [] with open(file_path, 'r') as f: data_strs = f.read().split(MonsoonData.delimiter) + data_strs = data_strs[:-1] for data_str in data_strs: results.append(MonsoonData.from_string(data_str)) return results @@ -590,9 +602,9 @@ class MonsoonData(object): strs.append("Average Current: {}mA.".format(self.average_current)) strs.append("Voltage: {}V.".format(self.voltage)) strs.append("Total Power: {}mW.".format(self.total_power)) - strs.append(("{} samples taken at {}Hz, with an offset of {} samples." - ).format( - len(self._data_points), self.hz, self.offset)) + strs.append( + ("{} samples taken at {}Hz, with an offset of {} samples.").format( + len(self._data_points), self.hz, self.offset)) return "\n".join(strs) def __len__(self): @@ -750,11 +762,12 @@ class Monsoon(object): pass self.mon.StopDataCollection() try: - return MonsoonData(current_values, - timestamps, - sample_hz, - voltage, - offset=sample_offset) + return MonsoonData( + current_values, + timestamps, + sample_hz, + voltage, + offset=sample_offset) except: return None @@ -883,8 +896,8 @@ class Monsoon(object): self._wait_for_device(self.dut) # Wait for device to come back online. time.sleep(10) - self.dut.start_services(skip_sl4a=getattr(self.dut, - "skip_sl4a", False)) + self.dut.start_services(skip_sl4a=getattr( + self.dut, "skip_sl4a", False)) # Release wake lock to put device into sleep. self.dut.droid.goToSleepNow() return results @@ -909,14 +922,13 @@ class Monsoon(object): oset = offset * hz data = None try: - self.usb("auto") - time.sleep(1) self.dut.stop_services() time.sleep(1) + self.usb("off") data = self.take_samples(hz, num, sample_offset=oset) if not data: - raise MonsoonError(( - "No data was collected in measurement %s.") % tag) + raise MonsoonError( + ("No data was collected in measurement %s.") % tag) data.tag = tag self.log.info("Measurement summary: %s", repr(data)) finally: @@ -926,8 +938,8 @@ class Monsoon(object): self._wait_for_device(self.dut) # Wait for device to come back online. time.sleep(10) - self.dut.start_services(skip_sl4a=getattr(self.dut, - "skip_sl4a", False)) + self.dut.start_services(skip_sl4a=getattr(self.dut, "skip_sl4a", + False)) # Release wake lock to put device into sleep. self.dut.droid.goToSleepNow() self.log.info("Dut reconnected.") diff --git a/acts/framework/acts/controllers/packet_sender.py b/acts/framework/acts/controllers/packet_sender.py new file mode 100644 index 0000000000..6b3898a0f6 --- /dev/null +++ b/acts/framework/acts/controllers/packet_sender.py @@ -0,0 +1,780 @@ +#./usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +"""Collection of utility functions to generate and send custom packets. + +""" +import logging +import multiprocessing +import socket +import time + +import acts.signals +from acts.test_utils.wifi import wifi_power_test_utils as wputils +# http://www.secdev.org/projects/scapy/ +# On ubuntu, sudo pip3 install scapy-python3 +import scapy.all as scapy + +ACTS_CONTROLLER_CONFIG_NAME = 'PacketSender' +ACTS_CONTROLLER_REFERENCE_NAME = 'packet_senders' + +GET_FROM_LOCAL_INTERFACE = 'get_local' +MAC_BROADCAST = 'ff:ff:ff:ff:ff:ff' +IPV4_BROADCAST = '255.255.255.255' +ARP_DST = '00:00:00:00:00:00' +RA_MAC = '33:33:00:00:00:01' +RA_IP = 'ff02::1' +RA_PREFIX = 'd00d::' +RA_PREFIX_LEN = 64 +DHCP_OFFER_OP = 2 +DHCP_OFFER_SRC_PORT = 67 +DHCP_OFFER_DST_PORT = 68 +DHCP_TRANS_ID = 0x01020304 +DNS_LEN = 3 +PING6_DATA = 'BEST PING6 EVER' +PING4_TYPE = 8 +MDNS_TTL = 255 +MDNS_QTYPE = 'PTR' +MDNS_UDP_PORT = 5353 +MDNS_V4_IP_DST = '224.0.0.251' +MDNS_V4_MAC_DST = '01:00:5E:00:00:FB' +MDNS_RECURSIVE = 1 +MDNS_V6_IP_DST = 'FF02::FB' +MDNS_V6_MAC_DST = '33:33:00:00:00:FB' + + +def create(configs): + """Creates PacketSender controllers from a json config. + + Args: + The json configs that represent this controller + + Returns: + A new PacketSender + """ + return [PacketSender(c) for c in configs] + + +def destroy(objs): + """Destroys a list of PacketSenders and stops sending (if active). + + Args: + objs: A list of PacketSenders + """ + for pkt_sender in objs: + pkt_sender.stop_sending(True) + return + + +def get_info(objs): + """Get information on a list of packet senders. + + Args: + objs: A list of PacketSenders + + Returns: + Network interface name that is being used by each packet sender + """ + return [pkt_sender.interface for pkt_sender in objs] + + +class ThreadSendPacket(multiprocessing.Process): + """Creates a thread that keeps sending the same packet until a stop signal. + + Attributes: + stop_signal: signal to stop the thread execution + packet: desired packet to keep sending + interval: interval between consecutive packets (s) + interface: network interface name (e.g., 'eth0') + log: object used for logging + """ + + def __init__(self, signal, packet, interval, interface, log): + multiprocessing.Process.__init__(self) + self.stop_signal = signal + self.packet = packet + self.interval = interval + self.interface = interface + self.log = log + + def run(self): + self.log.info('Packet Sending Started.') + while True: + if self.stop_signal.is_set(): + # Poison pill means shutdown + self.log.info('Packet Sending Stopped.') + break + + try: + scapy.sendp(self.packet, iface=self.interface, verbose=0) + time.sleep(self.interval) + except Exception: + self.log.exception('Exception when trying to send packet') + return + + return + + +class PacketSenderError(acts.signals.ControllerError): + """Raises exceptions encountered in packet sender lib.""" + + +class PacketSender(object): + """Send any custom packet over a desired interface. + + Attributes: + log: class logging object + thread_active: indicates whether or not the send thread is active + thread_send: thread object for the concurrent packet transmissions + stop_signal: event to stop the thread + interface: network interface name (e.g., 'eth0') + """ + + def __init__(self, ifname): + """Initiallize the PacketGenerator class. + + Args: + ifname: network interface name that will be used packet generator + """ + self.log = logging.getLogger() + self.packet = None + self.thread_active = False + self.thread_send = None + self.stop_signal = multiprocessing.Event() + self.interface = ifname + + def send_ntimes(self, packet, ntimes, interval): + """Sends a packet ntimes at a given interval. + + Args: + packet: custom built packet from Layer 2 up to Application layer + ntimes: number of packets to send + interval: interval between consecutive packet transmissions (s) + """ + if packet is None: + raise PacketSenderError( + 'There is no packet to send. Create a packet first.') + + for _ in range(ntimes): + try: + scapy.sendp(packet, iface=self.interface, verbose=0) + time.sleep(interval) + except socket.error as excpt: + self.log.exception('Caught socket exception : %s' % excpt) + return + + def send_receive_ntimes(self, packet, ntimes, interval): + """Sends a packet and receives the reply ntimes at a given interval. + + Args: + packet: custom built packet from Layer 2 up to Application layer + ntimes: number of packets to send + interval: interval between consecutive packet transmissions and + the corresponding reply (s) + """ + if packet is None: + raise PacketSenderError( + 'There is no packet to send. Create a packet first.') + + for _ in range(ntimes): + try: + scapy.srp1( + packet, iface=self.interface, timeout=interval, verbose=0) + time.sleep(interval) + except socket.error as excpt: + self.log.exception('Caught socket exception : %s' % excpt) + return + + def start_sending(self, packet, interval): + """Sends packets in parallel with the main process. + + Creates a thread and keeps sending the same packet at a given interval + until a stop signal is received + + Args: + packet: custom built packet from Layer 2 up to Application layer + interval: interval between consecutive packets (s) + """ + if packet is None: + raise PacketSenderError( + 'There is no packet to send. Create a packet first.') + + if self.thread_active: + raise PacketSenderError( + ('There is already an active thread. Stop it' + 'before starting another transmission.')) + + self.thread_send = ThreadSendPacket(self.stop_signal, packet, interval, + self.interface, self.log) + self.thread_send.start() + self.thread_active = True + + def stop_sending(self, ignore_status=False): + """Stops the concurrent thread that is continuously sending packets. + + """ + if not self.thread_active: + if ignore_status: + return + else: + raise PacketSenderError( + 'Error: There is no acive thread running to stop.') + + # Stop thread + self.stop_signal.set() + self.thread_send.join() + + # Just as precaution + if self.thread_send.is_alive(): + self.thread_send.terminate() + self.log.warning('Packet Sending forced to terminate') + + self.stop_signal.clear() + self.thread_send = None + self.thread_active = False + + +class ArpGenerator(object): + """Creates a custom ARP packet + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + src_ipv4: IPv4 address (Layer 3) of the source node + dst_ipv4: IPv4 address (Layer 3) of the destination node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: a dictionary with all the necessary packet fields. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.dst_ipv4 = config_params['dst_ipv4'] + if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv4 = scapy.get_if_addr(interf) + else: + self.src_ipv4 = config_params['src_ipv4'] + + def generate(self, ip_dst=None, hwsrc=None, hwdst=None, eth_dst=None): + """Generates a custom ARP packet. + + Args: + ip_dst: ARP ipv4 destination (Optional) + hwsrc: ARP hardware source address (Optional) + hwdst: ARP hardware destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + # Create IP layer + hw_src = (hwsrc if hwsrc is not None else self.src_mac) + hw_dst = (hwdst if hwdst is not None else ARP_DST) + ipv4_dst = (ip_dst if ip_dst is not None else self.dst_ipv4) + ip4 = scapy.ARP( + pdst=ipv4_dst, psrc=self.src_ipv4, hwdst=hw_dst, hwsrc=hw_src) + + # Create Ethernet layer + mac_dst = (eth_dst if eth_dst is not None else MAC_BROADCAST) + ethernet = scapy.Ether(src=self.src_mac, dst=mac_dst) + + self.packet = ethernet / ip4 + return self.packet + + +class DhcpOfferGenerator(object): + """Creates a custom DHCP offer packet + + Attributes: + packet: desired built custom packet + subnet_mask: local network subnet mask + src_mac: MAC address (Layer 2) of the source node + dst_mac: MAC address (Layer 2) of the destination node + src_ipv4: IPv4 address (Layer 3) of the source node + dst_ipv4: IPv4 address (Layer 3) of the destination node + gw_ipv4: IPv4 address (Layer 3) of the Gateway + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + self.subnet_mask = config_params['subnet_mask'] + self.dst_mac = config_params['dst_mac'] + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.dst_ipv4 = config_params['dst_ipv4'] + if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv4 = scapy.get_if_addr(interf) + else: + self.src_ipv4 = config_params['src_ipv4'] + + self.gw_ipv4 = config_params['gw_ipv4'] + + def generate(self, cha_mac=None, dst_ip=None): + """Generates a DHCP offer packet. + + Args: + cha_mac: hardware target address for DHCP offer (Optional) + dst_ip: ipv4 address of target host for renewal (Optional) + """ + + # Create DHCP layer + dhcp = scapy.DHCP(options=[ + ('message-type', 'offer'), + ('subnet_mask', self.subnet_mask), + ('server_id', self.src_ipv4), + ('end'), + ]) + + # Overwrite standard DHCP fields + sta_hw = (cha_mac if cha_mac is not None else self.dst_mac) + sta_ip = (dst_ip if dst_ip is not None else self.dst_ipv4) + + # Create Boot + bootp = scapy.BOOTP( + op=DHCP_OFFER_OP, + yiaddr=sta_ip, + siaddr=self.src_ipv4, + giaddr=self.gw_ipv4, + chaddr=scapy.mac2str(sta_hw), + xid=DHCP_TRANS_ID) + + # Create UDP + udp = scapy.UDP(sport=DHCP_OFFER_SRC_PORT, dport=DHCP_OFFER_DST_PORT) + + # Create IP layer + ip4 = scapy.IP(src=self.src_ipv4, dst=IPV4_BROADCAST) + + # Create Ethernet layer + ethernet = scapy.Ether(dst=MAC_BROADCAST, src=self.src_mac) + + self.packet = ethernet / ip4 / udp / bootp / dhcp + return self.packet + + +class NsGenerator(object): + """Creates a custom Neighbor Solicitation (NS) packet + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc) + src_ipv6: IPv6 address (Layer 3) of the source node + dst_ipv6: IPv6 address (Layer 3) of the destination node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.dst_ipv6 = config_params['dst_ipv6'] + self.src_ipv6_type = config_params['src_ipv6_type'] + if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type) + else: + self.src_ipv6 = config_params['src_ipv6'] + + def generate(self, ip_dst=None, eth_dst=None): + """Generates a Neighbor Solicitation (NS) packet (ICMP over IPv6). + + Args: + ip_dst: NS ipv6 destination (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + # Compute IP addresses + target_ip6 = ip_dst if ip_dst is not None else self.dst_ipv6 + ndst_ip = socket.inet_pton(socket.AF_INET6, target_ip6) + nnode_mcast = scapy.in6_getnsma(ndst_ip) + node_mcast = socket.inet_ntop(socket.AF_INET6, nnode_mcast) + # Compute MAC addresses + hw_dst = (eth_dst + if eth_dst is not None else scapy.in6_getnsmac(nnode_mcast)) + + # Create IPv6 layer + base = scapy.IPv6(dst=node_mcast, src=self.src_ipv6) + neighbor_solicitation = scapy.ICMPv6ND_NS(tgt=target_ip6) + src_ll_addr = scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.src_mac) + ip6 = base / neighbor_solicitation / src_ll_addr + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst) + + self.packet = ethernet / ip6 + return self.packet + + +class RaGenerator(object): + """Creates a custom Router Advertisement (RA) packet + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc) + src_ipv6: IPv6 address (Layer 3) of the source node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.src_ipv6_type = config_params['src_ipv6_type'] + if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type) + else: + self.src_ipv6 = config_params['src_ipv6'] + + def generate(self, + lifetime, + enableDNS=False, + dns_lifetime=0, + ip_dst=None, + eth_dst=None): + """Generates a Router Advertisement (RA) packet (ICMP over IPv6). + + Args: + lifetime: RA lifetime + enableDNS: Add RDNSS option to RA (Optional) + dns_lifetime: Set DNS server lifetime (Optional) + ip_dst: IPv6 destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + # Overwrite standard fields if desired + ip6_dst = (ip_dst if ip_dst is not None else RA_IP) + hw_dst = (eth_dst if eth_dst is not None else RA_MAC) + + # Create IPv6 layer + base = scapy.IPv6(dst=ip6_dst, src=self.src_ipv6) + router_solicitation = scapy.ICMPv6ND_RA(routerlifetime=lifetime) + src_ll_addr = scapy.ICMPv6NDOptSrcLLAddr(lladdr=self.src_mac) + prefix = scapy.ICMPv6NDOptPrefixInfo( + prefixlen=RA_PREFIX_LEN, prefix=RA_PREFIX) + if enableDNS: + rndss = scapy.ICMPv6NDOptRDNSS( + lifetime=dns_lifetime, dns=[self.src_ipv6], len=DNS_LEN) + ip6 = base / router_solicitation / src_ll_addr / prefix / rndss + else: + ip6 = base / router_solicitation / src_ll_addr / prefix + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst) + + self.packet = ethernet / ip6 + return self.packet + + +class Ping6Generator(object): + """Creates a custom Ping v6 packet (i.e., ICMP over IPv6) + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + dst_mac: MAC address (Layer 2) of the destination node + src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc) + src_ipv6: IPv6 address (Layer 3) of the source node + dst_ipv6: IPv6 address (Layer 3) of the destination node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + self.dst_mac = config_params['dst_mac'] + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.dst_ipv6 = config_params['dst_ipv6'] + self.src_ipv6_type = config_params['src_ipv6_type'] + if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type) + else: + self.src_ipv6 = config_params['src_ipv6'] + + def generate(self, ip_dst=None, eth_dst=None): + """Generates a Ping6 packet (i.e., Echo Request) + + Args: + ip_dst: IPv6 destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + # Overwrite standard fields if desired + ip6_dst = (ip_dst if ip_dst is not None else self.dst_ipv6) + hw_dst = (eth_dst if eth_dst is not None else self.dst_mac) + + # Create IPv6 layer + base = scapy.IPv6(dst=ip6_dst, src=self.src_ipv6) + echo_request = scapy.ICMPv6EchoRequest(data=PING6_DATA) + + ip6 = base / echo_request + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=hw_dst) + + self.packet = ethernet / ip6 + return self.packet + + +class Ping4Generator(object): + """Creates a custom Ping v4 packet (i.e., ICMP over IPv4) + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + dst_mac: MAC address (Layer 2) of the destination node + src_ipv4: IPv4 address (Layer 3) of the source node + dst_ipv4: IPv4 address (Layer 3) of the destination node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + self.dst_mac = config_params['dst_mac'] + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.dst_ipv4 = config_params['dst_ipv4'] + if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv4 = scapy.get_if_addr(interf) + else: + self.src_ipv4 = config_params['src_ipv4'] + + def generate(self, ip_dst=None, eth_dst=None): + """Generates a Ping4 packet (i.e., Echo Request) + + Args: + ip_dst: IP destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + + # Overwrite standard fields if desired + sta_ip = (ip_dst if ip_dst is not None else self.dst_ipv4) + sta_hw = (eth_dst if eth_dst is not None else self.dst_mac) + + # Create IPv6 layer + base = scapy.IP(src=self.src_ipv4, dst=sta_ip) + echo_request = scapy.ICMP(type=PING4_TYPE) + + ip4 = base / echo_request + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw) + + self.packet = ethernet / ip4 + return self.packet + + +class Mdns6Generator(object): + """Creates a custom mDNS IPv6 packet + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + src_ipv6_type: IPv6 source address type (e.g., Link Local, Global, etc) + src_ipv6: IPv6 address (Layer 3) of the source node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + self.src_ipv6_type = config_params['src_ipv6_type'] + if config_params['src_ipv6'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv6 = wputils.get_if_addr6(interf, self.src_ipv6_type) + else: + self.src_ipv6 = config_params['src_ipv6'] + + def generate(self, ip_dst=None, eth_dst=None): + """Generates a mDNS v6 packet for multicast DNS config + + Args: + ip_dst: IPv6 destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + + # Overwrite standard fields if desired + sta_ip = (ip_dst if ip_dst is not None else MDNS_V6_IP_DST) + sta_hw = (eth_dst if eth_dst is not None else MDNS_V6_MAC_DST) + + # Create mDNS layer + qdServer = scapy.DNSQR(qname=self.src_ipv6, qtype=MDNS_QTYPE) + mDNS = scapy.DNS(rd=MDNS_RECURSIVE, qd=qdServer) + + # Create UDP + udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT) + + # Create IP layer + ip6 = scapy.IPv6(src=self.src_ipv6, dst=sta_ip) + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw) + + self.packet = ethernet / ip6 / udp / mDNS + return self.packet + + +class Mdns4Generator(object): + """Creates a custom mDNS v4 packet + + Attributes: + packet: desired built custom packet + src_mac: MAC address (Layer 2) of the source node + src_ipv4: IPv4 address (Layer 3) of the source node + """ + + def __init__(self, **config_params): + """Initialize the class with the required network and packet params. + + Args: + config_params: contains all the necessary packet parameters. + Some fields can be generated automatically. For example: + {'subnet_mask': '255.255.255.0', + 'dst_ipv4': '192.168.1.3', + 'src_ipv4: 'get_local', ... + The key can also be 'get_local' which means the code will read + and use the local interface parameters + """ + interf = config_params['interf'] + self.packet = None + if config_params['src_mac'] == GET_FROM_LOCAL_INTERFACE: + self.src_mac = scapy.get_if_hwaddr(interf) + else: + self.src_mac = config_params['src_mac'] + + if config_params['src_ipv4'] == GET_FROM_LOCAL_INTERFACE: + self.src_ipv4 = scapy.get_if_addr(interf) + else: + self.src_ipv4 = config_params['src_ipv4'] + + def generate(self, ip_dst=None, eth_dst=None): + """Generates a mDNS v4 packet for multicast DNS config + + Args: + ip_dst: IP destination address (Optional) + eth_dst: Ethernet (layer 2) destination address (Optional) + """ + + # Overwrite standard fields if desired + sta_ip = (ip_dst if ip_dst is not None else MDNS_V4_IP_DST) + sta_hw = (eth_dst if eth_dst is not None else MDNS_V4_MAC_DST) + + # Create mDNS layer + qdServer = scapy.DNSQR(qname=self.src_ipv4, qtype=MDNS_QTYPE) + mDNS = scapy.DNS(rd=MDNS_RECURSIVE, qd=qdServer) + + # Create UDP + udp = scapy.UDP(sport=MDNS_UDP_PORT, dport=MDNS_UDP_PORT) + + # Create IP layer + ip4 = scapy.IP(src=self.src_ipv4, dst=sta_ip, ttl=255) + + # Create Ethernet layer + ethernet = scapy.Ether(src=self.src_mac, dst=sta_hw) + + self.packet = ethernet / ip4 / udp / mDNS + return self.packet diff --git a/acts/framework/acts/controllers/relay_device_controller.py b/acts/framework/acts/controllers/relay_device_controller.py index 36715a3257..41f4fe99c5 100644 --- a/acts/framework/acts/controllers/relay_device_controller.py +++ b/acts/framework/acts/controllers/relay_device_controller.py @@ -26,17 +26,47 @@ def create(config): """Creates RelayDevice controller objects. Args: - config: A dict of: - config_path: The path to the RelayDevice config file. - devices: A list of configs or names associated with the devices. + config: Either one of two types: + + A filename to a RelayController config (json file) + A RelayController config/dict composed of: + boards: A list of controller boards (see tests). + devices: A list of RelayDevices attached to the boards. Returns: A list of RelayDevice objects. """ + if type(config) is str: + return _create_from_external_config_file(config) + elif type(config) is dict: + return _create_from_dict(config) + + +def _create_from_external_config_file(config_filename): + """Creates RelayDevice controller objects from an external config file. + + Args: + config_filename: The filename of the RelayController config. + + Returns: + A list of RelayDevice objects. + """ + with open(config_filename) as json_file: + return _create_from_dict(json.load(json_file)) + + +def _create_from_dict(config): + """Creates RelayDevice controller objects from a dictionary. + + Args: + config: The dictionary containing the RelayController config. + + Returns: + A list of RelayDevice objects. + """ devices = list() - with open(config) as json_file: - relay_rig = RelayRig(json.load(json_file)) + relay_rig = RelayRig(config) for device in relay_rig.devices.values(): devices.append(device) diff --git a/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py b/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py new file mode 100644 index 0000000000..4d9455af80 --- /dev/null +++ b/acts/framework/acts/controllers/relay_lib/ak_xb10_speaker.py @@ -0,0 +1,76 @@ +#!/usr/bin/env python +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import enum +import logging +from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice +from acts.controllers.relay_lib.relay import SynchronizeRelays +from acts.controllers.relay_lib.errors import RelayConfigError +from acts.controllers.relay_lib.helpers import validate_key + +PAIRING_MODE_WAIT_TIME = 5 +POWER_ON_WAIT_TIME = 2 +POWER_OFF_WAIT_TIME = 6 +MISSING_RELAY_MSG = 'Relay config for Ak XB10 "%s" missing relay "%s".' + +log = logging + + +class Buttons(enum.Enum): + POWER = 'Power' + PAIR = 'Pair' + + +class AkXB10Speaker(GenericRelayDevice): + """A&K XB10 Bluetooth Speaker model + + Wraps the button presses, as well as the special features like pairing. + """ + + def __init__(self, config, relay_rig): + GenericRelayDevice.__init__(self, config, relay_rig) + + self.mac_address = validate_key('mac_address', config, str, 'ak_xb10') + + for button in Buttons: + self.ensure_config_contains_relay(button.value) + + def ensure_config_contains_relay(self, relay_name): + """Throws an error if the relay does not exist.""" + if relay_name not in self.relays: + raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name)) + + def _hold_button(self, button, seconds): + self.hold_down(button.value) + time.sleep(seconds) + self.release(button.value) + + def power_on(self): + self._hold_button(Buttons.POWER, POWER_ON_WAIT_TIME) + + def power_off(self): + self._hold_button(Buttons.POWER, POWER_OFF_WAIT_TIME) + + def enter_pairing_mode(self): + self._hold_button(Buttons.PAIR, PAIRING_MODE_WAIT_TIME) + + def setup(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.setup(self) + + def clean_up(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.clean_up(self) diff --git a/acts/framework/acts/controllers/relay_lib/dongles.py b/acts/framework/acts/controllers/relay_lib/dongles.py new file mode 100644 index 0000000000..518caadf13 --- /dev/null +++ b/acts/framework/acts/controllers/relay_lib/dongles.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import time +import enum +from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice +from acts.controllers.relay_lib.relay import SynchronizeRelays +from acts.controllers.relay_lib.errors import RelayConfigError +from acts.controllers.relay_lib.helpers import validate_key + +# Necessary timeout inbetween commands +CMD_TIMEOUT = 1.2 +# Pairing mode activation wait time +PAIRING_MODE_WAIT_TIME = 6 +SINGLE_ACTION_SHORT_WAIT_TIME = 0.6 +SINGLE_ACTION_LONG_WAIT_TIME = 2.0 +MISSING_RELAY_MSG = 'Relay config for Three button "%s" missing relay "%s".' + + +class Buttons(enum.Enum): + ACTION = 'Action' + NEXT = 'Next' + PREVIOUS = 'Previous' + + +class SingleButtonDongle(GenericRelayDevice): + """A Bluetooth dongle with one generic button Normally action. + + Wraps the button presses, as well as the special features like pairing. + """ + + def __init__(self, config, relay_rig): + GenericRelayDevice.__init__(self, config, relay_rig) + + self.mac_address = validate_key('mac_address', config, str, + 'SingleButtonDongle') + + self.ensure_config_contains_relay(Buttons.ACTION.value) + + def ensure_config_contains_relay(self, relay_name): + """Throws an error if the relay does not exist.""" + if relay_name not in self.relays: + raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name)) + + def setup(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.setup(self) + + def clean_up(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.clean_up(self) + + def enter_pairing_mode(self): + """Enters pairing mode. Blocks the thread until pairing mode is set. + + Holds down the 'ACTION' buttons for PAIRING_MODE_WAIT_TIME seconds. + """ + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=PAIRING_MODE_WAIT_TIME) + + def press_play_pause(self): + """Briefly presses the Action button.""" + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=SINGLE_ACTION_SHORT_WAIT_TIME) + + def press_vr_mode(self): + """Long press the Action button.""" + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=SINGLE_ACTION_LONG_WAIT_TIME) + + +class ThreeButtonDongle(GenericRelayDevice): + """A Bluetooth dongle with three generic buttons Normally action, next, and + previous. + + Wraps the button presses, as well as the special features like pairing. + """ + + def __init__(self, config, relay_rig): + GenericRelayDevice.__init__(self, config, relay_rig) + + self.mac_address = validate_key('mac_address', config, str, + 'ThreeButtonDongle') + + for button in Buttons: + self.ensure_config_contains_relay(button.value) + + def ensure_config_contains_relay(self, relay_name): + """Throws an error if the relay does not exist.""" + if relay_name not in self.relays: + raise RelayConfigError(MISSING_RELAY_MSG % (self.name, relay_name)) + + def setup(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.setup(self) + + def clean_up(self): + """Sets all relays to their default state (off).""" + GenericRelayDevice.clean_up(self) + + def enter_pairing_mode(self): + """Enters pairing mode. Blocks the thread until pairing mode is set. + + Holds down the 'ACTION' buttons for a little over 5 seconds. + """ + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=PAIRING_MODE_WAIT_TIME) + + def press_play_pause(self): + """Briefly presses the Action button.""" + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=SINGLE_ACTION_SHORT_WAIT_TIME) + time.sleep(CMD_TIMEOUT) + + def press_vr_mode(self): + """Long press the Action button.""" + self.relays[Buttons.ACTION.value].set_nc_for( + seconds=SINGLE_ACTION_LONG_WAIT_TIME) + time.sleep(CMD_TIMEOUT) + + def press_next(self): + """Briefly presses the Next button.""" + self.relays[Buttons.NEXT.value].set_nc_for( + seconds=SINGLE_ACTION_SHORT_WAIT_TIME) + time.sleep(CMD_TIMEOUT) + + def press_previous(self): + """Briefly presses the Previous button.""" + self.relays[Buttons.PREVIOUS.value].set_nc_for( + seconds=SINGLE_ACTION_SHORT_WAIT_TIME) + time.sleep(CMD_TIMEOUT) diff --git a/acts/framework/acts/controllers/relay_lib/generic_relay_device.py b/acts/framework/acts/controllers/relay_lib/generic_relay_device.py index e13604638b..8547b5c0d9 100644 --- a/acts/framework/acts/controllers/relay_lib/generic_relay_device.py +++ b/acts/framework/acts/controllers/relay_lib/generic_relay_device.py @@ -39,7 +39,8 @@ class GenericRelayDevice(RelayDevice): """Sets all relays to their default state (off).""" with SynchronizeRelays(): for relay in self.relays.values(): - relay.set_no() + if relay.is_dirty(): + relay.set_no() def press(self, button_name): """Presses the button with name 'button_name'.""" diff --git a/acts/framework/acts/controllers/relay_lib/relay.py b/acts/framework/acts/controllers/relay_lib/relay.py index 8da32ebdcf..80ea35a0f1 100644 --- a/acts/framework/acts/controllers/relay_lib/relay.py +++ b/acts/framework/acts/controllers/relay_lib/relay.py @@ -76,7 +76,7 @@ class Relay(object): def __init__(self, relay_board, position): self.relay_board = relay_board self.position = position - self._original_state = relay_board.get_relay_status(self.position) + self._original_state = None self.relay_id = "%s/%s" % (self.relay_board.name, self.position) def set_no(self): @@ -115,6 +115,10 @@ class Relay(object): ValueError if state is not 'NO' or 'NC'. """ + if self._original_state is None: + self._original_state = self.relay_board.get_relay_status( + self.position) + if state is not RelayState.NO and state is not RelayState.NC: raise ValueError( 'Invalid state. Received "%s". Expected any of %s.' % @@ -158,8 +162,11 @@ class Relay(object): sure to make the necessary modifications in RelayRig.initialize_relay and RelayRigParser.parse_json_relays. """ + if self._original_state is not None: + self.set(self._original_state) - self.set(self._original_state) + def is_dirty(self): + return self._original_state is not None class RelayDict(object): diff --git a/acts/framework/acts/controllers/relay_lib/relay_rig.py b/acts/framework/acts/controllers/relay_lib/relay_rig.py index a88099d566..3f4ca051e8 100644 --- a/acts/framework/acts/controllers/relay_lib/relay_rig.py +++ b/acts/framework/acts/controllers/relay_lib/relay_rig.py @@ -19,6 +19,9 @@ from acts.controllers.relay_lib.sain_smart_board import SainSmartBoard from acts.controllers.relay_lib.generic_relay_device import GenericRelayDevice from acts.controllers.relay_lib.fugu_remote import FuguRemote from acts.controllers.relay_lib.sony_xb2_speaker import SonyXB2Speaker +from acts.controllers.relay_lib.ak_xb10_speaker import AkXB10Speaker +from acts.controllers.relay_lib.dongles import SingleButtonDongle +from acts.controllers.relay_lib.dongles import ThreeButtonDongle class RelayRig: @@ -50,6 +53,9 @@ class RelayRig: 'GenericRelayDevice': lambda x, rig: GenericRelayDevice(x, rig), 'FuguRemote': lambda x, rig: FuguRemote(x, rig), 'SonyXB2Speaker': lambda x, rig: SonyXB2Speaker(x, rig), + 'AkXB10Speaker': lambda x, rig: AkXB10Speaker(x, rig), + 'SingleButtonDongle': lambda x, rig: SingleButtonDongle(x, rig), + 'ThreeButtonDongle': lambda x, rig: ThreeButtonDongle(x, rig), } def __init__(self, config): diff --git a/acts/framework/acts/controllers/relay_lib/sain_smart_board.py b/acts/framework/acts/controllers/relay_lib/sain_smart_board.py index 0b273a6290..2d27816e4e 100644 --- a/acts/framework/acts/controllers/relay_lib/sain_smart_board.py +++ b/acts/framework/acts/controllers/relay_lib/sain_smart_board.py @@ -59,7 +59,7 @@ class SainSmartBoard(RelayBoard): self.base_url = validate_key('base_url', config, str, 'config') if not self.base_url.endswith('/'): self.base_url += '/' - RelayBoard.__init__(self, config) + super(SainSmartBoard, self).__init__(config) def get_relay_position_list(self): return self.VALID_RELAY_POSITIONS diff --git a/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py b/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py index 2d01eca384..3ee95cf49b 100644 --- a/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py +++ b/acts/framework/acts/controllers/relay_lib/sony_xb2_speaker.py @@ -24,7 +24,7 @@ from acts.controllers.relay_lib.helpers import validate_key PAIRING_MODE_WAIT_TIME = 5 POWER_ON_WAIT_TIME = 2 POWER_OFF_WAIT_TIME = 6 -MISSING_RELAY_MSG = 'Relay config for Sonxy XB2 "%s" missing relay "%s".' +MISSING_RELAY_MSG = 'Relay config for Sony XB2 "%s" missing relay "%s".' log = logging @@ -35,7 +35,7 @@ class Buttons(enum.Enum): class SonyXB2Speaker(GenericRelayDevice): - """A Sony XB2 Bluetooth Speaker. + """Sony XB2 Bluetooth Speaker model Wraps the button presses, as well as the special features like pairing. """ diff --git a/acts/framework/acts/keys.py b/acts/framework/acts/keys.py index be5747029e..77658745ae 100644 --- a/acts/framework/acts/keys.py +++ b/acts/framework/acts/keys.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3.4 # -# Copyright 2016 - The Android Open Source Project +# Copyright 2017 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -42,6 +42,7 @@ class Config(enum.Enum): key_access_point = "AccessPoint" key_attenuator = "Attenuator" key_iperf_server = "IPerfServer" + key_packet_sender = "PacketSender" key_monsoon = "Monsoon" key_sniffer = "Sniffer" # Internal keys, used internally, not exposed to user's config files. @@ -58,6 +59,7 @@ class Config(enum.Enum): m_key_access_point = "access_point" m_key_attenuator = "attenuator" m_key_iperf_server = "iperf_server" + m_key_packet_sender = "packet_sender" m_key_sniffer = "sniffer" # A list of keys whose values in configs should not be passed to test @@ -67,13 +69,10 @@ class Config(enum.Enum): # Controller names packaged with ACTS. builtin_controller_names = [ key_android_device, key_native_android_device, key_relay_device, - key_access_point, key_attenuator, key_iperf_server, key_monsoon, - key_sniffer + key_access_point, key_attenuator, key_iperf_server, key_packet_sender, + key_monsoon, key_sniffer ] - # Keys that are file or folder paths. - file_path_keys = [key_relay_device] - def get_name_by_value(value): for name, member in Config.__members__.items(): diff --git a/acts/framework/acts/libs/ota/__init__.py b/acts/framework/acts/libs/ota/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/acts/framework/acts/libs/ota/__init__.py diff --git a/acts/framework/acts/libs/ota/ota_runners/__init__.py b/acts/framework/acts/libs/ota/ota_runners/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_runners/__init__.py diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py new file mode 100644 index 0000000000..776c9008a1 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner.py @@ -0,0 +1,127 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import time + +SL4A_SERVICE_SETUP_TIME = 5 + + +class OtaError(Exception): + """Raised when an error in the OTA Update process occurs.""" + + +class OtaRunner(object): + """The base class for all OTA Update Runners.""" + + def __init__(self, ota_tool, android_device): + self.ota_tool = ota_tool + self.android_device = android_device + self.serial = self.android_device.serial + + def _update(self): + logging.info('Stopping services.') + self.android_device.stop_services() + logging.info('Beginning tool.') + self.ota_tool.update(self) + logging.info('Tool finished. Waiting for boot completion.') + self.android_device.wait_for_boot_completion() + logging.info('Boot completed. Rooting adb.') + self.android_device.root_adb() + logging.info('Root complete. Installing new SL4A.') + output = self.android_device.adb.install('-r %s' % self.get_sl4a_apk()) + logging.info('SL4A install output: %s' % output) + time.sleep(SL4A_SERVICE_SETUP_TIME) + logging.info('Starting services.') + self.android_device.start_services() + logging.info('Services started. Running ota tool cleanup.') + self.ota_tool.cleanup(self) + logging.info('Cleanup complete.') + + def can_update(self): + """Whether or not an update package is available for the device.""" + return NotImplementedError() + + def get_ota_package(self): + raise NotImplementedError() + + def get_sl4a_apk(self): + raise NotImplementedError() + + +class SingleUseOtaRunner(OtaRunner): + """A single use OtaRunner. + + SingleUseOtaRunners can only be ran once. If a user attempts to run it more + than once, an error will be thrown. Users can avoid the error by checking + can_update() before calling update(). + """ + + def __init__(self, ota_tool, android_device, ota_package, sl4a_apk): + super(SingleUseOtaRunner, self).__init__(ota_tool, android_device) + self._ota_package = ota_package + self._sl4a_apk = sl4a_apk + self._called = False + + def can_update(self): + return not self._called + + def update(self): + """Starts the update process.""" + if not self.can_update(): + raise OtaError('A SingleUseOtaTool instance cannot update a phone ' + 'multiple times.') + self._called = True + self._update() + + def get_ota_package(self): + return self._ota_package + + def get_sl4a_apk(self): + return self._sl4a_apk + + +class MultiUseOtaRunner(OtaRunner): + """A multiple use OtaRunner. + + MultiUseOtaRunner can only be ran for as many times as there have been + packages provided to them. If a user attempts to run it more than the number + of provided packages, an error will be thrown. Users can avoid the error by + checking can_update() before calling update(). + """ + + def __init__(self, ota_tool, android_device, ota_packages, sl4a_apks): + super(MultiUseOtaRunner, self).__init__(ota_tool, android_device) + self._ota_packages = ota_packages + self._sl4a_apks = sl4a_apks + self.current_update_number = 0 + + def can_update(self): + return not self.current_update_number == len(self._ota_packages) + + def update(self): + """Starts the update process.""" + if not self.can_update(): + raise OtaError('This MultiUseOtaRunner has already updated all ' + 'given packages onto the phone.') + self._update() + self.current_update_number += 1 + + def get_ota_package(self): + return self._ota_packages[self.current_update_number] + + def get_sl4a_apk(self): + return self._sl4a_apks[self.current_update_number] diff --git a/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py new file mode 100644 index 0000000000..fa6ab19790 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_runners/ota_runner_factory.py @@ -0,0 +1,204 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from acts.config_parser import ActsConfigError +from acts.libs.ota.ota_runners import ota_runner +from acts.libs.ota.ota_tools import ota_tool_factory +from acts.libs.ota.ota_tools import adb_sideload_ota_tool + +_bound_devices = {} + +DEFAULT_OTA_TOOL = adb_sideload_ota_tool.AdbSideloadOtaTool.__name__ +DEFAULT_OTA_COMMAND = 'adb' + + +def create_all_from_configs(config, android_devices): + """Creates a new OtaTool for each given AndroidDevice. + + After an OtaTool is assigned to a device, another OtaTool cannot be created + for that device. This will prevent OTA Update tests that accidentally flash + the same build onto a device more than once. + + Args: + config: the ACTS config user_params. + android_devices: The devices to run an OTA Update on. + + Returns: + A list of OtaRunners responsible for updating the given devices. The + indexes match the indexes of the corresponding AndroidDevice in + android_devices. + """ + return [create_from_configs(config, ad) for ad in android_devices] + + +def create_from_configs(config, android_device): + """Creates a new OtaTool for the given AndroidDevice. + + After an OtaTool is assigned to a device, another OtaTool cannot be created + for that device. This will prevent OTA Update tests that accidentally flash + the same build onto a device more than once. + + Args: + config: the ACTS config user_params. + android_device: The device to run the OTA Update on. + + Returns: + An OtaRunner responsible for updating the given device. + """ + # Default to adb sideload + try: + ota_tool_class_name = get_ota_value_from_config( + config, 'ota_tool', android_device) + except ActsConfigError: + ota_tool_class_name = DEFAULT_OTA_TOOL + + if ota_tool_class_name not in config: + if ota_tool_class_name is not DEFAULT_OTA_TOOL: + raise ActsConfigError( + 'If the ota_tool is overloaded, the path to the tool must be ' + 'added to the ACTS config file under {"OtaToolName": ' + '"path/to/tool"} (in this case, {"%s": "path/to/tool"}.' % + ota_tool_class_name) + else: + command = DEFAULT_OTA_COMMAND + else: + command = config[ota_tool_class_name] + if type(command) is list: + # If file came as a list in the config. + if len(command) == 1: + command = command[0] + else: + raise ActsConfigError( + 'Config value for "%s" must be either a string or a list ' + 'of exactly one element' % ota_tool_class_name) + + ota_package = get_ota_value_from_config(config, 'ota_package', + android_device) + ota_sl4a = get_ota_value_from_config(config, 'ota_sl4a', android_device) + if type(ota_sl4a) != type(ota_package): + raise ActsConfigError( + 'The ota_package and ota_sl4a must either both be strings, or ' + 'both be lists. Device with serial "%s" has mismatched types.' % + android_device.serial) + return create(ota_package, ota_sl4a, android_device, ota_tool_class_name, + command) + + +def create(ota_package, + ota_sl4a, + android_device, + ota_tool_class_name=DEFAULT_OTA_TOOL, + command=DEFAULT_OTA_COMMAND, + use_cached_runners=True): + """ + Args: + ota_package: A string or list of strings corresponding to the + update.zip package location(s) for running an OTA update. + ota_sl4a: A string or list of strings corresponding to the + sl4a.apk package location(s) for running an OTA update. + ota_tool_class_name: The class name for the desired ota_tool + command: The command line tool name for the updater + android_device: The AndroidDevice to run the OTA Update on. + use_cached_runners: Whether or not to use runners cached by previous + create calls. + + Returns: + An OtaRunner with the given properties from the arguments. + """ + ota_tool = ota_tool_factory.create(ota_tool_class_name, command) + return create_from_package(ota_package, ota_sl4a, android_device, ota_tool, + use_cached_runners) + + +def create_from_package(ota_package, + ota_sl4a, + android_device, + ota_tool, + use_cached_runners=True): + """ + Args: + ota_package: A string or list of strings corresponding to the + update.zip package location(s) for running an OTA update. + ota_sl4a: A string or list of strings corresponding to the + sl4a.apk package location(s) for running an OTA update. + ota_tool: The OtaTool to be paired with the returned OtaRunner + android_device: The AndroidDevice to run the OTA Update on. + use_cached_runners: Whether or not to use runners cached by previous + create calls. + + Returns: + An OtaRunner with the given properties from the arguments. + """ + if android_device in _bound_devices and use_cached_runners: + logging.warning('Android device %s has already been assigned an ' + 'OtaRunner. Returning previously created runner.') + return _bound_devices[android_device] + + if type(ota_package) != type(ota_sl4a): + raise TypeError( + 'The ota_package and ota_sl4a must either both be strings, or ' + 'both be lists. Device with serial "%s" has requested mismatched ' + 'types.' % android_device.serial) + + if type(ota_package) is str: + runner = ota_runner.SingleUseOtaRunner(ota_tool, android_device, + ota_package, ota_sl4a) + elif type(ota_package) is list: + runner = ota_runner.MultiUseOtaRunner(ota_tool, android_device, + ota_package, ota_sl4a) + else: + raise TypeError('The "ota_package" value in the acts config must be ' + 'either a list or a string.') + + _bound_devices[android_device] = runner + return runner + + +def get_ota_value_from_config(config, key, android_device): + """Returns a key for the given AndroidDevice. + + Args: + config: The ACTS config + key: The base key desired (ota_tool, ota_sl4a, or ota_package) + android_device: An AndroidDevice + + Returns: The value at the specified key. + Throws: ActsConfigError if the value cannot be determined from the config. + """ + suffix = '' + if 'ota_map' in config: + if android_device.serial in config['ota_map']: + suffix = '_%s' % config['ota_map'][android_device.serial] + + ota_package_key = '%s%s' % (key, suffix) + if ota_package_key not in config: + if suffix is not '': + raise ActsConfigError( + 'Asked for an OTA Update without specifying a required value. ' + '"ota_map" has entry {"%s": "%s"}, but there is no ' + 'corresponding entry {"%s":"/path/to/file"} found within the ' + 'ACTS config.' % (android_device.serial, suffix[1:], + ota_package_key)) + else: + raise ActsConfigError( + 'Asked for an OTA Update without specifying a required value. ' + '"ota_map" does not exist or have a key for serial "%s", and ' + 'the default value entry "%s" cannot be found within the ACTS ' + 'config.' % (android_device.serial, ota_package_key)) + + return config[ota_package_key] diff --git a/acts/framework/acts/libs/ota/ota_tools/__init__.py b/acts/framework/acts/libs/ota/ota_tools/__init__.py new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_tools/__init__.py diff --git a/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py new file mode 100644 index 0000000000..f94a7627fe --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_tools/adb_sideload_ota_tool.py @@ -0,0 +1,49 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from acts.libs.ota.ota_tools.ota_tool import OtaTool + +# OTA Packages can be upwards of 1 GB. This may take some time to transfer over +# USB 2.0. +PUSH_TIMEOUT = 10 * 60 + + +class AdbSideloadOtaTool(OtaTool): + """Updates an AndroidDevice using adb sideload.""" + + def __init__(self, ignored_command): + # "command" is ignored. The ACTS adb version is used to prevent + # differing adb versions from constantly killing adbd. + super(AdbSideloadOtaTool, self).__init__(ignored_command) + + def update(self, ota_runner): + logging.info('Rooting adb') + ota_runner.android_device.root_adb() + logging.info('Rebooting to sideload') + ota_runner.android_device.adb.reboot('sideload') + ota_runner.android_device.adb.wait_for_sideload() + logging.info('Sideloading ota package') + package_path = ota_runner.get_ota_package() + logging.info('Running adb sideload with package "%s"' % package_path) + sideload_result = ota_runner.android_device.adb.sideload( + package_path, timeout=PUSH_TIMEOUT) + logging.info('Sideload output: %s' % sideload_result) + logging.info('Sideload complete. Waiting for device to come back up.') + ota_runner.android_device.adb.wait_for_recovery() + ota_runner.android_device.adb.reboot() + logging.info('Device is up. Update complete.') diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py new file mode 100644 index 0000000000..e51fe6bce0 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +class OtaTool(object): + """A Wrapper for an OTA Update command or tool. + + Each OtaTool acts as a facade to the underlying command or tool used to + update the device. + """ + + def __init__(self, command): + """Creates an OTA Update tool with the given properties. + + Args: + command: A string that is used as the command line tool + """ + self.command = command + + def update(self, ota_runner): + """Begins the OTA Update. Returns after the update has installed. + + Args: + ota_runner: The OTA Runner that handles the device information. + """ + raise NotImplementedError() + + def cleanup(self, ota_runner): + """A cleanup method for the OTA Tool to run after the update completes. + + Args: + ota_runner: The OTA Runner that handles the device information. + """ + pass diff --git a/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py new file mode 100644 index 0000000000..ac81646011 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_tools/ota_tool_factory.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts.libs.ota.ota_tools.adb_sideload_ota_tool import AdbSideloadOtaTool +from acts.libs.ota.ota_tools.update_device_ota_tool import UpdateDeviceOtaTool + +_CONSTRUCTORS = { + AdbSideloadOtaTool.__name__: lambda command: AdbSideloadOtaTool(command), + UpdateDeviceOtaTool.__name__: lambda command: UpdateDeviceOtaTool(command), +} +_constructed_tools = {} + + +def create(ota_tool_class, command): + """Returns an OtaTool with the given class name. + + If the tool has already been created, the existing instance will be + returned. + + Args: + ota_tool_class: the class/type of the tool you wish to use. + command: the command line tool being used. + + Returns: + An OtaTool. + """ + if ota_tool_class in _constructed_tools: + return _constructed_tools[ota_tool_class] + + if ota_tool_class not in _CONSTRUCTORS: + raise KeyError('Given Ota Tool class name does not match a known ' + 'name. Found "%s". Expected any of %s. If this tool ' + 'does exist, add it to the _CONSTRUCTORS dict in this ' + 'module.' % (ota_tool_class, _CONSTRUCTORS.keys())) + + new_update_tool = _CONSTRUCTORS[ota_tool_class](command) + _constructed_tools[ota_tool_class] = new_update_tool + + return new_update_tool diff --git a/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py new file mode 100644 index 0000000000..978842ffb7 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_tools/update_device_ota_tool.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import shutil +import tempfile + +from acts.libs.ota.ota_tools import ota_tool +from acts.libs.proc import job +from acts import utils + +# OTA Packages can be upwards of 1 GB. This may take some time to transfer over +# USB 2.0. A/B devices must also complete the update in the background. +UPDATE_TIMEOUT = 20 * 60 +UPDATE_LOCATION = '/data/ota_package/update.zip' + + +class UpdateDeviceOtaTool(ota_tool.OtaTool): + """Runs an OTA Update with system/update_engine/scripts/update_device.py.""" + + def __init__(self, command): + super(UpdateDeviceOtaTool, self).__init__(command) + + self.unzip_path = tempfile.mkdtemp() + utils.unzip_maintain_permissions(self.command, self.unzip_path) + + self.command = os.path.join(self.unzip_path, 'update_device.py') + + def update(self, ota_runner): + logging.info('Forcing adb to be in root mode.') + ota_runner.android_device.root_adb() + update_command = 'python2.7 %s -s %s %s' % ( + self.command, ota_runner.serial, ota_runner.get_ota_package()) + logging.info('Running %s' % update_command) + result = job.run(update_command, timeout=UPDATE_TIMEOUT) + logging.info('Output: %s' % result.stdout) + + logging.info('Rebooting device for update to go live.') + ota_runner.android_device.adb.reboot() + logging.info('Reboot sent.') + + def __del__(self): + """Delete the unzipped update_device folder before ACTS exits.""" + shutil.rmtree(self.unzip_path) diff --git a/acts/framework/acts/libs/ota/ota_updater.py b/acts/framework/acts/libs/ota/ota_updater.py new file mode 100644 index 0000000000..ed300aab67 --- /dev/null +++ b/acts/framework/acts/libs/ota/ota_updater.py @@ -0,0 +1,66 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts.libs.ota.ota_runners import ota_runner_factory + +# Maps AndroidDevices to OtaRunners +ota_runners = {} + + +def initialize(user_params, android_devices): + """Initialize OtaRunners for each device. + + Args: + user_params: The user_params from the ACTS config. + android_devices: The android_devices in the test. + """ + for ad in android_devices: + ota_runners[ad] = ota_runner_factory.create_from_configs( + user_params, ad) + + +def _check_initialization(android_device): + """Check if a given device was initialized.""" + if android_device not in ota_runners: + raise KeyError('Android Device with serial "%s" has not been ' + 'initialized for OTA Updates. Did you forget to call' + 'ota_updater.initialize()?' % android_device.serial) + + +def update(android_device, ignore_update_errors=False): + """Update a given AndroidDevice. + + Args: + android_device: The device to update + ignore_update_errors: Whether or not to ignore update errors such as + no more updates available for a given device. Default is false. + Throws: + OtaError if ignore_update_errors is false and the OtaRunner has run out + of packages to update the phone with. + """ + _check_initialization(android_device) + try: + ota_runners[android_device].update() + except: + if ignore_update_errors: + return + raise + + +def can_update(android_device): + """Whether or not a device can be updated.""" + _check_initialization(android_device) + return ota_runners[android_device].can_update() diff --git a/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py index ff704fecb8..4c0e9290bf 100644 --- a/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BluetoothCarHfpBaseTest.py @@ -22,6 +22,7 @@ import os import time from queue import Empty +from acts.keys import Config from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from acts.test_utils.bt.bt_test_utils import pair_pri_to_sec from acts.test_utils.tel.tel_test_utils import ensure_phones_default_state diff --git a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py index f22e0b80db..85fdba3c83 100644 --- a/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BtFunhausBaseTest.py @@ -42,6 +42,7 @@ class BtFunhausBaseTest(BtMetricsBaseTest): def __init__(self, controllers): BtMetricsBaseTest.__init__(self, controllers) + self.ad = self.android_devices[0] def on_fail(self, test_name, begin_time): self._collect_bluetooth_manager_dumpsys_logs(self.android_devices) @@ -57,93 +58,8 @@ class BtFunhausBaseTest(BtMetricsBaseTest): if not bypass_setup_wizard(ad): self.log.debug( "Failed to bypass setup wizard, continuing test.") - if not "bt_config" in self.user_params.keys(): - self.log.error("Missing mandatory user config \"bt_config\"!") - return False - bt_config_map_file = self.user_params["bt_config"] - return self._setup_bt_config(bt_config_map_file) - - def _setup_bt_config(self, bt_config_map_file): - bt_config_map = {} - if not os.path.isfile(bt_config_map_file): - bt_config_map_file = os.path.join( - self.user_params[Config.key_config_path], bt_config_map_file) - if not os.path.isfile(bt_config_map_file): - self.log.error("Unable to load bt_config file {}.".format( - bt_config_map_file)) - return False - try: - f = open(bt_config_map_file, 'r') - bt_config_map = json.load(f) - f.close() - except FileNotFoundError: - self.log.error("File not found: {}.".format(bt_config_map_file)) - return False - # Connected devices return all upper case mac addresses. - # Make the peripheral_info address attribute upper case - # in order to ensure the BT mac addresses match. - for serial in bt_config_map.keys(): - mac_address = bt_config_map[serial]["peripheral_info"][ - "address"].upper() - bt_config_map[serial]["peripheral_info"]["address"] = mac_address - for ad in self.android_devices: - serial = ad.serial - - # Verify serial number in bt_config_map - self.log.info("Verify serial number of Android device in config.") - if serial not in bt_config_map.keys(): - self.log.error( - "Missing android device serial {} in bt_config.".format( - serial)) - return False - - # Push the bt_config.conf file to Android device - if (not self._push_config_to_android_device(ad, bt_config_map, - serial)): - return False - - # Add music to the Android device - if not self._add_music_to_android_device(ad): - return False - - # Verify Bluetooth is enabled - self.log.info("Verifying Bluetooth is enabled on Android Device.") - if not bluetooth_enabled_check(ad): - self.log.error("Failed to toggle on Bluetooth on device {}". - format(serial)) - return False - - # Verify Bluetooth device is connected - self.log.info( - "Waiting up to 10 seconds for device to reconnect...") - if not self._verify_bluetooth_device_is_connected( - ad, bt_config_map, serial): - self.device_fails_to_connect_list.append(ad) - if len(self.device_fails_to_connect_list) == len(self.android_devices): - self.log.error("All devices failed to reconnect.") - return False - return True - - def _push_config_to_android_device(self, ad, bt_config_map, serial): - """ - Push Bluetooth config file to android device so that it will have the - paired link key to the remote device - :param ad: Android device - :param bt_config_map: Map to each device's config - :param serial: Serial number of device - :return: True on success, False on failure - """ - self.log.info("Pushing bt_config.conf file to Android device.") - config_path = bt_config_map[serial]["config_path"] - if not os.path.isfile(config_path): - config_path = os.path.join( - self.user_params[Config.key_config_path], config_path) - if not os.path.isfile(config_path): - self.log.error("Unable to load bt_config file {}.".format( - config_path)) - return False - ad.adb.push("{} {}".format(config_path, BT_CONF_PATH)) - return True + # Add music to the Android device + return self._add_music_to_android_device(ad) def _add_music_to_android_device(self, ad): """ @@ -181,45 +97,6 @@ class BtFunhausBaseTest(BtMetricsBaseTest): ad.reboot() return True - def _verify_bluetooth_device_is_connected(self, ad, bt_config_map, serial): - """ - Verify that remote Bluetooth device is connected - :param ad: Android device - :param bt_config_map: Config map - :param serial: Serial number of Android device - :return: True on success, False on failure - """ - connected_devices = ad.droid.bluetoothGetConnectedDevices() - start_time = time.time() - wait_time = 10 - result = False - while time.time() < start_time + wait_time: - connected_devices = ad.droid.bluetoothGetConnectedDevices() - if len(connected_devices) > 0: - if bt_config_map[serial]["peripheral_info"]["address"] in { - d['address'] - for d in connected_devices - }: - result = True - break - else: - try: - ad.droid.bluetoothConnectBonded(bt_config_map[serial][ - "peripheral_info"]["address"]) - except Exception as err: - self.log.error("Failed to connect bonded. Err: {}".format( - err)) - if not result: - self.log.info("Connected Devices: {}".format(connected_devices)) - self.log.info("Bonded Devices: {}".format( - ad.droid.bluetoothGetBondedDevices())) - self.log.error( - "Failed to connect to peripheral name: {}, address: {}".format( - bt_config_map[serial]["peripheral_info"]["name"], - bt_config_map[serial]["peripheral_info"]["address"])) - self.device_fails_to_connect_list.append("{}:{}".format( - serial, bt_config_map[serial]["peripheral_info"]["name"])) - def _collect_bluetooth_manager_dumpsys_logs(self, ads): """ Collect "adb shell dumpsys bluetooth_manager" logs @@ -238,23 +115,13 @@ class BtFunhausBaseTest(BtMetricsBaseTest): def start_playing_music_on_all_devices(self): """ - Start playing music all devices + Start playing music :return: None """ - for ad in self.android_devices: - ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format( - self.music_file_to_play)) - ad.droid.mediaPlaySetLooping(True) - self.log.info("Music is now playing on device {}".format( - ad.serial)) - - def stop_playing_music_on_all_devices(self): - """ - Stop playing music on all devices - :return: None - """ - for ad in self.android_devices: - ad.droid.mediaPlayStopAll() + self.ad.droid.mediaPlayOpen("file:///sdcard/Music/{}".format( + self.music_file_to_play)) + self.ad.droid.mediaPlaySetLooping(True) + self.ad.log.info("Music is now playing.") def monitor_music_play_util_deadline(self, end_time, sleep_interval=1): """ @@ -273,32 +140,17 @@ class BtFunhausBaseTest(BtMetricsBaseTest): device_not_connected_list: List of ADs with no remote device connected """ - bluetooth_off_list = [] device_not_connected_list = [] while time.time() < end_time: - for ad in self.android_devices: - serial = ad.serial - if (not ad.droid.bluetoothCheckState() and - serial not in bluetooth_off_list): - self.log.error( - "Device {}'s Bluetooth state is off.".format(serial)) - bluetooth_off_list.append(serial) - if (ad.droid.bluetoothGetConnectedDevices() == 0 and - serial not in device_not_connected_list): - self.log.error( - "Device {} not connected to any Bluetooth devices.". - format(serial)) - device_not_connected_list.append(serial) - if len(bluetooth_off_list) == len(self.android_devices): - self.log.error( - "Bluetooth off on all Android devices. Ending Test") - return False, bluetooth_off_list, device_not_connected_list - if len(device_not_connected_list) == len(self.android_devices): - self.log.error( - "Every Android device has no device connected.") - return False, bluetooth_off_list, device_not_connected_list + if not self.ad.droid.bluetoothCheckState(): + self.ad.log.error("Device {}'s Bluetooth state is off.".format( + serial)) + return False + if self.ad.droid.bluetoothGetConnectedDevices() == 0: + self.ad.log.error( + "Bluetooth device not connected. Failing test.") time.sleep(sleep_interval) - return True, bluetooth_off_list, device_not_connected_list + return True def play_music_for_duration(self, duration, sleep_interval=1): """ @@ -316,8 +168,7 @@ class BtFunhausBaseTest(BtMetricsBaseTest): start_time = time.time() end_time = start_time + duration self.start_playing_music_on_all_devices() - status, bluetooth_off_list, device_not_connected_list = \ - self.monitor_music_play_util_deadline(end_time, sleep_interval) - if status: - self.stop_playing_music_on_all_devices() - return status, bluetooth_off_list, device_not_connected_list + status = self.monitor_music_play_util_deadline(end_time, + sleep_interval) + self.ad.droid.mediaPlayStopAll() + return status diff --git a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py index 2d16f4474b..66acc84a4b 100644 --- a/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BtMetricsBaseTest.py @@ -27,6 +27,8 @@ class BtMetricsBaseTest(BluetoothBaseTest): def __init__(self, controllers): BluetoothBaseTest.__init__(self, controllers) self.bluetooth_proto_path = None + self.dongle = self.relay_devices[0] + self.ad = self.android_devices[0] def setup_class(self): """ @@ -67,8 +69,40 @@ class BtMetricsBaseTest(BluetoothBaseTest): # Clear all metrics for ad in self.android_devices: get_bluetooth_metrics(ad, ad.bluetooth_proto_module) + self.dongle.setup() + tries = 5 + # Since we are not concerned with pairing in this test, try 5 times. + while tries > 0: + if self._pair_devices(): + return True + else: + tries -= 1 + return False + + def teardown_test(self): + super(BtMetricsBaseTest, self).teardown_test() + self.dongle.clean_up() return True + def _pair_devices(self): + self.ad.droid.bluetoothStartPairingHelper(False) + self.dongle.enter_pairing_mode() + + self.ad.droid.bluetoothBond(self.dongle.mac_address) + + end_time = time.time() + 20 + self.ad.log.info("Verifying devices are bonded") + while time.time() < end_time: + bonded_devices = self.ad.droid.bluetoothGetBondedDevices() + + for d in bonded_devices: + if d['address'] == self.dongle.mac_address: + self.ad.log.info("Successfully bonded to device.") + self.log.info("Bonded devices:\n{}".format(bonded_devices)) + return True + self.ad.log.info("Failed to bond devices.") + return False + def collect_bluetooth_manager_metrics_logs(self, ads): """ Collect Bluetooth metrics logs, save an ascii log to disk and return diff --git a/acts/framework/acts/test_utils/bt/PowerBaseTest.py b/acts/framework/acts/test_utils/bt/PowerBaseTest.py index 525317e0a2..a175c9e43b 100644 --- a/acts/framework/acts/test_utils/bt/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/bt/PowerBaseTest.py @@ -56,6 +56,17 @@ class PowerBaseTest(BluetoothBaseTest): "PMCMainActivity") PMC_VERBOSE_CMD = "setprop log.tag.PMC VERBOSE" + def setup_test(self): + self.timer_list = [] + for a in self.android_devices: + a.ed.clear_all_events() + a.droid.setScreenTimeout(20) + self.ad.go_to_sleep() + return True + + def teardown_test(self): + return True + def setup_class(self): # Not to call Base class setup_class() # since it removes the bonded devices @@ -85,8 +96,6 @@ class PowerBaseTest(BluetoothBaseTest): set_ambient_display(self.ad, False) self.ad.adb.shell("settings put system screen_brightness 0") set_auto_rotate(self.ad, False) - set_phone_screen_on(self.log, self.ad, self.SCREEN_TIME_OFF) - self.ad.go_to_sleep() wutils.wifi_toggle_state(self.ad, False) diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py index 5732351d83..c778466a39 100644 --- a/acts/framework/acts/test_utils/tel/tel_test_utils.py +++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py @@ -4575,7 +4575,10 @@ def start_adb_tcpdump(ad, test_name, mask="ims"): """ ad.log.debug("Ensuring no tcpdump is running in background") - ad.adb.shell("killall -9 tcpdump") + try: + ad.adb.shell("killall -9 tcpdump") + except AdbError: + self.log.warn("Killing existing tcpdump processes failed") begin_time = epoch_to_log_line_timestamp(get_current_epoch_time()) begin_time = normalize_log_line_timestamp(begin_time) file_name = "/sdcard/tcpdump{}{}{}.pcap".format(ad.serial, test_name, diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py index 8de6a6e82e..a04890676b 100755 --- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py +++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py @@ -85,13 +85,13 @@ class WifiBaseTest(BaseTestClass): "security": ref_5g_security, "password": ref_5g_passphrase } - + ap = 0 for ap in range(ap_count): self.user_params["reference_networks"].append({ "2g": - network_dict_2g, + copy.copy(network_dict_2g), "5g": - network_dict_5g + copy.copy(network_dict_5g) }) self.reference_networks = self.user_params["reference_networks"] return {"2g": network_dict_2g, "5g": network_dict_5g} @@ -118,6 +118,7 @@ class WifiBaseTest(BaseTestClass): open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) network_dict_2g = {"SSID": open_2g_ssid, "security": 'none'} network_dict_5g = {"SSID": open_5g_ssid, "security": 'none'} + ap = 0 for ap in range(ap_count): self.user_params["open_network"].append({ "2g": network_dict_2g, @@ -126,10 +127,12 @@ class WifiBaseTest(BaseTestClass): self.open_network = self.user_params["open_network"] return {"2g": network_dict_2g, "5g": network_dict_5g} - def populate_bssid(self, ap, networks_5g, networks_2g): + def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g): """Get bssid for a given SSID and add it to the network dictionary. Args: + ap_instance: Accesspoint index that was configured. + ap: Accesspoint object corresponding to ap_instance. networks_5g: List of 5g networks configured on the APs. networks_2g: List of 2g networks configured on the APs. @@ -142,20 +145,16 @@ class WifiBaseTest(BaseTestClass): if 'channel' in network: continue bssid = ap.get_bssid_from_ssid(network["SSID"]) - if network["security"] == hostapd_constants.WPA2_STRING: - # TODO:(bamahadev) Change all occurances of reference_networks - # in to wpa_networks. - network_list = self.reference_networks - else: - network_list = self.open_network if '2g' in network["SSID"]: band = hostapd_constants.BAND_2G else: band = hostapd_constants.BAND_5G - # For each network update BSSID if it doesn't already exist. - for ref_network in network_list: - if not 'bssid' in ref_network[band]: - ref_network[band]["bssid"] = bssid + if network["security"] == hostapd_constants.WPA2_STRING: + # TODO:(bamahadev) Change all occurances of reference_networks + # to wpa_networks. + self.reference_networks[ap_instance][band]["bssid"] = bssid + else: + self.open_network[ap_instance][band]["bssid"] = bssid def legacy_configure_ap_and_start( self, @@ -199,11 +198,11 @@ class WifiBaseTest(BaseTestClass): self.config_5g = self._generate_legacy_ap_config(network_list_5g) if len(network_list_2g) > 1: self.config_2g = self._generate_legacy_ap_config(network_list_2g) - + ap = 0 for ap in range(ap_count): self.access_points[ap].start_ap(self.config_2g) self.access_points[ap].start_ap(self.config_5g) - self.populate_bssid(self.access_points[ap], orig_network_list_5g, + self.populate_bssid(ap, self.access_points[ap], orig_network_list_5g, orig_network_list_2g) def _generate_legacy_ap_config(self, network_list): diff --git a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py index 5adb7950a9..44380640bc 100644 --- a/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/aware/aware_test_utils.py @@ -372,6 +372,26 @@ def request_network(dut, ns): network_req = {"TransportType": 5, "NetworkSpecifier": ns} return dut.droid.connectivityRequestWifiAwareNetwork(network_req) +def get_network_specifier(dut, id, dev_type, peer_mac, sec): + """Create a network specifier for the device based on the security + configuration. + + Args: + dut: device + id: session ID + dev_type: device type - Initiator or Responder + peer_mac: the discovery MAC address of the peer + sec: security configuration + """ + if sec is None: + return dut.droid.wifiAwareCreateNetworkSpecifierOob( + id, dev_type, peer_mac) + if isinstance(sec, str): + return dut.droid.wifiAwareCreateNetworkSpecifierOob( + id, dev_type, peer_mac, sec) + return dut.droid.wifiAwareCreateNetworkSpecifierOob( + id, dev_type, peer_mac, None, sec) + def configure_dw(device, is_default, is_24_band, value): """Use the command-line API to configure the DW (discovery window) setting @@ -475,6 +495,23 @@ def create_discovery_config(service_name, config[aconsts.DISCOVERY_KEY_TERM_CB_ENABLED] = term_cb_enable return config +def attach_with_identity(dut): + """Start an Aware session (attach) and wait for confirmation and identity + information (mac address). + + Args: + dut: Device under test + Returns: + id: Aware session ID. + mac: Discovery MAC address of this device. + """ + id = dut.droid.wifiAwareAttach(True) + wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED) + event = wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) + mac = event["data"]["mac"] + + return id, mac + def create_discovery_pair(p_dut, s_dut, p_config, diff --git a/acts/framework/acts/test_utils/wifi/wifi_constants.py b/acts/framework/acts/test_utils/wifi/wifi_constants.py index dda2a40937..1ded7db209 100644 --- a/acts/framework/acts/test_utils/wifi/wifi_constants.py +++ b/acts/framework/acts/test_utils/wifi/wifi_constants.py @@ -23,3 +23,8 @@ WIFI_FORGET_NW_SUCCESS = "WifiManagerForgetNetworkOnSuccess" # These constants will be used by the ACTS wifi tests. CONNECT_BY_CONFIG_SUCCESS = 'WifiManagerConnectByConfigOnSuccess' CONNECT_BY_NETID_SUCCESS = 'WifiManagerConnectByNetIdOnSuccess' + +# AP related constants +AP_MAIN = "main_AP" +AP_AUX = "aux_AP" +SSID = "SSID" diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py new file mode 100644 index 0000000000..031ef08613 --- /dev/null +++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py @@ -0,0 +1,606 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from acts import asserts +from acts import utils +from acts.controllers import monsoon +from acts.libs.proc import job +from acts.test_utils.wifi import wifi_test_utils as wutils +from bokeh.layouts import layout +from bokeh.models import CustomJS, ColumnDataSource +from bokeh.models.widgets import DataTable, TableColumn +from bokeh.plotting import figure, output_file, save +from acts.controllers.ap_lib import hostapd_security +from acts.controllers.ap_lib import hostapd_ap_preset +# http://www.secdev.org/projects/scapy/ +# On ubuntu, sudo pip3 install scapy-python3 +import scapy.all as scapy + +SETTINGS_PAGE = "am start -n com.android.settings/.Settings" +SCROLL_BOTTOM = "input swipe 0 2000 0 0" +UNLOCK_SCREEN = "input keyevent 82" +SCREENON_USB_DISABLE = "dumpsys battery unplug" +RESET_BATTERY_STATS = "dumpsys batterystats --reset" +AOD_OFF = "settings put secure doze_always_on 0" +MUSIC_IQ_OFF = "pm disable-user com.google.intelligence.sense" +# Command to disable gestures +LIFT = "settings put secure doze_pulse_on_pick_up 0" +DOUBLE_TAP = "settings put secure doze_pulse_on_double_tap 0" +JUMP_TO_CAMERA = "settings put secure camera_double_tap_power_gesture_disabled 1" +RAISE_TO_CAMERA = "settings put secure camera_lift_trigger_enabled 0" +FLIP_CAMERA = "settings put secure camera_double_twist_to_flip_enabled 0" +ASSIST_GESTURE = "settings put secure assist_gesture_enabled 0" +ASSIST_GESTURE_ALERT = "settings put secure assist_gesture_silence_alerts_enabled 0" +ASSIST_GESTURE_WAKE = "settings put secure assist_gesture_wake_enabled 0" +SYSTEM_NAVI = "settings put secure system_navigation_keys_enabled 0" +# End of command to disable gestures +AUTO_TIME_OFF = "settings put global auto_time 0" +AUTO_TIMEZONE_OFF = "settings put global auto_time_zone 0" +FORCE_YOUTUBE_STOP = "am force-stop com.google.android.youtube" +FORCE_DIALER_STOP = "am force-stop com.google.android.dialer" +IPERF_TIMEOUT = 180 +THRESHOLD_TOLERANCE = 0.2 +GET_FROM_PHONE = 'get_from_dut' +GET_FROM_AP = 'get_from_ap' + + +def dut_rockbottom(ad): + """Set the phone into Rock-bottom state. + + Args: + ad: the target android device, AndroidDevice object + + """ + ad.log.info("Now set the device to Rockbottom State") + utils.require_sl4a((ad, )) + ad.droid.connectivityToggleAirplaneMode(False) + time.sleep(5) + ad.droid.connectivityToggleAirplaneMode(True) + utils.set_ambient_display(ad, False) + utils.set_auto_rotate(ad, False) + utils.set_adaptive_brightness(ad, False) + utils.sync_device_time(ad) + utils.set_location_service(ad, False) + utils.set_mobile_data_always_on(ad, False) + utils.disable_doze_light(ad) + utils.disable_doze(ad) + wutils.reset_wifi(ad) + wutils.wifi_toggle_state(ad, False) + ad.droid.nfcDisable() + ad.droid.setScreenBrightness(0) + ad.adb.shell(AOD_OFF) + ad.droid.setScreenTimeout(2200) + ad.droid.wakeUpNow() + ad.adb.shell(LIFT) + ad.adb.shell(DOUBLE_TAP) + ad.adb.shell(JUMP_TO_CAMERA) + ad.adb.shell(RAISE_TO_CAMERA) + ad.adb.shell(FLIP_CAMERA) + ad.adb.shell(ASSIST_GESTURE) + ad.adb.shell(ASSIST_GESTURE_ALERT) + ad.adb.shell(ASSIST_GESTURE_WAKE) + ad.adb.shell(SCREENON_USB_DISABLE) + ad.adb.shell(UNLOCK_SCREEN) + ad.adb.shell(SETTINGS_PAGE) + ad.adb.shell(SCROLL_BOTTOM) + ad.adb.shell(MUSIC_IQ_OFF) + ad.adb.shell(AUTO_TIME_OFF) + ad.adb.shell(AUTO_TIMEZONE_OFF) + ad.adb.shell(FORCE_YOUTUBE_STOP) + ad.adb.shell(FORCE_DIALER_STOP) + ad.droid.wakeUpNow() + ad.log.info('Device has been set to Rockbottom state') + + +def pass_fail_check(test_class, test_result): + """Check the test result and decide if it passed or failed. + The threshold is provided in the config file + + Args: + test_class: the specific test class where test is running + avg_current: the average current as the test result + """ + test_name = test_class.current_test_name + current_threshold = test_class.threshold[test_name] + asserts.assert_true( + abs(test_result - current_threshold) / current_threshold < + THRESHOLD_TOLERANCE, + ("Measured average current in [%s]: %s, which is " + "more than %d percent off than acceptable threshold %.2fmA") % + (test_name, test_result, THRESHOLD_TOLERANCE * 100, current_threshold)) + asserts.explicit_pass("Measurement finished for %s." % test_name) + + +def monsoon_data_collect_save(ad, mon_info, test_name, bug_report): + """Current measurement and save the log file. + + Collect current data using Monsoon box and return the path of the + log file. Take bug report if requested. + + Args: + ad: the android device under test + mon_info: dict with information of monsoon measurement, including + monsoon device object, measurement frequency, duration and + offset etc. + test_name: current test name, used to contruct the result file name + bug_report: indicator to take bug report or not, 0 or 1 + Returns: + data_path: the absolute path to the log file of monsoon current + measurement + avg_current: the average current of the test + """ + log = logging.getLogger() + log.info("Starting power measurement with monsoon box") + tag = (test_name + '_' + ad.model + '_' + ad.build_info['build_id']) + #Resets the battery status right before the test started + ad.adb.shell(RESET_BATTERY_STATS) + begin_time = utils.get_current_human_time() + #Start the power measurement using monsoon + result = mon_info['dut'].measure_power( + mon_info['freq'], + mon_info['duration'], + tag=tag, + offset=mon_info['offset']) + data_path = os.path.join(mon_info['data_path'], "%s.txt" % tag) + avg_current = result.average_current + monsoon.MonsoonData.save_to_text_file([result], data_path) + log.info("Power measurement done") + if bool(bug_report) == True: + ad.take_bug_report(test_name, begin_time) + return data_path, avg_current + + +def monsoon_data_plot(mon_info, file_path, tag=""): + """Plot the monsoon current data using bokeh interactive plotting tool. + + Plotting power measurement data with bokeh to generate interactive plots. + You can do interactive data analysis on the plot after generating with the + provided widgets, which make the debugging much easier. To realize that, + bokeh callback java scripting is used. View a sample html output file: + https://drive.google.com/open?id=0Bwp8Cq841VnpT2dGUUxLYWZvVjA + + Args: + mon_info: dict with information of monsoon measurement, including + monsoon device object, measurement frequency, duration and + offset etc. + file_path: the path to the monsoon log file with current data + + Returns: + plot: the plotting object of bokeh, optional, will be needed if multiple + plots will be combined to one html file. + dt: the datatable object of bokeh, optional, will be needed if multiple + datatables will be combined to one html file. + """ + + log = logging.getLogger() + log.info("Plot the power measurement data") + #Get results as monsoon data object from the input file + results = monsoon.MonsoonData.from_text_file(file_path) + #Decouple current and timestamp data from the monsoon object + current_data = [] + timestamps = [] + voltage = results[0].voltage + [current_data.extend(x.data_points) for x in results] + [timestamps.extend(x.timestamps) for x in results] + period = 1 / float(mon_info['freq']) + time_relative = [x * period for x in range(len(current_data))] + #Calculate the average current for the test + current_data = [x * 1000 for x in current_data] + avg_current = sum(current_data) / len(current_data) + color = ['navy'] * len(current_data) + + #Preparing the data and source link for bokehn java callback + source = ColumnDataSource(data=dict( + x0=time_relative, y0=current_data, color=color)) + s2 = ColumnDataSource(data=dict( + z0=[mon_info['duration']], + y0=[round(avg_current, 2)], + x0=[round(avg_current * voltage, 2)], + z1=[round(avg_current * voltage * mon_info['duration'], 2)], + z2=[round(avg_current * mon_info['duration'], 2)])) + #Setting up data table for the output + columns = [ + TableColumn(field='z0', title='Total Duration (s)'), + TableColumn(field='y0', title='Average Current (mA)'), + TableColumn(field='x0', title='Average Power (4.2v) (mW)'), + TableColumn(field='z1', title='Average Energy (mW*s)'), + TableColumn(field='z2', title='Normalized Average Energy (mA*s)') + ] + dt = DataTable( + source=s2, columns=columns, width=1300, height=60, editable=True) + + plot_title = file_path[file_path.rfind('/') + 1:-4] + tag + output_file("%s/%s.html" % (mon_info['data_path'], plot_title)) + TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,' + 'hover,xwheel_zoom,ywheel_zoom,save') + # Create a new plot with the datatable above + plot = figure( + plot_width=1300, + plot_height=700, + title=plot_title, + tools=TOOLS, + webgl=True) + plot.line('x0', 'y0', source=source, line_width=2) + plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color') + plot.xaxis.axis_label = 'Time (s)' + plot.yaxis.axis_label = 'Current (mA)' + plot.title.text_font_size = {'value': '15pt'} + + #Callback Java scripting + source.callback = CustomJS( + args=dict(mytable=dt), + code=""" + var inds = cb_obj.get('selected')['1d'].indices; + var d1 = cb_obj.get('data'); + var d2 = mytable.get('source').get('data'); + ym = 0 + ts = 0 + d2['x0'] = [] + d2['y0'] = [] + d2['z1'] = [] + d2['z2'] = [] + d2['z0'] = [] + min=max=d1['x0'][inds[0]] + if (inds.length==0) {return;} + for (i = 0; i < inds.length; i++) { + ym += d1['y0'][inds[i]] + d1['color'][inds[i]] = "red" + if (d1['x0'][inds[i]] < min) { + min = d1['x0'][inds[i]]} + if (d1['x0'][inds[i]] > max) { + max = d1['x0'][inds[i]]} + } + ym /= inds.length + ts = max - min + dx0 = Math.round(ym*4.2*100.0)/100.0 + dy0 = Math.round(ym*100.0)/100.0 + dz1 = Math.round(ym*4.2*ts*100.0)/100.0 + dz2 = Math.round(ym*ts*100.0)/100.0 + dz0 = Math.round(ts*1000.0)/1000.0 + d2['z0'].push(dz0) + d2['x0'].push(dx0) + d2['y0'].push(dy0) + d2['z1'].push(dz1) + d2['z2'].push(dz2) + mytable.trigger('change'); + """) + + #Layout the plot and the datatable bar + l = layout([[dt], [plot]]) + save(l) + return [plot, dt] + + +def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=6): + """Function to change the DTIM setting in the phone. + + Args: + ad: the target android device, AndroidDevice object + gEnableModulatedDTIM: Modulated DTIM, int + gMaxLIModulatedDTIM: Maximum modulated DTIM, int + """ + serial = ad.serial + ini_file_phone = 'vendor/firmware/wlan/qca_cld/WCNSS_qcom_cfg.ini' + ini_file_local = 'local_ini_file.ini' + ini_pull_cmd = 'adb -s %s pull %s %s' % (serial, ini_file_phone, + ini_file_local) + ini_push_cmd = 'adb -s %s push %s %s' % (serial, ini_file_local, + ini_file_phone) + utils.exe_cmd(ini_pull_cmd) + + with open(ini_file_local, 'r') as fin: + for line in fin: + if 'gEnableModulatedDTIM=' in line: + gEDTIM_old = line.strip('gEnableModulatedDTIM=').strip('\n') + if 'gMaxLIModulatedDTIM=' in line: + gMDTIM_old = line.strip('gMaxLIModulatedDTIM=').strip('\n') + if int(gEDTIM_old) == gEnableModulatedDTIM: + ad.log.info('Current DTIM is already the desired value,' + 'no need to reset it') + return + + gE_old = 'gEnableModulatedDTIM=' + gEDTIM_old + gM_old = 'gMaxLIModulatedDTIM=' + gMDTIM_old + gE_new = 'gEnableModulatedDTIM=' + str(gEnableModulatedDTIM) + gM_new = 'gMaxLIModulatedDTIM=' + str(gMaxLIModulatedDTIM) + + sed_gE = 'sed -i \'s/%s/%s/g\' %s' % (gE_old, gE_new, ini_file_local) + sed_gM = 'sed -i \'s/%s/%s/g\' %s' % (gM_old, gM_new, ini_file_local) + utils.exe_cmd(sed_gE) + utils.exe_cmd(sed_gM) + + utils.exe_cmd('adb -s {} root'.format(serial)) + cmd_out = utils.exe_cmd('adb -s {} remount'.format(serial)) + if ("Permission denied").encode() in cmd_out: + ad.log.info('Need to disable verity first and reboot') + utils.exe_cmd('adb -s {} disable-verity'.format(serial)) + time.sleep(1) + ad.reboot() + ad.log.info('Verity disabled and device back from reboot') + utils.exe_cmd('adb -s {} root'.format(serial)) + utils.exe_cmd('adb -s {} remount'.format(serial)) + time.sleep(1) + utils.exe_cmd(ini_push_cmd) + ad.log.info('ini file changes checked in and rebooting...') + ad.reboot() + ad.log.info('DTIM updated and device back from reboot') + + +def ap_setup(ap, network): + """Set up the whirlwind AP with provided network info. + + Args: + ap: access_point object of the AP + network: dict with information of the network, including ssid, password + bssid, channel etc. + """ + + log = logging.getLogger() + bss_settings = [] + ssid = network[wutils.WifiEnums.SSID_KEY] + if "password" in network.keys(): + password = network["password"] + security = hostapd_security.Security( + security_mode="wpa", password=password) + else: + security = hostapd_security.Security(security_mode=None, password=None) + channel = network["channel"] + config = hostapd_ap_preset.create_ap_preset( + channel=channel, + ssid=ssid, + security=security, + bss_settings=bss_settings, + profile_name='whirlwind') + ap.start_ap(config) + log.info("AP started on channel {} with SSID {}".format(channel, ssid)) + + +def bokeh_plot(data_sets, legends, fig_property): + """Plot bokeh figs. + Args: + data_sets: data sets including lists of x_data and lists of y_data + ex: [[[x_data1], [x_data2]], [[y_data1],[y_data2]]] + legends: list of legend for each curve + fig_property: dict containing the plot property, including title, + lables, linewidth, circle size, etc. + Returns: + plot: bokeh plot figure object + """ + TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,resize,reset,' + 'hover,xwheel_zoom,ywheel_zoom,save') + plot = figure( + plot_width=1300, + plot_height=700, + title=fig_property['title'], + tools=TOOLS, + webgl=True) + colors = [ + 'red', 'green', 'blue', 'olive', 'orange', 'salmon', 'black', 'navy', + 'yellow', 'darkred', 'goldenrod' + ] + for x_data, y_data, legend in zip(data_sets[0], data_sets[1], legends): + index_now = legends.index(legend) + color = colors[index_now % len(colors)] + plot.line( + x_data, y_data, legend=str(legend), line_width=3, color=color) + plot.circle( + x_data, y_data, size=10, legend=str(legend), fill_color=color) + #Plot properties + plot.xaxis.axis_label = fig_property['x_label'] + plot.yaxis.axis_label = fig_property['y_label'] + plot.legend.location = "top_right" + plot.legend.click_policy = "hide" + plot.title.text_font_size = {'value': '15pt'} + return plot + + +def run_iperf_client_nonblocking(ad, server_host, extra_args=""): + """Start iperf client on the device with nohup. + + Return status as true if iperf client start successfully. + And data flow information as results. + + Args: + ad: the android device under test + server_host: Address of the iperf server. + extra_args: A string representing extra arguments for iperf client, + e.g. "-i 1 -t 30". + + """ + log = logging.getLogger() + ad.adb.shell_nb("nohup iperf3 -c {} {} &".format(server_host, extra_args)) + log.info("IPerf client started") + + +def get_wifi_rssi(ad): + """Get the RSSI of the device. + + Args: + ad: the android device under test + Returns: + RSSI: the rssi level of the device + """ + RSSI = ad.droid.wifiGetConnectionInfo()['rssi'] + return RSSI + + +def get_phone_ip(ad): + """Get the WiFi IP address of the phone. + + Args: + ad: the android device under test + Returns: + IP: IP address of the phone for WiFi, as a string + """ + IP = ad.droid.connectivityGetIPv4Addresses('wlan0')[0] + + return IP + + +def get_phone_mac(ad): + """Get the WiFi MAC address of the phone. + + Args: + ad: the android device under test + Returns: + mac: MAC address of the phone for WiFi, as a string + """ + mac = ad.droid.wifiGetConnectionInfo()["mac_address"] + + return mac + + +def get_phone_ipv6(ad): + """Get the WiFi IPV6 address of the phone. + + Args: + ad: the android device under test + Returns: + IPv6: IPv6 address of the phone for WiFi, as a string + """ + IPv6 = ad.droid.connectivityGetLinkLocalIpv6Address('wlan0')[:-6] + + return IPv6 + + +def get_if_addr6(intf, address_type): + """Returns the Ipv6 address from a given local interface. + + Returns the desired IPv6 address from the interface 'intf' in human + readable form. The address type is indicated by the IPv6 constants like + IPV6_ADDR_LINKLOCAL, IPV6_ADDR_GLOBAL, etc. If no address is found, + None is returned. + + Args: + intf: desired interface name + address_type: addrees typle like LINKLOCAL or GLOBAL + + Returns: + Ipv6 address of the specified interface in human readable format + """ + for if_list in scapy.in6_getifaddr(): + if if_list[2] == intf and if_list[1] == address_type: + return if_list[0] + + return None + + +@utils.timeout(60) +def wait_for_dhcp(intf): + """Wait the DHCP address assigned to desired interface. + + Getting DHCP address takes time and the wait time isn't constant. Utilizing + utils.timeout to keep trying until success + + Args: + intf: desired interface name + Returns: + ip: ip address of the desired interface name + Raise: + TimeoutError: After timeout, if no DHCP assigned, raise + """ + log = logging.getLogger() + reset_host_interface(intf) + ip = '0.0.0.0' + while ip == '0.0.0.0': + ip = scapy.get_if_addr(intf) + log.info('DHCP address assigned to {}'.format(intf)) + return ip + + +def reset_host_interface(intf): + """Reset the host interface. + + Args: + intf: the desired interface to reset + """ + log = logging.getLogger() + intf_down_cmd = 'ifconfig %s down' % intf + intf_up_cmd = 'ifconfig %s up' % intf + try: + job.run(intf_down_cmd) + time.sleep(3) + job.run(intf_up_cmd) + time.sleep(3) + log.info('{} has been reset'.format(intf)) + except job.Error: + raise Exception('No such interface') + + +def create_pkt_config(test_class): + """Creates the config for generating multicast packets + + Args: + test_class: object with all networking paramters + + Returns: + Dictionary with the multicast packet config + """ + addr_type = (scapy.IPV6_ADDR_LINKLOCAL + if test_class.ipv6_src_type == 'LINK_LOCAL' else + scapy.IPV6_ADDR_GLOBAL) + + mac_dst = test_class.mac_dst + if GET_FROM_PHONE in test_class.mac_dst: + mac_dst = get_phone_mac(test_class.dut) + + ipv4_dst = test_class.ipv4_dst + if GET_FROM_PHONE in test_class.ipv4_dst: + ipv4_dst = get_phone_ip(test_class.dut) + + ipv6_dst = test_class.ipv6_dst + if GET_FROM_PHONE in test_class.ipv6_dst: + ipv6_dst = get_phone_ipv6(test_class.dut) + + ipv4_gw = test_class.ipv4_gwt + if GET_FROM_AP in test_class.ipv4_gwt: + ipv4_gw = test_class.access_point.ssh_settings.hostname + + pkt_gen_config = { + 'interf': test_class.pkt_sender.interface, + 'subnet_mask': test_class.sub_mask, + 'src_mac': test_class.mac_src, + 'dst_mac': mac_dst, + 'src_ipv4': test_class.ipv4_src, + 'dst_ipv4': ipv4_dst, + 'src_ipv6': test_class.ipv6_src, + 'src_ipv6_type': addr_type, + 'dst_ipv6': ipv6_dst, + 'gw_ipv4': ipv4_gw + } + return pkt_gen_config + + +def create_monsoon_info(test_class): + """Creates the config dictionary for monsoon + + Args: + test_class: object with all the parameters + + Returns: + Dictionary with the monsoon packet config + """ + mon_info = { + 'dut': test_class.mon, + 'freq': test_class.mon_freq, + 'duration': test_class.mon_duration, + 'offset': test_class.mon_offset, + 'data_path': test_class.mon_data_path + } + return mon_info diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py index 263fabee31..0e0cbfc459 100755 --- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py @@ -1268,7 +1268,7 @@ def convert_pem_key_to_pkcs8(in_file, out_file): utils.exe_cmd(cmd) -def validate_connection(ad, ping_addr): +def validate_connection(ad, ping_addr=DEFAULT_PING_ADDR): """Validate internet connection by pinging the address provided. Args: diff --git a/acts/framework/acts/utils.py b/acts/framework/acts/utils.py index 5c398f8358..e13b964853 100755 --- a/acts/framework/acts/utils.py +++ b/acts/framework/acts/utils.py @@ -28,6 +28,7 @@ import string import subprocess import time import traceback +import zipfile from acts.controllers import adb @@ -846,3 +847,28 @@ def adb_shell_ping(ad, return False finally: ad.adb.shell("rm /data/ping.txt", timeout=10, ignore_status=True) + + +def unzip_maintain_permissions(zip_path, extract_location): + """Unzip a .zip file while maintaining permissions. + + Args: + zip_path: The path to the zipped file. + extract_location: the directory to extract to. + """ + with zipfile.ZipFile(zip_path, 'r') as zip_file: + for info in zip_file.infolist(): + _extract_file(zip_file, info, extract_location) + + +def _extract_file(zip_file, zip_info, extract_location): + """Extracts a single entry from a ZipFile while maintaining permissions. + + Args: + zip_file: A zipfile.ZipFile. + zip_info: A ZipInfo object from zip_file. + extract_location: The directory to extract to. + """ + out_path = zip_file.extract(zip_info.filename, path=extract_location) + perm = zip_info.external_attr >> 16 + os.chmod(out_path, perm) diff --git a/acts/framework/setup.py b/acts/framework/setup.py index 131f5de36f..48c71bbafa 100755 --- a/acts/framework/setup.py +++ b/acts/framework/setup.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3.4 # -# Copyright 2016 - The Android Open Source Project +# Copyright 2017 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -32,6 +32,8 @@ install_requires = [ 'pyserial', 'shellescape', 'protobuf', + 'roman', + 'scapy-python3', ] if sys.version_info < (3, ): diff --git a/acts/framework/tests/acts_relay_controller_test.py b/acts/framework/tests/acts_relay_controller_test.py index 61434da255..2676bffbde 100755 --- a/acts/framework/tests/acts_relay_controller_test.py +++ b/acts/framework/tests/acts_relay_controller_test.py @@ -169,6 +169,7 @@ class ActsRelayTest(unittest.TestCase): def test_clean_up_default_on(self): new_relay = Relay(self.board, 0) + new_relay._original_state = RelayState.NO self.board.set(new_relay.position, RelayState.NO) new_relay.clean_up() @@ -177,12 +178,23 @@ class ActsRelayTest(unittest.TestCase): def test_clean_up_default_off(self): new_relay = Relay(self.board, 0) + new_relay._original_state = RelayState.NO self.board.set(new_relay.position, RelayState.NC) new_relay.clean_up() self.assertEqual( self.board.get_relay_status(new_relay.position), RelayState.NO) + def test_clean_up_original_state_none(self): + val = 'STAYS_THE_SAME' + new_relay = Relay(self.board, 0) + # _original_state is none by default + # The line below sets the dict to an impossible value. + self.board.set(new_relay.position, val) + new_relay.clean_up() + # If the impossible value is cleared, then the test should fail. + self.assertEqual(self.board.get_relay_status(new_relay.position), val) + class ActsSainSmartBoardTest(unittest.TestCase): STATUS_MSG = ('<small><a href="{}"></a>' @@ -204,10 +216,8 @@ class ActsSainSmartBoardTest(unittest.TestCase): file.write(self.RELAY_ON_PAGE_CONTENTS) self.config = ({ - 'name': - 'SSBoard', - 'base_url': - self.test_dir, + 'name': 'SSBoard', + 'base_url': self.test_dir, 'relays': [{ 'name': '0', 'relay_pos': 0 @@ -652,10 +662,8 @@ class TestRelayRigParser(unittest.TestCase): rig.relays['r0'] = self.r0 rig.relays['r1'] = self.r1 config = { - 'type': - 'SomeInvalidType', - 'name': - '.', + 'type': 'SomeInvalidType', + 'name': '.', 'relays': [{ 'name': 'r0', 'pos': 'MockBoard/0' diff --git a/acts/tests/google/bt/AkXB10PairingTest.py b/acts/tests/google/bt/AkXB10PairingTest.py new file mode 100644 index 0000000000..51e1e5977a --- /dev/null +++ b/acts/tests/google/bt/AkXB10PairingTest.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Test pairing of an Android Device to a A&K XB10 Bluetooth speaker +""" +import logging +import time + +from acts.controllers.relay_lib.relay import SynchronizeRelays +from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest + +log = logging + + +class AkXB10PairingTest(BluetoothBaseTest): + DISCOVERY_TIME = 5 + + def __init__(self, controllers): + BluetoothBaseTest.__init__(self, controllers) + self.dut = self.android_devices[0] + # Do factory reset and then do delay for 3-seconds + self.dut.droid.bluetoothFactoryReset() + time.sleep(3) + self.ak_xb10_speaker = self.relay_devices[0] + + def setup_test(self): + super(BluetoothBaseTest, self).setup_test() + self.ak_xb10_speaker.setup() + self.ak_xb10_speaker.power_on() + # Wait for a moment between pushing buttons + time.sleep(0.25) + self.ak_xb10_speaker.enter_pairing_mode() + + def teardown_test(self): + super(BluetoothBaseTest, self).teardown_test() + self.ak_xb10_speaker.power_off() + self.ak_xb10_speaker.clean_up() + + def _perform_classic_discovery(self, scan_time=DISCOVERY_TIME): + self.dut.droid.bluetoothStartDiscovery() + time.sleep(scan_time) + self.dut.droid.bluetoothCancelDiscovery() + return self.dut.droid.bluetoothGetDiscoveredDevices() + + @BluetoothBaseTest.bt_test_wrap + def test_speaker_on(self): + """Test if the A&K XB10 speaker is powered on. + + Use scanning to determine if the speaker is powered on. + + Steps: + 1. Put the speaker into pairing mode. (Hold the button) + 2. Perform a scan on the DUT + 3. Check the scan list for the device. + + Expected Result: + Speaker is found. + + Returns: + Pass if True + Fail if False + + TAGS: ACTS_Relay + Priority: 1 + """ + + for device in self._perform_classic_discovery(): + if device['address'] == self.ak_xb10_speaker.mac_address: + self.dut.log.info("Desired device with MAC address %s found!", + self.ak_xb10_speaker.mac_address) + return True + return False + + @BluetoothBaseTest.bt_test_wrap + def test_speaker_off(self): + """Test if the A&K XB10 speaker is powered off. + + Use scanning to determine if the speaker is powered off. + + Steps: + 1. Power down the speaker + 2. Put the speaker into pairing mode. (Hold the button) + 3. Perform a scan on the DUT + 4. Check the scan list for the device. + + Expected Result: + Speaker is not found. + + Returns: + Pass if True + Fail if False + + TAGS: ACTS_Relay + Priority: 1 + """ + # Specific part of the test, turn off the speaker + self.ak_xb10_speaker.power_off() + + device_not_found = True + for device in self._perform_classic_discovery(): + if device['address'] == self.ak_xb10_speaker.mac_address: + self.dut.log.info( + "Undesired device with MAC address %s found!", + self.ak_xb10_speaker.mac_address) + device_not_found = False + + # Set the speaker back to the normal for tear_down() + self.ak_xb10_speaker.power_on() + # Give the relay and speaker some time, before it is turned off. + time.sleep(5) + return device_not_found + + @BluetoothBaseTest.bt_test_wrap + def test_pairing(self): + """Test pairing between a phone and A&K XB10 speaker. + + Test the A&K XB10 speaker can be paired to phone. + + Steps: + 1. Find the MAC address of remote controller from relay config file. + 2. Start the device paring process. + 3. Enable remote controller in pairing mode. + 4. Verify the remote is paired. + + Expected Result: + Speaker is paired. + + Returns: + Pass if True + Fail if False + + TAGS: ACTS_Relay + Priority: 1 + """ + + # BT scan activity + self._perform_classic_discovery() + self.dut.droid.bluetoothDiscoverAndBond( + self.ak_xb10_speaker.mac_address) + + end_time = time.time() + 20 + self.dut.log.info("Verifying devices are bonded") + while (time.time() < end_time): + bonded_devices = self.dut.droid.bluetoothGetBondedDevices() + for d in bonded_devices: + if d['address'] == self.ak_xb10_speaker.mac_address: + self.dut.log.info("Successfully bonded to device.") + self.log.info( + "A&K XB10 Bonded devices:\n{}".format(bonded_devices)) + return True + # Timed out trying to bond. + self.dut.log.info("Failed to bond devices.") + + return False diff --git a/acts/tests/google/bt/SonyXB2PairingTest.py b/acts/tests/google/bt/SonyXB2PairingTest.py index 3bb2ae7168..39071354f0 100644 --- a/acts/tests/google/bt/SonyXB2PairingTest.py +++ b/acts/tests/google/bt/SonyXB2PairingTest.py @@ -112,8 +112,9 @@ class SonyXB2PairingTest(BluetoothBaseTest): device_not_found = True for device in self._perform_classic_discovery(): if device['address'] == self.sony_xb2_speaker.mac_address: - self.dut.log.info("Undesired device with MAC address %s found!", - self.sony_xb2_speaker.mac_address) + self.dut.log.info( + "Undesired device with MAC address %s found!", + self.sony_xb2_speaker.mac_address) device_not_found = False # Set the speaker back to the normal for tear_down() @@ -158,7 +159,7 @@ class SonyXB2PairingTest(BluetoothBaseTest): if d['address'] == self.sony_xb2_speaker.mac_address: self.dut.log.info("Successfully bonded to device.") self.log.info( - "XB2 Bonded devices:\n{}".format(bonded_devices)) + "Sony XB2 Bonded devices:\n{}".format(bonded_devices)) return True # Timed out trying to bond. self.dut.log.info("Failed to bond devices.") diff --git a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py index af1f1f86f0..2725b2cfab 100644 --- a/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py +++ b/acts/tests/google/bt/audio_lab/BtFunhausMetricsTest.py @@ -58,16 +58,14 @@ class BtFunhausMetricsTest(BtFunhausBaseTest): """ play_duration_seconds = 60 start_time = time.time() - status, bluetooth_off_list, device_not_connected_list = \ - self.play_music_for_duration(play_duration_seconds) - if not status: - return status - self.stop_playing_music_on_all_devices() + if not self.play_music_for_duration(play_duration_seconds): + return False + self.ad.droid.mediaPlayStopAll() time.sleep(20) bt_duration = time.time() - start_time bluetooth_logs, bluetooth_logs_ascii = \ self.collect_bluetooth_manager_metrics_logs( - [self.android_devices[0]]) + [self.ad]) bluetooth_log = bluetooth_logs[0] bluetooth_log_ascii = bluetooth_logs_ascii[0] self.log.info(bluetooth_log_ascii) @@ -113,10 +111,8 @@ class BtFunhausMetricsTest(BtFunhausBaseTest): a2dp_duration = 0 for i in range(num_play): start_time = time.time() - status, bluetooth_off_list, device_not_connected_list = \ - self.play_music_for_duration(play_duration_seconds) - if not status: - return status + if not self.play_music_for_duration(play_duration_seconds): + return False a2dp_duration += (time.time() - start_time) time.sleep(20) bt_duration += (time.time() - start_time) @@ -161,10 +157,8 @@ class BtFunhausMetricsTest(BtFunhausBaseTest): play_duration_seconds = 30 for i in range(num_play): start_time = time.time() - status, bluetooth_off_list, device_not_connected_list = \ - self.play_music_for_duration(play_duration_seconds) - if not status: - return status + if not self.play_music_for_duration(play_duration_seconds): + return False time.sleep(20) bt_duration = time.time() - start_time bluetooth_logs, bluetooth_logs_ascii = \ diff --git a/acts/tests/google/bt/audio_lab/BtFunhausTest.py b/acts/tests/google/bt/audio_lab/BtFunhausTest.py index 941d2b099c..2e697afa63 100644 --- a/acts/tests/google/bt/audio_lab/BtFunhausTest.py +++ b/acts/tests/google/bt/audio_lab/BtFunhausTest.py @@ -59,46 +59,12 @@ class BtFunhausTest(BtFunhausBaseTest): sleep_interval = 120 #twelve_hours_in_seconds = 43200 - one_hour_in_seconds = 3600 - end_time = time.time() + one_hour_in_seconds - status, bluetooth_off_list, device_not_connected_list = \ - self.monitor_music_play_util_deadline(end_time, sleep_interval) - if not status: - return status + #one_hour_in_seconds = 3600 + one_min_in_sec = 60 + end_time = time.time() + one_min_in_sec + if not self.monitor_music_play_util_deadline(end_time, sleep_interval): + return False self._collect_bluetooth_manager_dumpsys_logs(self.android_devices) - self.stop_playing_music_on_all_devices() + self.ad.droid.mediaPlayStopAll() self.collect_bluetooth_manager_metrics_logs(self.android_devices) - if len(device_not_connected_list) > 0 or len(bluetooth_off_list) > 0: - self.log.info("Devices reported as not connected: {}".format( - device_not_connected_list)) - self.log.info("Devices reported with Bluetooth state off: {}". - format(bluetooth_off_list)) - return False - return True - - @test_tracker_info(uuid='285be86d-f00f-4924-a206-e0a590b87b67') - def test_setup_fail_if_devices_not_connected(self): - """Test for devices connected or not during setup. - - This test is designed to fail if the number of devices having - connection issues at time of setup is greater than 0. This lets - the test runner know of the stability of the testbed. - - Steps: - 1. Check lenght of self.device_fails_to_connect_list - - Expected Result: - No device should be in a disconnected state. - - Returns: - Pass if True - Fail if False - - TAGS: None - Priority: 1 - """ - if len(self.device_fails_to_connect_list) > 0: - self.log.error("Devices failed to reconnect:\n{}".format( - self.device_fails_to_connect_list)) - return False return True diff --git a/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py b/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py new file mode 100644 index 0000000000..11d5dc08f2 --- /dev/null +++ b/acts/tests/google/bt/audio_lab/ThreeButtonDongleTest.py @@ -0,0 +1,199 @@ +#/usr/bin/env python3.4 +# +# Copyright (C) 2017 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +""" +Test script to test various ThreeButtonDongle devices +""" +import time + +from acts.controllers.relay_lib.relay import SynchronizeRelays +from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest +from acts.test_utils.bt.bt_test_utils import clear_bonded_devices + + +class ThreeButtonDongleTest(BluetoothBaseTest): + iterations = 10 + + def __init__(self, controllers): + BluetoothBaseTest.__init__(self, controllers) + self.dut = self.android_devices[0] + self.dongle = self.relay_devices[0] + self.log.info("Target dongle is {}".format(self.dongle.name)) + + def setup_test(self): + super(BluetoothBaseTest, self).setup_test() + self.dongle.setup() + return True + + def teardown_test(self): + super(BluetoothBaseTest, self).teardown_test() + self.dongle.clean_up() + clear_bonded_devices(self.dut) + return True + + def _pair_devices(self): + self.dut.droid.bluetoothStartPairingHelper(False) + self.dongle.enter_pairing_mode() + + self.dut.droid.bluetoothBond(self.dongle.mac_address) + + end_time = time.time() + 20 + self.dut.log.info("Verifying devices are bonded") + while time.time() < end_time: + bonded_devices = self.dut.droid.bluetoothGetBondedDevices() + + for d in bonded_devices: + if d['address'] == self.dongle.mac_address: + self.dut.log.info("Successfully bonded to device.") + self.log.info("Bonded devices:\n{}".format(bonded_devices)) + return True + self.dut.log.info("Failed to bond devices.") + return False + + @BluetoothBaseTest.bt_test_wrap + def test_pairing(self): + """Test pairing between a three button dongle and an Android device. + + Test the dongle can be paired to Android device. + + Steps: + 1. Find the MAC address of remote controller from relay config file. + 2. Start the device paring process. + 3. Enable remote controller in pairing mode. + 4. Verify the remote is paired. + + Expected Result: + Remote controller is paired. + + Returns: + Pass if True + Fail if False + + TAGS: Bluetooth, bonding, relay + Priority: 3 + """ + if not self._pair_devices(): + return False + return True + + @BluetoothBaseTest.bt_test_wrap + def test_pairing_multiple_iterations(self): + """Test pairing between a three button dongle and an Android device. + + Test the dongle can be paired to Android device. + + Steps: + 1. Find the MAC address of remote controller from relay config file. + 2. Start the device paring process. + 3. Enable remote controller in pairing mode. + 4. Verify the remote is paired. + + Expected Result: + Remote controller is paired. + + Returns: + Pass if True + Fail if False + + TAGS: Bluetooth, bonding, relay + Priority: 3 + """ + for i in range(self.iterations): + self.log.info("Testing iteration {}.".format(i)) + if not self._pair_devices(): + return False + self.log.info("Unbonding devices.") + self.dut.droid.bluetoothUnbond(self.dongle.mac_address) + # Sleep for relax time for the relay + time.sleep(2) + return True + + @BluetoothBaseTest.bt_test_wrap + def test_next_multiple_iterations(self): + """Test pairing for multiple iterations. + + Test the dongle can be paired to Android device. + + Steps: + 1. Pair devices + 2. Press the next button on dongle for pre-definied iterations. + + Expected Result: + Test is successful + + Returns: + Pass if True + Fail if False + + TAGS: Bluetooth, bonding, relay + Priority: 3 + """ + if not self._pair_devices(): + return False + for _ in range(self.iterations): + self.dongle.press_next() + return True + + @BluetoothBaseTest.bt_test_wrap + def test_play_pause_multiple_iterations(self): + """Test play/pause button on a three button dongle. + + Test the dongle can be paired to Android device. + + Steps: + 1. Pair devices + 2. Press the next button on dongle for pre-definied iterations. + + Expected Result: + Test is successful + + Returns: + Pass if True + Fail if False + + TAGS: Bluetooth, bonding, relay + Priority: 3 + """ + if not self._pair_devices(): + return False + for _ in range(self.iterations): + self.dongle.press_play_pause() + return True + + @BluetoothBaseTest.bt_test_wrap + def test_previous_mulitple_iterations(self): + """Test previous button on a three button dongle. + + Test the dongle can be paired to Android device. + + Steps: + 1. Pair devices + 2. Press the next button on dongle for pre-definied iterations. + + Expected Result: + Test is successful + + Returns: + Pass if True + Fail if False + + TAGS: Bluetooth, bonding, relay + Priority: 3 + """ + if not self._pair_devices(): + return False + for _ in range(100): + self.dongle.press_previous() + return True diff --git a/acts/tests/google/bt/ota/BtOtaTest.py b/acts/tests/google/bt/ota/BtOtaTest.py new file mode 100644 index 0000000000..91e51bb7f7 --- /dev/null +++ b/acts/tests/google/bt/ota/BtOtaTest.py @@ -0,0 +1,137 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +""" +Test script for Bluetooth OTA testing. +""" + +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest +from acts.test_utils.bt.bt_test_utils import pair_pri_to_sec +from acts import signals + + +class BtOtaTest(BluetoothBaseTest): + def setup_class(self): + super(BtOtaTest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + self.dut = self.android_devices[0] + self.pre_ota_name = self.dut.droid.bluetoothGetLocalName() + self.pre_ota_address = self.dut.droid.bluetoothGetLocalAddress() + self.sec_address = self.android_devices[ + 1].droid.bluetoothGetLocalAddress() + + # Pairing devices + if not pair_pri_to_sec(self.dut, self.android_devices[1]): + raise signals.TestSkipClass( + "Failed to bond devices prior to update") + + #Run OTA below, if ota fails then abort all tests + try: + ota_updater.update(self.dut) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + @BluetoothBaseTest.bt_test_wrap + @test_tracker_info(uuid='57545ef0-2c2e-463c-9dbf-28da73cc76df') + def test_device_name_persists(self): + """Test device name persists after OTA update + + Test device name persists after OTA update + + Steps: + 1. Verify pre OTA device name matches post OTA device name + + Expected Result: + Bluetooth Device name persists + + Returns: + Pass if True + Fail if False + + TAGS: OTA + Priority: 2 + """ + return self.pre_ota_name == self.dut.droid.bluetoothGetLocalName() + + @BluetoothBaseTest.bt_test_wrap + @test_tracker_info(uuid='1fd5e1a5-d930-499c-aebc-c1872ab49568') + def test_device_address_persists(self): + """Test device address persists after OTA update + + Test device address persists after OTA update + + Steps: + 1. Verify pre OTA device address matches post OTA device address + + Expected Result: + Bluetooth Device address persists + + Returns: + Pass if True + Fail if False + + TAGS: OTA + Priority: 2 + """ + return self.pre_ota_address == self.dut.droid.bluetoothGetLocalAddress( + ) + + @BluetoothBaseTest.bt_test_wrap + @test_tracker_info(uuid='2e6704e6-3df0-43fb-8425-41ff841d7473') + def test_bluetooth_state_persists(self): + """Test device Bluetooth state persists after OTA update + + Test device Bluetooth state persists after OTA update + + Steps: + 1. Verify post OTA Bluetooth state is on + + Expected Result: + Bluetooth Device Bluetooth state is on + + Returns: + Pass if True + Fail if False + + TAGS: OTA + Priority: 2 + """ + return self.dut.droid.bluetoothCheckState() + + @BluetoothBaseTest.bt_test_wrap + @test_tracker_info(uuid='eb1c0a22-4b4e-4984-af17-ace3bcd203de') + def test_bonded_devices_persist(self): + """Test device bonded devices persists after OTA update + + Test device address persists after OTA update + + Steps: + 1. Verify pre OTA device bonded devices matches post OTA device + bonded devices + + Expected Result: + Bluetooth Device bonded devices persists + + Returns: + Pass if True + Fail if False + + TAGS: OTA + Priority: 1 + """ + bonded_devices = self.dut.droid.bluetoothGetBondedDevices() + for b in bonded_devices: + if b['address'] == self.sec_address: + return True + return False diff --git a/acts/tests/google/bt/power/A2dpPowerTest.py b/acts/tests/google/bt/power/A2dpPowerTest.py index f399bc7358..4b2fd81260 100644 --- a/acts/tests/google/bt/power/A2dpPowerTest.py +++ b/acts/tests/google/bt/power/A2dpPowerTest.py @@ -45,6 +45,8 @@ def push_file_to_device(ad, file_path, device_path, config_path): ad: Device for file push file_path: File path for the file to be pushed to the device device_path: File path on the device as destination + config_path: File path to the config file. This is only used when + a relative path is passed via the ACTS config. Returns: True if successful, False if unsuccessful. @@ -168,8 +170,8 @@ class A2dpPowerTest(PowerBaseTest): for d in bonded_devices: if d['address'] == self.a2dp_speaker.mac_address: self.log.info("Successfully bonded to device.") - self.log.info("XB2 Bonded devices:\n{}".format( - bonded_devices)) + self.log.info( + "Bonded devices:\n{}".format(bonded_devices)) return True return False @@ -179,6 +181,12 @@ class A2dpPowerTest(PowerBaseTest): # Factory reset requires a short delay to take effect time.sleep(3) + self.ad.log.info("Making sure BT phone is enabled here during setup") + if not bluetooth_enabled_check(self.ad): + self.log.error("Failed to turn Bluetooth on DUT") + # Give a breathing time of short delay to take effect + time.sleep(3) + # Determine if we have a relay-based device self.a2dp_speaker = None if self.relay_devices[0]: @@ -204,7 +212,8 @@ class A2dpPowerTest(PowerBaseTest): # Add music files to the Android device music_path_dut = "/sdcard/Music/" - self.cd_quality_music_file = self.user_params["cd_quality_music_file"] + self.cd_quality_music_file = self.user_params["cd_quality_music_file"][ + 0] self.log.info( "Push CD quality music file {}".format(self.cd_quality_music_file)) if not push_file_to_device(self.ad, self.cd_quality_music_file, @@ -213,7 +222,7 @@ class A2dpPowerTest(PowerBaseTest): self.log.error("Unable to push file {} to DUT.".format( self.cd_quality_music_file)) - self.hi_res_music_file = self.user_params["hi_res_music_file"] + self.hi_res_music_file = self.user_params["hi_res_music_file"][0] self.log.info( "Push Hi Res quality music file {}".format(self.hi_res_music_file)) if not push_file_to_device(self.ad, self.hi_res_music_file, @@ -278,7 +287,7 @@ class A2dpPowerTest(PowerBaseTest): self.PMC_BASE_CMD, self.music_url, play_time) if bt_off_mute == True: - msg = "%s --es BT_OFF_Mute %d" % (playing_msg, 1) + msg = "%s --es BT_OFF_Mute %d" % (play_msg, 1) else: codec1_msg = "%s --es CodecType %d --es SampleRate %d" % ( play_msg, codec_type, sample_rate) diff --git a/acts/tests/google/bt/power/SetupBTPairingTest.py b/acts/tests/google/bt/power/SetupBTPairingTest.py new file mode 100644 index 0000000000..4478fd4a5a --- /dev/null +++ b/acts/tests/google/bt/power/SetupBTPairingTest.py @@ -0,0 +1,87 @@ +#!/usr/bin/env python3.4 +# +# Copyright (C) 2017 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +""" +This test script leverages the relay_lib to pair different BT devices. This +script will be invoked from Tradefed test. The test will first setup pairing +between BT device and DUT and wait for signal (through socket) from tradefed +to power down the BT device +""" + +import logging +import socket +import sys +import time + +from acts import base_test + +class SetupBTPairingTest(base_test.BaseTestClass): + + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + + def setup_test(self): + self.bt_device = self.relay_devices[0] + + def wait_for_test_completion(self): + port = int(self.user_params["socket_port"]) + timeout = float(self.user_params["socket_timeout_secs"]) + + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) + + server_address = ('localhost', port) + logging.info("Starting server socket on localhost port %s", port) + sock.bind(('localhost', port)) + sock.settimeout(timeout) + sock.listen(1) + logging.info("Waiting for client socket connection") + try: + connection, client_address = sock.accept() + except socket.timeout: + logging.error("Did not receive signal. Shutting down AP") + except socket.error: + logging.error("Socket connection errored out. Shutting down AP") + finally: + if connection is not None: + connection.close() + if sock is not None: + sock.shutdown(socket.SHUT_RDWR) + sock.close() + + + def enable_pairing_mode(self): + self.bt_device.setup() + self.bt_device.power_on() + # Wait for a moment between pushing buttons + time.sleep(0.25) + self.bt_device.enter_pairing_mode() + + def test_bt_pairing(self): + req_params = [ + "RelayDevice", "socket_port", "socket_timeout_secs" + ] + opt_params = [] + self.unpack_userparams( + req_param_names=req_params, opt_param_names=opt_params) + # Setup BT pairing mode + self.enable_pairing_mode() + # BT pairing mode is turned on + self.wait_for_test_completion() + + def teardown_test(self): + self.bt_device.power_off() + self.bt_device.clean_up() diff --git a/acts/tests/google/bt/pts/BtCmdLineTest.py b/acts/tests/google/bt/pts/BtCmdLineTest.py index cb97233b8a..5091db31d5 100644 --- a/acts/tests/google/bt/pts/BtCmdLineTest.py +++ b/acts/tests/google/bt/pts/BtCmdLineTest.py @@ -24,9 +24,12 @@ Optional config parameters: from acts.test_utils.bt.BluetoothBaseTest import BluetoothBaseTest from cmd_input import CmdInput from queue import Empty + import os import uuid +from acts.test_utils.tel.tel_test_utils import setup_droid_properties + class BtCmdLineTest(BluetoothBaseTest): target_mac_address = "" @@ -46,7 +49,7 @@ class BtCmdLineTest(BluetoothBaseTest): self.log.error( "Missing mandatory user config \"sim_conf_file\"!") return False - sim_conf_file = self.user_params["sim_conf_file"] + sim_conf_file = self.user_params["sim_conf_file"][0] # If the sim_conf_file is not a full path, attempt to find it # relative to the config file. if not os.path.isfile(sim_conf_file): @@ -61,7 +64,7 @@ class BtCmdLineTest(BluetoothBaseTest): music_path_str = "music_path" android_music_path = "/sdcard/Music/" if music_path_str not in self.user_params: - log.error("Need music for A2DP testcases") + self.log.error("Need music for A2DP testcases") return False music_path = self.user_params[music_path_str] self._add_music_to_primary_android_device(music_path, diff --git a/acts/tests/google/bt/pts/gatt_test_database.py b/acts/tests/google/bt/pts/gatt_test_database.py index 360fbd7ff2..2714947607 100644 --- a/acts/tests/google/bt/pts/gatt_test_database.py +++ b/acts/tests/google/bt/pts/gatt_test_database.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations under # the License. -from acts.test_utils.bt.GattEnum import CharacteristicValueFormat from acts.test_utils.bt.bt_constants import gatt_characteristic from acts.test_utils.bt.bt_constants import gatt_descriptor from acts.test_utils.bt.bt_constants import gatt_service_types @@ -71,7 +70,7 @@ INVALID_SMALL_DATABASE = { 'uuid': '00001801-0000-1000-8000-00805f9b34fb', 'type': gatt_service_types['primary'], 'characteristics': [{ - 'uuid': GattCharTypes.GATT_CHARAC_SERVICE_CHANGED.value, + 'uuid': gatt_char_types['service_changed'], 'properties': gatt_characteristic['property_indicate'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -79,7 +78,7 @@ INVALID_SMALL_DATABASE = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x0000], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value, + 'uuid': gatt_char_desc_uuids['client_char_cfg'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], }] @@ -234,11 +233,10 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x04], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_SERVER_CHARAC_CFG_UUID.value, + 'uuid': gatt_char_desc_uuids['server_char_cfg'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], - 'value': - GattDescriptor.DISABLE_NOTIFICATION_VALUE.value + 'value': gatt_descriptor['disable_notification_value'] }] }, { @@ -295,11 +293,11 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x05], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x03, 0x00] }, { - 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [ @@ -308,9 +306,9 @@ LARGE_DB_1 = { 0x83, 0x84, 0x85, 0x86, 0x87, 0x88, 0x89, 0x90 ] }, { - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': - GattDescriptor.PERMISSION_READ_ENCRYPTED_MITM.value, + gatt_descriptor['permission_read_encrypted_mitm'], 'value': [0x00, 0x01, 0x30, 0x01, 0x11, 0x31] }, { 'uuid': '0000d5d4-0000-0000-0123-456789abcdef', @@ -333,7 +331,7 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x09], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_read'], 'value': gatt_descriptor['enable_notification_value'] }, { @@ -360,7 +358,7 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['string'], 'value': "Length is ", 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x19, 0x00, 0x00, 0x30, 0x01, 0x00, 0x00] }] @@ -374,7 +372,7 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x65], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x04, 0x00, 0x01, 0x27, 0x01, 0x01, 0x00] }] @@ -388,7 +386,7 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x34, 0x12], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x06, 0x00, 0x10, 0x27, 0x01, 0x02, 0x00] }] @@ -402,7 +400,7 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x04, 0x03, 0x02, 0x01], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x08, 0x00, 0x17, 0x27, 0x01, 0x03, 0x00] }] @@ -414,14 +412,14 @@ LARGE_DB_1 = { 'value_type': gatt_characteristic_value_format['byte'], 'value': [0x65, 0x34, 0x12, 0x04, 0x03, 0x02, 0x01], 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_AGREG_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_agreg_fmt'], 'permissions': gatt_descriptor['permission_read'], 'value': [0xa6, 0x00, 0xa9, 0x00, 0xac, 0x00] }] }, { 'uuid': '0000b011-0000-1000-8000-00805f9b34fb', - 'properties': GattCharacteristic.WRITE_TYPE_SIGNED.value + 'properties': gatt_characteristic['write_type_signed'] | #for some reason 0x40 is not working... gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | @@ -720,7 +718,7 @@ LARGE_DB_2 = { 'permissions': gatt_descriptor['permission_write'], 'value': [0x33] }, { - 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_write'], 'value': gatt_descriptor['enable_notification_value'] }] @@ -1085,62 +1083,56 @@ LARGE_DB_3 = { 'value': [0x04], 'descriptors': [ { - 'uuid': - GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x09] }, { - 'uuid': - GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { - 'uuid': - GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value, + 'uuid': gatt_char_desc_uuids['client_char_cfg'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x01, 0x00] }, { - 'uuid': - GattCharDesc.GATT_SERVER_CHARAC_CFG_UUID.value, + 'uuid': gatt_char_desc_uuids['server_char_cfg'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { - 'uuid': - GattCharDesc.GATT_CHARAC_AGREG_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_agreg_fmt'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { - 'uuid': - GattCharDesc.GATT_CHARAC_VALID_RANGE_UUID.value, + 'uuid': gatt_char_desc_uuids['char_valid_range'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { 'uuid': - GattCharDesc.GATT_EXTERNAL_REPORT_REFERENCE.value, + gatt_char_desc_uuids['external_report_reference'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] }, { - 'uuid': GattCharDesc.GATT_REPORT_REFERENCE.value, + 'uuid': gatt_char_desc_uuids['report_reference'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0x22] @@ -1148,7 +1140,7 @@ LARGE_DB_3 = { ] }, { - 'uuid': GattCharTypes.GATT_CHARAC_SERVICE_CHANGED.value, + 'uuid': gatt_char_types['service_changed'], 'instance_id': 0x0023, 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | @@ -1165,8 +1157,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_PERIPHERAL_PRIV_FLAG.value, + 'uuid': gatt_char_types['peripheral_priv_flag'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1174,8 +1165,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_RECONNECTION_ADDRESS.value, + 'uuid': gatt_char_types['reconnection_address'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1183,7 +1173,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': GattCharTypes.GATT_CHARAC_SYSTEM_ID.value, + 'uuid': gatt_char_types['system_id'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1191,8 +1181,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_MODEL_NUMBER_STRING.value, + 'uuid': gatt_char_types['model_number_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1200,8 +1189,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_SERIAL_NUMBER_STRING.value, + 'uuid': gatt_char_types['serial_number_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1209,8 +1197,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_FIRMWARE_REVISION_STRING.value, + 'uuid': gatt_char_types['firmware_revision_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1218,8 +1205,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_HARDWARE_REVISION_STRING.value, + 'uuid': gatt_char_types['hardware_revision_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1227,8 +1213,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_SOFTWARE_REVISION_STRING.value, + 'uuid': gatt_char_types['software_revision_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1236,8 +1221,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': - GattCharTypes.GATT_CHARAC_MANUFACTURER_NAME_STRING.value, + 'uuid': gatt_char_types['manufacturer_name_string'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1245,7 +1229,7 @@ LARGE_DB_3 = { 'value': '333334444455555666667777788888999990000011111', }, { - 'uuid': GattCharTypes.GATT_CHARAC_PNP_ID.value, + 'uuid': gatt_char_types['pnp_id'], 'properties': gatt_characteristic['property_read'], 'permissions': gatt_characteristic['permission_read'] | gatt_characteristic['permission_write'], @@ -1365,7 +1349,7 @@ LARGE_DB_3 = { 'value': [0x22] }, { - 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x01, 0x00] }, @@ -1390,13 +1374,13 @@ LARGE_DB_3 = { 'value': [0x05], 'descriptors': [ { - 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': gatt_descriptor['permission_read'] | gatt_descriptor['permission_write'], 'value': [0] * 26 }, { - 'uuid': GattCharDesc.GATT_CHARAC_EXT_PROPER_UUID.value, + 'uuid': gatt_char_desc_uuids['char_ext_props'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x03, 0x00] }, @@ -1406,7 +1390,7 @@ LARGE_DB_3 = { 'value': [0x44] }, { - 'uuid': GattCharDesc.GATT_CHARAC_FMT_UUID.value, + 'uuid': gatt_char_desc_uuids['char_fmt_uuid'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x04, 0x00, 0x01, 0x30, 0x01, 0x11, 0x31] }, @@ -1511,7 +1495,7 @@ TEST_DB_1 = { 'value': 'test', 'instance_id': 0x002a, 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x01] }] @@ -1559,7 +1543,7 @@ TEST_DB_3 = { 'value': 'test', 'instance_id': 0x002a, 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': gatt_descriptor['permission_read'], 'value': [0x01] }, { @@ -1595,9 +1579,9 @@ TEST_DB_4 = { 'value': "test", 'instance_id': 0x002a, 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CHARAC_USER_DESC_UUID.value, + 'uuid': gatt_char_desc_uuids['char_user_desc'], 'permissions': - GattDescriptor.PERMISSION_READ_ENCRYPTED_MITM.value, + gatt_descriptor['permission_read_encrypted_mitm'], 'value': [0] * 512 }] }] @@ -1660,7 +1644,7 @@ SIMPLE_READ_DESCRIPTOR = { 'value_type': gatt_characteristic_value_format['string'], 'value': 'Test Database', 'descriptors': [{ - 'uuid': GattCharDesc.GATT_CLIENT_CHARAC_CFG_UUID.value, + 'uuid': gatt_char_desc_uuids['client_char_cfg'], 'permissions': gatt_descriptor['permission_read'], }] }] diff --git a/acts/tests/google/bt/pts/gatts_lib.py b/acts/tests/google/bt/pts/gatts_lib.py index 2e3a8f99b8..45a7a8dabe 100644 --- a/acts/tests/google/bt/pts/gatts_lib.py +++ b/acts/tests/google/bt/pts/gatts_lib.py @@ -21,12 +21,13 @@ from acts.keys import Config from acts.utils import rand_ascii_str from acts.test_utils.bt.bt_constants import gatt_cb_strings from acts.test_utils.bt.bt_constants import gatt_characteristic +from acts.test_utils.bt.bt_constants import gatt_characteristic_value_format from acts.test_utils.bt.bt_constants import gatt_cb_err from acts.test_utils.bt.bt_constants import gatt_transport from acts.test_utils.bt.bt_constants import gatt_event from acts.test_utils.bt.bt_constants import gatt_server_responses from acts.test_utils.bt.bt_constants import gatt_service_types -from acts.test_utils.bt.bt_test_utils import TIMEOUT_SMALL +from acts.test_utils.bt.bt_constants import small_timeout from gatt_test_database import STRING_512BYTES from acts.utils import exe_cmd @@ -119,7 +120,7 @@ class GattServerLib(): self.gatt_server_callback) regex = "({}|{}|{}|{}|{})".format(desc_read, desc_write, char_read, char_write, execute_write) - events = self.dut.ed.pop_events(regex, 5, TIMEOUT_SMALL) + events = self.dut.ed.pop_events(regex, 5, small_timeout) status = 0 if user_input: status = gatt_server_responses.get(user_input) @@ -282,7 +283,7 @@ class GattServerLib(): i = 0 num_packets = ceil((len(char_value) + 1) / (mtu - 1)) while time.time() < end_time: - events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL) + events = self.dut.ed.pop_events(regex, 10, small_timeout) for event in events: start_offset = i * (mtu - 1) i += 1 @@ -321,7 +322,7 @@ class GattServerLib(): i = 0 num_packets = ceil((len(char_value) + 1) / (mtu - 1)) while time.time() < end_time: - events = self.dut.ed.pop_events(regex, 10, TIMEOUT_SMALL) + events = self.dut.ed.pop_events(regex, 10, small_timeout) for event in events: self.log.info(event) request_id = event['data']['requestId'] diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py index 73bce59b80..d0d393f404 100644 --- a/acts/tests/google/net/CoreNetworkingTest.py +++ b/acts/tests/google/net/CoreNetworkingTest.py @@ -58,12 +58,14 @@ class CoreNetworkingTest(base_test.BaseTestClass): self.dut.adb.shell("cmd netpolicy set restrict-background true") # Launch app, check internet connectivity and close app + self.log.info("Launch app and test internet connectivity") res = self.dut.droid.launchForResult(dum_class) - self.log.info("Internet connectivity status after app launch: %s " - % res['extras']['result']) # Disable data saver mode self.log.info("Disable data saver mode") self.dut.adb.shell("cmd netpolicy set restrict-background false") + # Return test result + self.log.info("Internet connectivity status after app launch: %s " + % res['extras']['result']) return res['extras']['result'] diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py index 7f769919b5..786a40a822 100644 --- a/acts/tests/google/net/LegacyVpnTest.py +++ b/acts/tests/google/net/LegacyVpnTest.py @@ -122,10 +122,14 @@ class LegacyVpnTest(base_test.BaseTestClass): Args: connected_vpn_info which specifies the VPN connection status """ + ping_result = None pkt_loss = "100% packet loss" - ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s" - % self.vpn_verify_address) - return pkt_loss not in ping_result + try: + ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s" + % self.vpn_verify_address) + except adb.AdbError: + pass + return ping_result and pkt_loss not in ping_result def legacy_vpn_connection_test_logic(self, vpn_profile): """ Test logic for each legacy VPN connection @@ -180,7 +184,7 @@ class LegacyVpnTest(base_test.BaseTestClass): self.legacy_vpn_connection_test_logic(vpn_profile) @test_tracker_info(uuid="99af78dd-40b8-483a-8344-cd8f67594971") - def test_legacy_vpn_l2tp_ipsec_psk_libreswan(self): + def legacy_vpn_l2tp_ipsec_psk_libreswan(self): """ Verify L2TP IPSec PSK VPN connection to libreSwan server """ @@ -191,7 +195,7 @@ class LegacyVpnTest(base_test.BaseTestClass): self.legacy_vpn_connection_test_logic(vpn_profile) @test_tracker_info(uuid="e67d8c38-92c3-4167-8b6c-a49ef939adce") - def test_legacy_vpn_l2tp_ipsec_rsa_libreswan(self): + def legacy_vpn_l2tp_ipsec_rsa_libreswan(self): """ Verify L2TP IPSec RSA VPN connection to libreSwan server """ @@ -202,7 +206,7 @@ class LegacyVpnTest(base_test.BaseTestClass): self.legacy_vpn_connection_test_logic(vpn_profile) @test_tracker_info(uuid="8b3517dc-6a3b-44c2-a85d-bd7b969df3cf") - def test_legacy_vpn_ipsec_xauth_psk_libreswan(self): + def legacy_vpn_ipsec_xauth_psk_libreswan(self): """ Verify IPSec XAUTH PSK VPN connection to libreSwan server """ @@ -213,7 +217,7 @@ class LegacyVpnTest(base_test.BaseTestClass): self.legacy_vpn_connection_test_logic(vpn_profile) @test_tracker_info(uuid="abac663d-1d91-4b87-8e94-11c6e44fb07b") - def test_legacy_vpn_ipsec_xauth_rsa_libreswan(self): + def legacy_vpn_ipsec_xauth_rsa_libreswan(self): """ Verify IPSec XAUTH RSA VPN connection to libreSwan server """ diff --git a/acts/tests/google/nfc/NfcBasicFunctionalityTest.py b/acts/tests/google/nfc/NfcBasicFunctionalityTest.py new file mode 100644 index 0000000000..8715fdf62f --- /dev/null +++ b/acts/tests/google/nfc/NfcBasicFunctionalityTest.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +from acts.base_test import BaseTestClass +from acts.test_decorators import test_tracker_info + + +class NfcBasicFunctionalityTest(BaseTestClass): + nfc_on_event = "NfcStateOn" + nfc_off_event = "NfcStateOff" + timeout = 1 + + def setup_class(self): + self.dut = self.android_devices[0] + self.dut.droid.nfcStartTrackingStateChange() + return True + + def _ensure_nfc_enabled(self, dut): + end_time = time.time() + 10 + while (not dut.droid.nfcIsEnabled() and end_time > time.time()): + try: + dut.ed.pop_event(nfc_on_event, self.timeout) + except Exception as err: + self.log.debug("Event not yet found") + return dut.droid.nfcIsEnabled() + + def _ensure_nfc_disabled(self, dut): + end_time = time.time() + 10 + while (dut.droid.nfcIsEnabled() and end_time > time.time()): + try: + dut.ed.pop_event(nfc_off_event, self.timeout) + except Exception as err: + self.log.debug("Event not yet found") + return not dut.droid.nfcIsEnabled() + + def setup_test(self): + # Every test starts with the assumption that NFC is enabled + if not self.dut.droid.nfcIsEnabled(): + self.dut.droid.nfcEnable() + else: + return True + if not self._ensure_nfc_enabled(self.dut): + self.log.error("Failed to toggle NFC on") + return False + return True + + @test_tracker_info(uuid='d57fcdd8-c56c-4ab0-81fb-e2218b100de9') + def test_nfc_toggle_state_100_iterations(self): + """Test toggling NFC state 100 times. + + Verify that NFC toggling works. Test assums NFC is on. + + Steps: + 1. Toggle NFC off + 2. Toggle NFC on + 3. Repeat steps 1-2 100 times. + + Expected Result: + RFCOMM connection is established then disconnected succcessfully. + + Returns: + Pass if True + Fail if False + + TAGS: NFC + Priority: 1 + """ + iterations = 100 + for i in range(iterations): + self.log.info("Starting iteration {}".format(i + 1)) + self.dut.droid.nfcDisable() + if not self._ensure_nfc_disabled(self.dut): + return False + self.dut.droid.nfcEnable() + if not self._ensure_nfc_enabled(self.dut): + return False + return True diff --git a/acts/tests/google/power/PowerbaselineTest.py b/acts/tests/google/power/PowerbaselineTest.py new file mode 100644 index 0000000000..8ff743793d --- /dev/null +++ b/acts/tests/google/power/PowerbaselineTest.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +from acts import base_test +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils +from acts.test_decorators import test_tracker_info + + +class PowerbaselineTest(base_test.BaseTestClass): + """Power baseline tests for rockbottom state. + Rockbottom for wifi on/off, screen on/off, everything else turned off + + """ + + def __init__(self, controllers): + + base_test.BaseTestClass.__init__(self, controllers) + self.tests = ('test_rockbottom_screenoff_wifidisabled', + 'test_rockbottom_screenoff_wifidisconnected', + 'test_rockbottom_screenon_wifidisabled', + 'test_rockbottom_screenon_wifidisconnected') + + def setup_class(self): + + self.dut = self.android_devices[0] + req_params = ['baselinetest_params'] + self.unpack_userparams(req_params) + self.unpack_testparams(self.baselinetest_params) + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + + def teardown_class(self): + + self.mon.usb('on') + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def rockbottom_test_func(self, screen_status, wifi_status): + """Test function for baseline rockbottom tests. + + Args: + screen_status: screen on or off + wifi_status: wifi enable or disable, on/off, not connected even on + """ + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + if wifi_status == 'ON': + wutils.wifi_toggle_state(self.dut, True) + if screen_status == 'OFF': + self.dut.droid.goToSleepNow() + self.dut.log.info('Screen is OFF') + # Collecting current measurement data and plot + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + wputils.pass_fail_check(self, avg_current) + + # Test cases + @test_tracker_info(uuid='e7ab71f4-1e14-40d2-baec-cde19a3ac859') + def test_rockbottom_screenoff_wifidisabled(self): + + self.rockbottom_test_func('OFF', 'OFF') + + @test_tracker_info(uuid='167c847d-448f-4c7c-900f-82c552d7d9bb') + def test_rockbottom_screenoff_wifidisconnected(self): + + self.rockbottom_test_func('OFF', 'ON') + + @test_tracker_info(uuid='2cd25820-8548-4e60-b0e3-63727b3c952c') + def test_rockbottom_screenon_wifidisabled(self): + + self.rockbottom_test_func('ON', 'OFF') + + @test_tracker_info(uuid='d7d90a1b-231a-47c7-8181-23814c8ff9b6') + def test_rockbottom_screenon_wifidisconnected(self): + + self.rockbottom_test_func('ON', 'ON') diff --git a/acts/tests/google/power/PowerdtimTest.py b/acts/tests/google/power/PowerdtimTest.py new file mode 100644 index 0000000000..24038747d2 --- /dev/null +++ b/acts/tests/google/power/PowerdtimTest.py @@ -0,0 +1,177 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from acts import base_test +from acts.controllers.ap_lib import hostapd_constants as hc +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils + + +class PowerdtimTest(base_test.BaseTestClass): + def __init__(self, controllers): + + base_test.BaseTestClass.__init__(self, controllers) + self.tests = ('test_2g_screenoff_dtimx1', 'test_2g_screenoff_dtimx2', + 'test_2g_screenoff_dtimx4', 'test_2g_screenoff_dtimx9', + 'test_2g_screenon_dtimx1', 'test_2g_screenon_dtimx4', + 'test_5g_screenoff_dtimx1', 'test_5g_screenoff_dtimx2', + 'test_5g_screenoff_dtimx4', 'test_5g_screenoff_dtimx9', + 'test_5g_screenon_dtimx1', 'test_5g_screenon_dtimx4') + + def setup_class(self): + + self.log = logging.getLogger() + self.dut = self.android_devices[0] + self.access_point = self.access_points[0] + req_params = ['main_network', 'aux_network', 'dtimtest_params'] + self.unpack_userparams(req_params) + self.unpack_testparams(self.dtimtest_params) + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + self.num_atten = self.attenuators[0].instrument.num_atten + + def teardown_class(self): + + self.mon.usb('on') + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def dtim_test_func(self, dtim, screen_status, network, dtim_max=6): + """A reusable function for DTIM test. + Covering different DTIM value, with screen ON or OFF and 2g/5g network + + Args: + dtim: the value for DTIM set on the phone + screen_status: screen on or off + network: a dict of information for the network to connect + """ + # Initialize the dut to rock-bottom state + wputils.change_dtim( + self.dut, gEnableModulatedDTIM=dtim, gMaxLIModulatedDTIM=dtim_max) + self.dut.log.info('DTIM value of the phone is now {}'.format(dtim)) + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + [ + self.attenuators[i].set_atten(self.atten_level['zero_atten'][i]) + for i in range(self.num_atten) + ] + self.log.info('Set attenuation level to connect the main AP') + wputils.ap_setup(self.access_point, network) + wutils.wifi_connect(self.dut, network) + if screen_status == 'OFF': + self.dut.droid.goToSleepNow() + self.dut.log.info('Screen is OFF') + time.sleep(5) + # Collect power data and plot + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.access_point.close() + # Pass and fail check + wputils.pass_fail_check(self, avg_current) + + # Test cases + @test_tracker_info(uuid='2a70a78b-93a8-46a6-a829-e1624b8239d2') + def test_2g_screenoff_dtimx1(self): + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(1, 'OFF', network) + + @test_tracker_info(uuid='b6c4114d-984a-4269-9e77-2bec0e4b6e6f') + def test_2g_screenoff_dtimx2(self): + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(2, 'OFF', network) + + @test_tracker_info(uuid='2ae5bc29-3d5f-4fbb-9ff6-f5bd499a9d6e') + def test_2g_screenoff_dtimx4(self): + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(4, 'OFF', network) + + @test_tracker_info(uuid='b37fa75f-6166-4247-b15c-adcda8c7038e') + def test_2g_screenoff_dtimx9(self): + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(9, 'OFF', network, dtim_max=10) + + @test_tracker_info(uuid='384d3b0f-4335-4b00-8363-308ec27a150c') + def test_2g_screenon_dtimx1(self): + """With screen on, modulated dtim isn't wokring, always DTIMx1. + So not running through all DTIM cases + + """ + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(1, 'ON', network) + + @test_tracker_info(uuid='79d0f065-2c46-4400-b02c-5ad60e79afea') + def test_2g_screenon_dtimx4(self): + """Run only extra DTIMx4 for screen on to compare with DTIMx1. + They should be the same if everything is correct. + + """ + network = self.main_network[hc.BAND_2G] + self.dtim_test_func(4, 'ON', network) + + @test_tracker_info(uuid='5e2f73cb-7e4e-4a25-8fd5-c85adfdf466e') + def test_5g_screenoff_dtimx1(self): + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(1, 'OFF', network) + + @test_tracker_info(uuid='017f57c3-e133-461d-80be-d025d1491d8a') + def test_5g_screenoff_dtimx2(self): + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(2, 'OFF', network) + + @test_tracker_info(uuid='b84a1cb3-9573-4bfd-9875-0f33cb171cc5') + def test_5g_screenoff_dtimx4(self): + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(4, 'OFF', network) + + @test_tracker_info(uuid='75644df4-2cc8-4bbd-8985-0656a4f9d056') + def test_5g_screenoff_dtimx9(self): + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(9, 'OFF', network, dtim_max=10) + + @test_tracker_info(uuid='327af44d-d9e7-49e0-9bda-accad6241dc7') + def test_5g_screenon_dtimx1(self): + """With screen on, modulated dtim isn't wokring, always DTIMx1. + So not running through all DTIM cases + + """ + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(1, 'ON', network) + + @test_tracker_info(uuid='8b32585f-2517-426b-a2c9-8087093cf991') + def test_5g_screenon_dtimx4(self): + """Run only extra DTIMx4 for screen on to compare with DTIMx1. + They should be the same if everything is correct. + + """ + network = self.main_network[hc.BAND_5G] + self.dtim_test_func(4, 'ON', network) diff --git a/acts/tests/google/power/PowermulticastTest.py b/acts/tests/google/power/PowermulticastTest.py new file mode 100644 index 0000000000..a2cfd098e6 --- /dev/null +++ b/acts/tests/google/power/PowermulticastTest.py @@ -0,0 +1,450 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time + +from acts import base_test +from acts.controllers.ap_lib import bridge_interface as bi +from acts.controllers.ap_lib import hostapd_constants as hc +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils +from acts.controllers import packet_sender as pkt_utils + +RA_SHORT_LIFETIME = 3 +RA_LONG_LIFETIME = 1000 +DNS_LONG_LIFETIME = 300 +DNS_SHORT_LIFETIME = 3 + + +class PowermulticastTest(base_test.BaseTestClass): + def __init__(self, controllers): + + base_test.BaseTestClass.__init__(self, controllers) + self.tests = ( + 'test_screenoff_directed_arp', 'test_screenoff_misdirected_arp', + 'test_screenoff_directed_ns', 'test_screenoff_misdirected_ns', + 'test_screenoff_ra_short', 'test_screenoff_ra_long', + 'test_screenoff_directed_dhcp_offer', + 'test_screenoff_misdirected_dhcp_offer', + 'test_screenoff_ra_rnds_short', 'test_screenoff_ra_rnds_long', + 'test_screenoff_directed_ping6', + 'test_screenoff_misdirected_ping6', + 'test_screenoff_directed_ping4', + 'test_screenoff_misdirected_ping4', 'test_screenoff_mdns6', + 'test_screenoff_mdns4', 'test_screenon_directed_arp', + 'test_screenon_misdirected_arp', 'test_screenon_directed_ns', + 'test_screenon_misdirected_ns', 'test_screenon_ra_short', + 'test_screenon_ra_long', 'test_screenon_directed_dhcp_offer', + 'test_screenon_misdirected_dhcp_offer', + 'test_screenon_ra_rnds_short', 'test_screenon_ra_rnds_long', + 'test_screenon_directed_ping6', 'test_screenon_misdirected_ping6', + 'test_screenon_directed_ping4', 'test_screenon_misdirected_ping4', + 'test_screenon_mdns6', 'test_screenon_mdns4') + + def setup_class(self): + + self.log = logging.getLogger() + self.dut = self.android_devices[0] + self.access_point = self.access_points[0] + req_params = ['main_network', 'multicast_params'] + self.unpack_userparams(req_params) + self.unpack_testparams(self.multicast_params) + self.num_atten = self.attenuators[0].instrument.num_atten + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + self.pkt_sender = self.packet_senders[0] + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def teardown_class(self): + """Clean up the test class after tests finish running + + """ + self.mon.usb('on') + self.access_point.close() + + def set_connection(self, screen_status, network): + """Setup connection between AP and client. + + Setup connection between AP and phone, change DTIMx1 and get information + such as IP addresses to prepare packet construction. + + Args: + screen_status: screen on or off + network: network selection, 2g/5g + """ + # Change DTIMx1 on the phone to receive all Multicast packets + wputils.change_dtim( + self.dut, gEnableModulatedDTIM=1, gMaxLIModulatedDTIM=10) + self.dut.log.info('DTIM value of the phone is now DTIMx1') + + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + + # Set attenuation and connect to AP + for attn in range(self.num_atten): + self.attenuators[attn].set_atten( + self.atten_level['zero_atten'][attn]) + self.log.info('Set attenuation level to all zero') + channel = network['channel'] + iface_eth = self.pkt_sender.interface + brconfigs = self.access_point.generate_bridge_configs(channel) + self.brconfigs = bi.BridgeInterfaceConfigs(brconfigs[0], brconfigs[1], + brconfigs[2]) + self.access_point.bridge.startup(self.brconfigs) + wputils.ap_setup(self.access_point, network) + wutils.wifi_connect(self.dut, network) + + # Wait for DHCP with timeout of 60 seconds + wputils.wait_for_dhcp(iface_eth) + + # Set the desired screen status + if screen_status == 'OFF': + self.dut.droid.goToSleepNow() + self.dut.log.info('Screen is OFF') + time.sleep(5) + + def sendPacketAndMeasure(self, packet): + """Packet injection template function + + Args: + packet: packet to be sent/inject + """ + # Start sending packets + self.pkt_sender.start_sending(packet, self.interval) + + # Measure current and plot + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + + # Bring down the bridge interface + self.access_point.bridge.teardown(self.brconfigs) + + # Close AP + self.access_point.close() + + # Compute pass or fail check + wputils.pass_fail_check(self, avg_current) + + # Test cases - screen OFF + @test_tracker_info(uuid='b5378aaf-7949-48ac-95fb-ee94c85d49c3') + def test_screenoff_directed_arp(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='3b5d348d-70bf-483d-8736-13da569473aa') + def test_screenoff_misdirected_arp(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='8e534d3b-5a25-429a-a1bb-8119d7d28b5a') + def test_screenoff_directed_ns(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='536d716d-f30b-4d20-9976-e2cbc36c3415') + def test_screenoff_misdirected_ns(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv6_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='5eed3174-8e94-428e-8527-19a9b5a90322') + def test_screenoff_ra_short(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(RA_SHORT_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='67867bae-f1c5-44a4-9bd0-2b832ac8059c') + def test_screenoff_ra_long(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(RA_LONG_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='db19bc94-3513-45c4-b3a5-d6219649d0bb') + def test_screenoff_directed_dhcp_offer(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='a8059869-40ee-4cf3-a957-4b7aed03fcf9') + def test_screenoff_misdirected_dhcp_offer(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.mac_dst_fake, self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='6e663f0a-3eb5-46f6-a79e-311baebd5d2a') + def test_screenoff_ra_rnds_short(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate( + RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='84d2f1ff-bd4f-46c6-9b06-826d9b14909c') + def test_screenoff_ra_rnds_long(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate( + RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='4a17e74f-3e7f-4e90-ac9e-884a7c13cede') + def test_screenoff_directed_ping6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='ab249e0d-58ba-4b55-8a81-e1e4fb04780a') + def test_screenoff_misdirected_ping6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv6_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='e37112e6-5c35-4c89-8d15-f5a44e69be0b') + def test_screenoff_directed_ping4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='afd4a011-63a9-46c3-8a75-13f515ba8475') + def test_screenoff_misdirected_ping4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='03f0e845-fd66-4120-a79d-5eb64d49b6cd') + def test_screenoff_mdns6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Mdns6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='dcbb0aec-512d-48bd-b743-024697ce511b') + def test_screenoff_mdns4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('OFF', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Mdns4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + # Test cases: screen ON + @test_tracker_info(uuid='b9550149-bf36-4f86-9b4b-6e900756a90e') + def test_screenon_directed_arp(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='406dffae-104e-46cb-9ec2-910aac7aca39') + def test_screenon_misdirecteded_arp(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='be4cb543-c710-4041-a770-819e82a6d164') + def test_screenon_directed_ns(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='de21d24f-e03e-47a1-8bbb-11953200e870') + def test_screenon_misdirecteded_ns(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.NsGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv6_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='b424a170-5095-4b47-82eb-50f7b7fdf35d') + def test_screenon_ra_short(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(RA_SHORT_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='ab627e59-2ee8-4c0d-970b-eeb1d1cecdc1') + def test_screenon_ra_long(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(RA_LONG_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='ee6514ab-1814-44b9-ba01-63f77ba77c34') + def test_screenon_directed_dhcp_offer(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='eaebfe98-32da-4ebc-bca7-3b7026d99a4f') + def test_screenon_misdirecteded_dhcp_offer(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.DhcpOfferGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.mac_dst_fake, self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='f0e2193f-bf6a-441b-b9c1-bb7b65787cd5') + def test_screenon_ra_rnds_short(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate( + RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='62b99cd7-75bf-45be-b93f-bb037a13b3e2') + def test_screenon_ra_rnds_long(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) + packet = pkt_gen.generate( + RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='4088af4c-a64b-4fc1-848c-688936cc6c12') + def test_screenon_directed_ping6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='3179e327-e6ac-4dae-bb8a-f3940f21094d') + def test_screenon_misdirected_ping6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv6_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='90c70e8a-74fd-4878-89c6-5e15c3ede318') + def test_screenon_directed_ping4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='dcfabbc7-a7e1-4a92-a38d-8ebe7aa2e063') + def test_screenon_misdirected_ping4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Ping4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate(self.ipv4_dst_fake) + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='117814db-f94d-4239-a7ab-033482b1da52') + def test_screenon_mdns6(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Mdns6Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) + + @test_tracker_info(uuid='ce6ad7e2-21f3-4e68-9c0d-d0e14e0a7c53') + def test_screenon_mdns4(self): + network = self.main_network[hc.BAND_5G] + self.set_connection('ON', network) + self.pkt_gen_config = wputils.create_pkt_config(self) + pkt_gen = pkt_utils.Mdns4Generator(**self.pkt_gen_config) + packet = pkt_gen.generate() + self.sendPacketAndMeasure(packet) diff --git a/acts/tests/google/power/PowerroamingTest.py b/acts/tests/google/power/PowerroamingTest.py new file mode 100644 index 0000000000..8bab26cd6e --- /dev/null +++ b/acts/tests/google/power/PowerroamingTest.py @@ -0,0 +1,265 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from acts import base_test +from acts.controllers.ap_lib import hostapd_constants as hc +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_constants as wc +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils + + +class PowerroamingTest(base_test.BaseTestClass): + def __init__(self, controllers): + + base_test.BaseTestClass.__init__(self, controllers) + self.tests = ('test_screenoff_roaming', 'test_screenoff_fastroaming', + 'test_screenon_toggle_between_AP', + 'test_screenoff_toggle_between_AP', + 'test_screenoff_wifi_wedge') + + def setup_class(self): + + self.log = logging.getLogger() + self.dut = self.android_devices[0] + self.access_point_main = self.access_points[0] + self.access_point_aux = self.access_points[1] + req_params = ('main_network', 'aux_network', 'roamingtest_params') + self.unpack_userparams(req_params) + self.unpack_testparams(self.roamingtest_params) + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon_duration_all = self.mon_duration + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + self.num_atten = self.attenuators[0].instrument.num_atten + + def teardown_class(self): + + self.mon.usb('on') + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def ap_close_all(self): + """Close all the AP controller objects in roaming tests. + + """ + for ap in self.access_points: + ap.close() + + # Test cases + @test_tracker_info(uuid='392622d3-0c5c-4767-afa2-abfb2058b0b8') + def test_screenoff_roaming(self): + """Test roaming power consumption with screen off. + Change the attenuation level to trigger roaming between two APs + + """ + # Setup both APs + network_main = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_main, network_main) + network_aux = self.aux_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_aux, network_aux) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Set attenuator and add two networks to the phone + self.log.info('Set attenuation to connect device to both APs') + [ + self.attenuators[i].set_atten(self.atten_level['zero_atten'][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network_aux) + time.sleep(5) + wutils.wifi_connect(self.dut, network_main) + self.dut.droid.goToSleepNow() + time.sleep(5) + # Set attenuator to trigger roaming + self.dut.log.info('Trigger roaming now') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.ap_close_all() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='2fec5208-043a-410a-8fd2-6784d70a3587') + def test_screenoff_fastroaming(self): + + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Setup the aux AP + network_main = self.main_network[hc.BAND_2G] + network_aux = self.aux_network[hc.BAND_2G] + # Set the same SSID for the AUX AP for fastroaming purpose + network_aux[wc.SSID] = network_main[wc.SSID] + wputils.ap_setup(self.access_point_aux, network_aux) + # Set attenuator and add two networks to the phone + self.log.info('Set attenuation to connect device to the aux AP') + [ + self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network_aux) + time.sleep(5) + # Setup the main AP + wputils.ap_setup(self.access_point_main, network_main) + # Set attenuator to connect the phone to main AP + self.log.info('Set attenuation to connect device to the main AP') + [ + self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network_main) + time.sleep(5) + self.dut.droid.goToSleepNow() + # Trigger fastroaming + self.dut.log.info('Trigger fastroaming now') + [ + self.attenuators[i].set_atten(self.atten_level[wc.AP_MAIN][i]) + for i in range(self.num_atten) + ] + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.ap_close_all() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='a0459b7c-74ce-4adb-8e55-c5365bc625eb') + def test_screenoff_toggle_between_AP(self): + + # Setup both APs + network_main = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_main, network_main) + network_aux = self.aux_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_aux, network_aux) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + self.mon_info['duration'] = self.toggle_interval + self.dut.droid.goToSleepNow() + time.sleep(5) + self.log.info('Set attenuation to connect device to both APs') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + # Toggle between two networks + for i in range(self.toggle_times): + self.dut.log.info('Connecting to %s' % network_main[wc.SSID]) + self.dut.droid.wifiConnect(network_main) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, 0) + self.dut.log.info('Connecting to %s' % network_aux[wc.SSID]) + self.dut.droid.wifiConnect(network_aux) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, 0) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.ap_close_all() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='e5ff95c0-b17e-425c-a903-821ba555a9b9') + def test_screenon_toggle_between_AP(self): + + # Setup both APs + network_main = self.main_network[hc.BAND_5G] + wputils.ap_setup(self.access_point_main, network_main) + network_aux = self.aux_network[hc.BAND_5G] + wputils.ap_setup(self.access_point_aux, network_aux) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + self.mon_info['duration'] = self.toggle_interval + self.log.info('Set attenuation to connect device to both APs') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + # Toggle between two networks + for i in range(self.toggle_times): + self.dut.log.info('Connecting to %s' % network_main[wc.SSID]) + self.dut.droid.wifiConnect(network_main) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, 0) + self.dut.log.info('Connecting to %s' % network_aux[wc.SSID]) + self.dut.droid.wifiConnect(network_aux) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, 0) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.ap_close_all() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4') + def test_screenoff_wifi_wedge(self): + + # Setup both APs + network_main = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_main, network_main) + network_aux = self.aux_network[hc.BAND_2G] + wputils.ap_setup(self.access_point_aux, network_aux) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Set attenuator to connect phone to both networks + self.log.info('Set attenuation to connect device to both APs') + [ + self.attenuators[i].set_atten(self.atten_level['zero_atten'][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network_main) + wutils.wifi_connect(self.dut, network_aux) + self.log.info('Forget network {}'.format(network_aux[wc.SSID])) + wutils.wifi_forget_network(self.dut, network_aux[wc.SSID]) + self.log.info('Set attenuation to trigger wedge condition') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.dut.droid.goToSleepNow() + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.ap_close_all() + # Path fail check + wputils.pass_fail_check(self, avg_current) diff --git a/acts/tests/google/power/PowerscanTest.py b/acts/tests/google/power/PowerscanTest.py new file mode 100644 index 0000000000..4afb3225f4 --- /dev/null +++ b/acts/tests/google/power/PowerscanTest.py @@ -0,0 +1,342 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from acts import base_test +from acts.controllers.ap_lib import hostapd_constants as hc +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils + +UNLOCK_SCREEN = 'input keyevent 82' + + +class PowerscanTest(base_test.BaseTestClass): + def __init__(self, controllers): + + base_test.BaseTestClass.__init__(self, controllers) + self.tests = ('test_single_shot_scan_2g_highRSSI', + 'test_single_shot_scan_2g_lowRSSI', + 'test_single_shot_scan_5g_highRSSI', + 'test_single_shot_scan_5g_lowRSSI', + 'test_background_scan' + 'test_wifi_scan_2g', 'test_wifi_scan_5g', + 'test_scan_wifidisconnected_turnonscreen', + 'test_scan_wificonnected_turnonscreen', + 'test_scan_screenoff_below_rssi_threshold', + 'test_scan_screenoff_lost_wificonnection') + + def setup_class(self): + + self.log = logging.getLogger() + self.dut = self.android_devices[0] + self.access_point = self.access_points[0] + req_params = ('main_network', 'scantest_params') + self.unpack_userparams(req_params) + self.unpack_testparams(self.scantest_params) + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + self.num_atten = self.attenuators[0].instrument.num_atten + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def setup_test(self): + + self.SINGLE_SHOT_SCAN = ( + 'am instrument -w -r -e min_scan_count \"700\"' + ' -e WifiScanTest-testWifiSingleShotScan %d' + ' -e class com.google.android.platform.powertests.' + 'WifiScanTest#testWifiSingleShotScan' + ' com.google.android.platform.powertests/' + 'android.test.InstrumentationTestRunner > /dev/null &' % + (self.mon_duration + self.mon_offset + 10)) + self.BACKGROUND_SCAN = ( + 'am instrument -w -r -e min_scan_count \"1\" -e ' + 'WifiScanTest-testWifiBackgroundScan %d -e class ' + 'com.google.android.platform.powertests.WifiScan' + 'Test#testWifiBackgroundScan com.google.android.' + 'platform.powertests/android.test.Instrumentation' + 'TestRunner > /dev/null &' % + (self.mon_duration + self.mon_offset + 10)) + self.WIFI_SCAN = ( + 'am instrument -w -r -e min_scan_count \"1\" -e ' + 'WifiScanTest-testWifiScan %d -e class ' + 'com.google.android.platform.powertests.WifiScanTest#' + 'testWifiScan com.google.android.platform.powertests/' + 'android.test.InstrumentationTestRunner > /dev/null &' % + (self.mon_duration + self.mon_offset + 10)) + + def teardown_class(self): + + self.mon.usb('on') + + def powrapk_scan_test_func(self, scan_command): + """Test function for power.apk triggered scans. + Args: + scan_command: the adb shell command to trigger scans + + """ + self.mon_info['offset'] == 0 + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + self.log.info('Wait for {} seconds'.format(self.settle_wait_time)) + time.sleep(self.settle_wait_time) + self.log.info('Running power apk command to trigger scans') + self.dut.adb.shell_nb(scan_command) + self.dut.droid.goToSleepNow() + # Collect power data and plot + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.access_point.close() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + # Test cases + @test_tracker_info(uuid='e5539b01-e208-43c6-bebf-6f1e73d8d8cb') + def test_single_shot_scan_2g_highRSSI(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation to get high RSSI at 2g') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='14c5a762-95bc-40ea-9fd4-27126df7d86c') + def test_single_shot_scan_2g_lowRSSI(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation to get low RSSI at 2g') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='a6506600-c567-43b5-9c25-86b505099b97') + def test_single_shot_scan_2g_noAP(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation so all AP is out of reach ') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='1a458248-1159-4c8e-a39f-92fc9e69c4dd') + def test_single_shot_scan_5g_highRSSI(self): + + network = self.main_network[hc.BAND_5G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation to get high RSSI at 5g') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='bd4da426-a621-4131-9f89-6e5a77f321d2') + def test_single_shot_scan_5g_lowRSSI(self): + + network = self.main_network[hc.BAND_5G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation to get low RSSI at 5g') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='288b3add-8925-4803-81c0-53debf157ffc') + def test_single_shot_scan_5g_noAP(self): + + network = self.main_network[hc.BAND_5G] + wputils.ap_setup(self.access_point, network) + self.log.info('Set attenuation so all AP is out of reach ') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.SINGLE_SHOT_SCAN) + + @test_tracker_info(uuid='f401c66c-e515-4f51-8ef2-2a03470d8ff2') + def test_background_scan(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + self.powrapk_scan_test_func(self.BACKGROUND_SCAN) + + @test_tracker_info(uuid='fe38c1c7-937c-42c0-9381-98356639df8f') + def test_wifi_scan_2g(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.WIFI_SCAN) + + @test_tracker_info(uuid='8eedefd1-3a08-4ac2-ba55-5eb438def3d4') + def test_wifi_scan_5g(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + self.powrapk_scan_test_func(self.WIFI_SCAN) + + @test_tracker_info(uuid='ff5ea952-ee31-4968-a190-82935ce7a8cb') + def test_scan_wifidisconnected_turnonscreen(self): + + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + self.dut.droid.goToSleepNow() + self.log.info('Screen is OFF') + time.sleep(5) + self.dut.droid.wakeUpNow() + self.log.info('Now turn on screen to trigger scans') + self.dut.adb.shell(UNLOCK_SCREEN) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='9a836e5b-8128-4dd2-8e96-e79177810bdd') + def test_scan_wificonnected_turnonscreen(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Set attenuators to connect main AP + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network) + time.sleep(10) + self.dut.droid.goToSleepNow() + self.log.info('Screen is OFF') + time.sleep(5) + self.dut.droid.wakeUpNow() + self.log.info('Now turn on screen to trigger scans') + self.dut.adb.shell(UNLOCK_SCREEN) + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.access_point.close() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='51e3c4f1-742b-45af-afd5-ae3552a03272') + def test_scan_screenoff_below_rssi_threshold(self): + + network = self.main_network[hc.BAND_2G] + wputils.ap_setup(self.access_point, network) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Set attenuator and add main network to the phone + self.log.info('Set attenuation so device connection has medium RSSI') + [ + self.attenuators[i].set_atten(self.atten_level['zero_atten'][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network) + self.dut.droid.goToSleepNow() + time.sleep(20) + # Set attenuator to make RSSI below threshold + self.log.info('Set attenuation to drop RSSI below threhold') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.access_point.close() + # Path fail check + wputils.pass_fail_check(self, avg_current) + + @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4') + def test_scan_screenoff_lost_wificonnection(self): + + network = self.main_network[hc.BAND_5G] + wputils.ap_setup(self.access_point, network) + # Initialize the dut to rock-bottom state + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + # Set attenuator and add main network to the phone + self.log.info('Set attenuation so device connection has medium RSSI') + [ + self.attenuators[i].set_atten(self.atten_level['zero_atten'][i]) + for i in range(self.num_atten) + ] + wutils.wifi_connect(self.dut, network) + self.dut.droid.goToSleepNow() + time.sleep(5) + # Set attenuator to make RSSI below threshold + self.log.info('Set attenuation so device loses connection') + [ + self.attenuators[i].set_atten( + self.atten_level[self.current_test_name][i]) + for i in range(self.num_atten) + ] + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + wputils.monsoon_data_plot(self.mon_info, file_path) + # Close AP controller + self.access_point.close() + # Path fail check + wputils.pass_fail_check(self, avg_current) diff --git a/acts/tests/google/power/PowertrafficTest.py b/acts/tests/google/power/PowertrafficTest.py new file mode 100644 index 0000000000..0feb72018e --- /dev/null +++ b/acts/tests/google/power/PowertrafficTest.py @@ -0,0 +1,164 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +import os +import time +from acts import base_test +from acts.controllers.ap_lib import bridge_interface as bi +from acts.controllers.ap_lib import hostapd_constants as hc +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_power_test_utils as wputils +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +import acts.controllers.iperf_server as ipf + + +class PowertrafficTest(base_test.BaseTestClass): + def __init__(self, controllers): + + WifiBaseTest.__init__(self, controllers) + self.tests = ('test_screenoff_iperf_2g_highrssi', + 'test_screenoff_iperf_2g_mediumrssi', + 'test_screenoff_iperf_2g_lowrssi', + 'test_screenoff_iperf_5g_highrssi', + 'test_screenoff_iperf_5g_mediumrssi', + 'test_screenoff_iperf_5g_lowrssi') + + def setup_class(self): + + self.log = logging.getLogger() + self.dut = self.android_devices[0] + req_params = ['main_network', 'traffictest_params'] + self.unpack_userparams(req_params) + self.unpack_testparams(self.traffictest_params) + self.num_atten = self.attenuators[0].instrument.num_atten + self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + self.mon_duration = self.iperf_duration - 10 + self.mon = self.monsoons[0] + self.mon.set_max_current(8.0) + self.mon.set_voltage(4.2) + self.mon.attach_device(self.dut) + self.mon_info = wputils.create_monsoon_info(self) + self.iperf_server = self.iperf_servers[0] + self.access_point = self.access_points[0] + self.pkt_sender = self.packet_senders[0] + + def teardown_test(self): + self.iperf_server.stop() + self.access_point.close() + + def unpack_testparams(self, bulk_params): + """Unpack all the test specific parameters. + + Args: + bulk_params: dict with all test specific params in the config file + """ + for key in bulk_params.keys(): + setattr(self, key, bulk_params[key]) + + def iperf_power_test_func(self, screen_status, band): + """Test function for iperf power measurement at different RSSI level. + + Args: + screen_status: screen ON or OFF + band: desired band for AP to operate on + """ + wputils.dut_rockbottom(self.dut) + wutils.wifi_toggle_state(self.dut, True) + + # Set up the AP + network = self.main_network[band] + channel = network['channel'] + configs = self.access_point.generate_bridge_configs(channel) + brconfigs = bi.BridgeInterfaceConfigs(configs[0], configs[1], + configs[2]) + self.access_point.bridge.startup(brconfigs) + wputils.ap_setup(self.access_point, network) + + # Wait for DHCP on the ethernet port and get IP as Iperf server address + # Time out in 60 seconds if not getting DHCP address + iface_eth = self.pkt_sender.interface + self.iperf_server_address = wputils.wait_for_dhcp(iface_eth) + + # Set attenuator to desired level + self.log.info('Set attenuation to desired RSSI level') + for i in range(self.num_atten): + attenuation = self.atten_level[self.current_test_name][i] + self.attenuators[i].set_atten(attenuation) + + # Connect the phone to the AP + wutils.wifi_connect(self.dut, network) + time.sleep(5) + if screen_status == 'OFF': + self.dut.droid.goToSleepNow() + RSSI = wputils.get_wifi_rssi(self.dut) + + # Run IPERF session + iperf_args = '-i 1 -t %d > /dev/null' % self.iperf_duration + self.iperf_server.start() + wputils.run_iperf_client_nonblocking( + self.dut, self.iperf_server_address, iperf_args) + + # Collect power data and plot + file_path, avg_current = wputils.monsoon_data_collect_save( + self.dut, self.mon_info, self.current_test_name, self.bug_report) + iperf_result = ipf.IPerfResult(self.iperf_server.log_files[-1]) + + # Monsoon Power data plot with IPerf throughput information + tag = '_RSSI_{0:d}dBm_Throughput_{1:.2f}Mbps'.format( + RSSI, (iperf_result.avg_receive_rate * 8)) + wputils.monsoon_data_plot(self.mon_info, file_path, tag) + + # Bring down bridge interface + self.access_point.bridge.teardown(brconfigs) + + # Bring down the AP object + self.access_point.close() + + # Pass and fail check + wputils.pass_fail_check(self, avg_current) + + # Test cases + @test_tracker_info(uuid='43d9b146-3547-4a27-9d79-c9341c32ccda') + def test_screenoff_iperf_2g_highrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_2G) + + @test_tracker_info(uuid='f00a868b-c8b1-4b36-8136-b39b5c2396a7') + def test_screenoff_iperf_2g_mediumrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_2G) + + @test_tracker_info(uuid='cd0c37ac-23fe-4dd1-9130-ccb2dfa71020') + def test_screenoff_iperf_2g_lowrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_2G) + + @test_tracker_info(uuid='f9173d39-b46d-4d80-a5a5-7966f5eed9de') + def test_screenoff_iperf_5g_highrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_5G) + + @test_tracker_info(uuid='cf77e1dc-30bc-4df9-88be-408f1fddc24f') + def test_screenoff_iperf_5g_mediumrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_5G) + + @test_tracker_info(uuid='48f91745-22dc-47c9-ace6-c2719df651d6') + def test_screenoff_iperf_5g_lowrssi(self): + + self.iperf_power_test_func('OFF', hc.BAND_5G) diff --git a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py index 6f5cdb90b3..abdf2d4e2c 100644 --- a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py +++ b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py @@ -79,7 +79,6 @@ class WifiEnterpriseRoamingTest(base_test.BaseTestClass): Ent.EAP: int(EAP.SIM), WifiEnums.SSID_KEY: self.ent_roaming_ssid, } - self.attenuators = wutils.group_attenuators(self.attenuators) self.attn_a = self.attenuators[0] self.attn_b = self.attenuators[1] # Set screen lock password so ConfigStore is unlocked. diff --git a/acts/tests/google/wifi/WifiEnterpriseTest.py b/acts/tests/google/wifi/WifiEnterpriseTest.py index b1a5391ab9..785d91ff2d 100755 --- a/acts/tests/google/wifi/WifiEnterpriseTest.py +++ b/acts/tests/google/wifi/WifiEnterpriseTest.py @@ -22,6 +22,8 @@ from acts import asserts from acts import base_test from acts import signals from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump +from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils WifiEnums = wutils.WifiEnums @@ -127,6 +129,8 @@ class WifiEnterpriseTest(base_test.BaseTestClass): del self.config_passpoint_ttls[WifiEnums.SSID_KEY] # Set screen lock password so ConfigStore is unlocked. self.dut.droid.setDevicePassword(self.device_password) + self.tcpdump_pid = None + self.tcpdump_file = None def teardown_class(self): wutils.reset_wifi(self.dut) @@ -139,8 +143,16 @@ class WifiEnterpriseTest(base_test.BaseTestClass): self.dut.droid.wakeUpNow() wutils.reset_wifi(self.dut) self.dut.ed.clear_all_events() + (self.tcpdump_pid, self.tcpdump_file) = start_adb_tcpdump( + self.dut, self.test_name, mask='all') def teardown_test(self): + if self.tcpdump_pid: + stop_adb_tcpdump(self.dut, + self.tcpdump_pid, + self.tcpdump_file, + pull_tcpdump=True) + self.tcpdump_pid = None self.dut.droid.wakeLockRelease() self.dut.droid.goToSleepNow() self.dut.droid.wifiStopTrackingStateChange() diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py index 5f1fb86d50..513c79eb1d 100755 --- a/acts/tests/google/wifi/WifiManagerTest.py +++ b/acts/tests/google/wifi/WifiManagerTest.py @@ -383,8 +383,15 @@ class WifiManagerTest(WifiBaseTest): @test_tracker_info(uuid="aca85551-10ba-4007-90d9-08bcdeb16a60") def test_forget_network(self): - self.test_add_network() ssid = self.open_network[WifiEnums.SSID_KEY] + nId = self.dut.droid.wifiAddNetwork(self.open_network) + asserts.assert_true(nId > -1, "Failed to add network.") + configured_networks = self.dut.droid.wifiGetConfiguredNetworks() + self.log.debug( + ("Configured networks after adding: %s" % configured_networks)) + wutils.assert_network_in_list({ + WifiEnums.SSID_KEY: ssid + }, configured_networks) wutils.wifi_forget_network(self.dut, ssid) configured_networks = self.dut.droid.wifiGetConfiguredNetworks() for nw in configured_networks: diff --git a/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py b/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py index 7ecafc85f2..32d4c1fc4e 100644 --- a/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py +++ b/acts/tests/google/wifi/WifiNewSetupAutoJoinTest.py @@ -32,6 +32,17 @@ class WifiNewSetupAutoJoinTest(WifiBaseTest): def __init__(self, controllers): WifiBaseTest.__init__(self, controllers) + def add_network_and_enable(self, network): + """Add a network and enable it. + + Args: + network : Network details for the network to be added. + + """ + ret = self.dut.droid.wifiAddNetwork(network) + asserts.assert_true(ret != -1, "Add network %r failed" % network) + self.dut.droid.wifiEnableNetwork(ret, 0) + def setup_class(self): """It will setup the required dependencies from config file and configure the required networks for auto-join testing. Configured networks will @@ -79,29 +90,12 @@ class WifiNewSetupAutoJoinTest(WifiBaseTest): wait_time = 15 self.dut.droid.wakeLockAcquireBright() self.dut.droid.wakeUpNow() - try: - self.dut.droid.wifiConnectByConfig(self.reference_networks[0][ - '2g']) - connect_result = self.dut.ed.pop_event( - wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 1) - self.log.info(connect_result) - time.sleep(wait_time) - if self.ref_ssid_count == 2: #add 5g network as well - self.dut.droid.wifiConnectByConfig(self.reference_networks[ - 0]['5g']) - connect_result = self.dut.ed.pop_event( - wifi_constants.CONNECT_BY_CONFIG_SUCCESS, 1) - self.log.info(connect_result) - time.sleep(wait_time) - current_network = self.dut.droid.wifiGetConnectionInfo() - self.log.info("Current network: {}".format(current_network)) - asserts.assert_true('network_id' in current_network, - NETWORK_ID_ERROR) - asserts.assert_true(current_network['network_id'] >= 0, - NETWORK_ERROR) - finally: - self.dut.droid.wifiLockRelease() - self.dut.droid.goToSleepNow() + # Add and enable all networks. + for network in self.reference_networks: + self.add_network_and_enable(network['2g']) + self.add_network_and_enable(network['5g']) + self.dut.droid.wifiLockRelease() + self.dut.droid.goToSleepNow() def check_connection(self, network_bssid): """Check current wifi connection networks. diff --git a/acts/tests/google/wifi/WifiPnoTest.py b/acts/tests/google/wifi/WifiPnoTest.py index 31b480e152..b8f85c01cc 100644 --- a/acts/tests/google/wifi/WifiPnoTest.py +++ b/acts/tests/google/wifi/WifiPnoTest.py @@ -116,8 +116,8 @@ class WifiPnoTest(WifiBaseTest): finally: pass - def add_dummy_networks(self, num_networks): - """Add some dummy networks to the device. + def add_and_enable_dummy_networks(self, num_networks): + """Add some dummy networks to the device and enable them. Args: num_networks: Number of networks to add. @@ -127,39 +127,58 @@ class WifiPnoTest(WifiBaseTest): network = {} network[WifiEnums.SSID_KEY] = ssid_name_base + str(i) network[WifiEnums.PWD_KEY] = "pno_dummy" - asserts.assert_true( - self.dut.droid.wifiAddNetwork(network) != -1, - "Add network %r failed" % network) + self.add_network_and_enable(network) + + def add_network_and_enable(self, network): + """Add a network and enable it. + + Args: + network : Network details for the network to be added. + + """ + ret = self.dut.droid.wifiAddNetwork(network) + asserts.assert_true(ret != -1, "Add network %r failed" % network) + self.dut.droid.wifiEnableNetwork(ret, 0) + """ Tests Begin """ @test_tracker_info(uuid="33d3cae4-5fa7-4e90-b9e2-5d3747bba64c") - def test_simple_pno_connection(self): + def test_simple_pno_connection_2g_to_5g(self): """Test PNO triggered autoconnect to a network. Steps: 1. Switch off the screen on the device. 2. Save 2 valid network configurations (a & b) in the device. - 3. Attenuate network b. - 4. Connect the device to network a. - 5. Attenuate network a and remove attenuation on network b and wait for - a few seconds to trigger PNO. - 6. Check the device connected to network b automatically. - 8. Attenuate network b and remove attenuation on network a and wait for - a few seconds to trigger PNO. - 9. Check the device connected to network a automatically. + 3. Attenuate 5Ghz network and wait for a few seconds to trigger PNO. + 4. Check the device connected to 2Ghz network automatically. + 5. Attenuate 2Ghz network and wait for a few seconds to trigger PNO. + 6. Check the device connected to 5Ghz network automatically. """ - asserts.assert_true( - self.dut.droid.wifiAddNetwork(self.pno_network_a) != -1, - "Add network %r failed" % self.pno_network_a) - asserts.assert_true( - self.dut.droid.wifiAddNetwork(self.pno_network_b) != -1, - "Add network %r failed" % self.pno_network_b) - self.set_attns("a_on_b_off") - wutils.wifi_connect(self.dut, self.pno_network_a), + self.add_network_and_enable(self.pno_network_a) + self.add_network_and_enable(self.pno_network_b) + self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a) + self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b) + + @test_tracker_info(uuid="39b945a1-830f-4f11-9e6a-9e9641066a96") + def test_simple_pno_connection_5g_to_2g(self): + """Test PNO triggered autoconnect to a network. + + Steps: + 1. Switch off the screen on the device. + 2. Save 2 valid network configurations (a & b) in the device. + 3. Attenuate 2Ghz network and wait for a few seconds to trigger PNO. + 4. Check the device connected to 5Ghz network automatically. + 5. Attenuate 5Ghz network and wait for a few seconds to trigger PNO. + 6. Check the device connected to 2Ghz network automatically. + + """ + self.add_network_and_enable(self.pno_network_a) + self.add_network_and_enable(self.pno_network_b) self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b) self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a) + @test_tracker_info(uuid="844b15be-ff45-4b09-a11b-0b2b4bb13b22") def test_pno_connection_with_multiple_saved_networks(self): """Test PNO triggered autoconnect to a network when there are more @@ -173,7 +192,10 @@ class WifiPnoTest(WifiBaseTest): 1. Save 16 dummy network configurations in the device. 2. Run the simple pno test. """ - self.add_dummy_networks(16) - self.test_simple_pno_connection() + self.add_and_enable_dummy_networks(16) + self.add_network_and_enable(self.pno_network_a) + self.add_network_and_enable(self.pno_network_b) + self.trigger_pno_and_assert_connect("a_on_b_off", self.pno_network_a) + self.trigger_pno_and_assert_connect("b_on_a_off", self.pno_network_b) """ Tests End """ diff --git a/acts/tests/google/wifi/WifiPreFlightTest.py b/acts/tests/google/wifi/WifiPreFlightTest.py index aae022191a..81fc38eb16 100755 --- a/acts/tests/google/wifi/WifiPreFlightTest.py +++ b/acts/tests/google/wifi/WifiPreFlightTest.py @@ -55,8 +55,11 @@ class WifiPreFlightTest(WifiBaseTest): wutils.wifi_toggle_state(self.dut, True) # Get reference networks as a list - req_params = ["reference_networks"] - self.unpack_userparams(req_param_names=req_params) + opt_params = ["reference_networks"] + self.unpack_userparams(opt_param_names=opt_params) + + if "AccessPoint" in self.user_params: + self.legacy_configure_ap_and_start(ap_count=2) networks = [] for ref_net in self.reference_networks: networks.append(ref_net[self.WIFI_2G]) @@ -66,26 +69,13 @@ class WifiPreFlightTest(WifiBaseTest): len(self.reference_networks) == 4, "Need at least 4 reference network with psk.") - # Set attenuation to 0 and verify reference - # networks show up in the scanned results - if getattr(self, "attenuators", []): - for a in self.attenuators: - a.set_atten(0) - - self.target_networks = [] - for ref_net in self.reference_networks: - self.target_networks.append( {'BSSID': ref_net['bssid']} ) - result = self._find_reference_networks_no_attn() - - if result: - self.log.error("Did not find or signal strength too low " - "for the following reference networks\n%s\n" % result) - return False - def teardown_class(self): wutils.reset_wifi(self.dut) for a in self.attenuators: a.set_atten(0) + if "AccessPoint" in self.user_params: + del self.user_params["reference_networks"] + del self.user_params["open_network"] """ Helper functions """ def _find_reference_networks_no_attn(self): @@ -106,6 +96,7 @@ class WifiPreFlightTest(WifiBaseTest): break time.sleep(WAIT_TIME) scanned_networks = self.dut.droid.wifiGetScanResults() + self.log.info("SCANNED RESULTS %s" % scanned_networks) for net in self.target_networks: if net in found_networks: result = wutils.match_networks(net, scanned_networks) @@ -131,6 +122,7 @@ class WifiPreFlightTest(WifiBaseTest): while(time.time() < start_time + SCAN_TIME): time.sleep(WAIT_TIME) scanned_networks = self.dut.droid.wifiGetScanResults() + self.log.info("SCANNED RESULTS %s" % scanned_networks) result = wutils.match_networks(target_network, scanned_networks) if not result: return True @@ -154,9 +146,25 @@ class WifiPreFlightTest(WifiBaseTest): 2. Verify that the corresponding network does not show up in the scanned results """ - found_networks = [] + # Set attenuation to 0 and verify reference + # networks show up in the scanned results + self.log.info("Verify if all reference networks show with " + "attenuation set to 0") + if getattr(self, "attenuators", []): + for a in self.attenuators: + a.set_atten(0) + self.target_networks = [] + for ref_net in self.reference_networks: + self.target_networks.append( {'BSSID': ref_net['bssid']} ) + result = self._find_reference_networks_no_attn() + asserts.assert_true(not result, + "Did not find or signal strength too low " + "for the following reference networks\n%s\n" % result) # attenuate 1 channel at a time and find the network + self.log.info("Verify if attenuation channel matches with " + "correct reference network") + found_networks = [] for i in range(len(self.attenuators)): target_network = {} target_network['BSSID'] = self.reference_networks[i]['bssid'] @@ -168,7 +176,6 @@ class WifiPreFlightTest(WifiBaseTest): target_network['ATTN'] = i found_networks.append(target_network) - if found_networks: - self.log.error("Attenuators did not match the networks\n %s\n" - % pprint.pformat(found_networks)) - return False + asserts.assert_true(not found_networks, + "Attenuators did not match the networks\n %s\n" + % pprint.pformat(found_networks)) diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py index c33c964e21..ef79f3d7a5 100644 --- a/acts/tests/google/wifi/WifiTetheringTest.py +++ b/acts/tests/google/wifi/WifiTetheringTest.py @@ -34,6 +34,7 @@ from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G from acts.test_utils.wifi import wifi_test_utils as wutils +WAIT_TIME = 2 class WifiTetheringTest(base_test.BaseTestClass): """ Tests for Wifi Tethering """ @@ -43,7 +44,7 @@ class WifiTetheringTest(base_test.BaseTestClass): self.convert_byte_to_mb = 1024.0 * 1024.0 self.new_ssid = "wifi_tethering_test2" - self.data_usage_error = 0.3 + self.data_usage_error = 1 self.hotspot_device = self.android_devices[0] self.tethered_devices = self.android_devices[1:] @@ -63,6 +64,24 @@ class WifiTetheringTest(base_test.BaseTestClass): for ad in self.tethered_devices: wutils.wifi_test_device_init(ad) + # Set chrome browser start with no-first-run verification + # Give permission to read from and write to storage + commands = ["pm grant com.android.chrome " + "android.permission.READ_EXTERNAL_STORAGE", + "pm grant com.android.chrome " + "android.permission.WRITE_EXTERNAL_STORAGE", + "rm /data/local/chrome-command-line", + "am set-debug-app --persistent com.android.chrome", + 'echo "chrome --no-default-browser-check --no-first-run ' + '--disable-fre" > /data/local/tmp/chrome-command-line'] + for cmd in commands: + for dut in self.tethered_devices: + try: + dut.adb.shell(cmd) + except adb.AdbError: + self.log.warn("adb command %s failed on %s" + % (cmd, dut.serial)) + def teardown_class(self): """ Reset devices """ wutils.wifi_toggle_state(self.hotspot_device, True) @@ -70,6 +89,8 @@ class WifiTetheringTest(base_test.BaseTestClass): def on_fail(self, test_name, begin_time): """ Collect bug report on failure """ self.hotspot_device.take_bug_report(test_name, begin_time) + for ad in self.tethered_devices: + ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -112,6 +133,7 @@ class WifiTetheringTest(base_test.BaseTestClass): """ carrier_supports_ipv6 = ["vzw", "tmo"] operator = get_operator_name(self.log, dut) + self.log.info("Carrier is %s" % operator) return operator in carrier_supports_ipv6 def _find_ipv6_default_route(self, dut): @@ -123,6 +145,7 @@ class WifiTetheringTest(base_test.BaseTestClass): """ default_route_substr = "::/0 -> " link_properties = dut.droid.connectivityGetActiveLinkProperties() + self.log.info("LINK PROPERTIES:\n%s\n" % link_properties) return link_properties and default_route_substr in link_properties def _verify_ipv6_tethering(self, dut): @@ -162,13 +185,15 @@ class WifiTetheringTest(base_test.BaseTestClass): for _ in range(50): dut_id = random.randint(0, len(self.tethered_devices)-1) dut = self.tethered_devices[dut_id] + # wait for 1 sec between connect & disconnect stress test + time.sleep(1) if device_connected[dut_id]: wutils.wifi_forget_network(dut, self.network["SSID"]) else: wutils.wifi_connect(dut, self.network) device_connected[dut_id] = not device_connected[dut_id] - def _verify_ping(self, dut, ip): + def _verify_ping(self, dut, ip, isIPv6=False): """ Verify ping works from the dut to IP/hostname Args: @@ -180,7 +205,7 @@ class WifiTetheringTest(base_test.BaseTestClass): False - if not """ self.log.info("Pinging %s from dut %s" % (ip, dut.serial)) - if self._is_ipaddress_ipv6(ip): + if isIPv6 or self._is_ipaddress_ipv6(ip): return dut.droid.pingHost(ip, 5, "ping6") return dut.droid.pingHost(ip) @@ -251,19 +276,20 @@ class WifiTetheringTest(base_test.BaseTestClass): Steps: 1. Start wifi tethering on hotspot device - 2. Verify IPv6 address on hotspot device + 2. Verify IPv6 address on hotspot device (VZW & TMO only) 3. Connect tethered device to hotspot device - 4. Verify IPv6 address on the client's link properties - 5. Verify ping on client using ping6 which should pass + 4. Verify IPv6 address on the client's link properties (VZW only) + 5. Verify ping on client using ping6 which should pass (VZW only) 6. Disable mobile data on provider and verify that link properties - does not have IPv6 address and default route + does not have IPv6 address and default route (VZW only) """ # Start wifi tethering on the hotspot device wutils.toggle_wifi_off_and_on(self.hotspot_device) self._start_wifi_tethering() # Verify link properties on hotspot device - self.log.info("Check IPv6 properties on the hotspot device") + self.log.info("Check IPv6 properties on the hotspot device. " + "Verizon & T-mobile should have IPv6 in link properties") self._verify_ipv6_tethering(self.hotspot_device) # Connect the client to the SSID @@ -271,15 +297,16 @@ class WifiTetheringTest(base_test.BaseTestClass): # Need to wait atleast 2 seconds for IPv6 address to # show up in the link properties - time.sleep(2) + time.sleep(WAIT_TIME) # Verify link properties on tethered device - self.log.info("Check IPv6 properties on the tethered device") + self.log.info("Check IPv6 properties on the tethered device. " + "Device should have IPv6 if carrier is Verizon") self._verify_ipv6_tethering(self.tethered_devices[0]) # Verify ping6 on tethered device ping_result = self._verify_ping(self.tethered_devices[0], - "www.google.com") + wutils.DEFAULT_PING_ADDR, True) if self._supports_ipv6_tethering(self.hotspot_device): asserts.assert_true(ping_result, "Ping6 failed on the client") else: @@ -294,18 +321,19 @@ class WifiTetheringTest(base_test.BaseTestClass): tel_defines.DATA_STATE_CONNECTED, "Could not disable cell data") - time.sleep(2) # wait until the IPv6 is removed from link properties + time.sleep(WAIT_TIME) # wait until the IPv6 is removed from link properties result = self._find_ipv6_default_route(self.tethered_devices[0]) self.hotspot_device.droid.telephonyToggleDataConnection(True) - if not result: + if result: asserts.fail("Found IPv6 default route in link properties:Data off") + self.log.info("Did not find IPv6 address in link properties") # Disable wifi tethering wutils.stop_wifi_tethering(self.hotspot_device) @test_tracker_info(uuid="110b61d1-8af2-4589-8413-11beac7a3025") - def test_wifi_tethering_2ghz_traffic_between_2tethered_devices(self): + def wifi_tethering_2ghz_traffic_between_2tethered_devices(self): """ Steps: 1. Start wifi hotspot with 2G band @@ -319,7 +347,7 @@ class WifiTetheringTest(base_test.BaseTestClass): wutils.stop_wifi_tethering(self.hotspot_device) @test_tracker_info(uuid="953f6e2e-27bd-4b73-85a6-d2eaa4e755d5") - def test_wifi_tethering_5ghz_traffic_between_2tethered_devices(self): + def wifi_tethering_5ghz_traffic_between_2tethered_devices(self): """ Steps: 1. Start wifi hotspot with 5ghz band @@ -413,9 +441,11 @@ class WifiTetheringTest(base_test.BaseTestClass): end_time = int(time.time() * 1000) bytes_before_download = dut.droid.connectivityGetRxBytesForDevice( subscriber_id, 0, end_time) - self.log.info("Bytes before download %s" % bytes_before_download) + self.log.info("Data usage before download: %s MB" % + (bytes_before_download/self.convert_byte_to_mb)) # download file + self.log.info("Download file of size %sMB" % self.file_size) http_file_download_by_chrome(self.tethered_devices[0], self.download_file) @@ -423,13 +453,15 @@ class WifiTetheringTest(base_test.BaseTestClass): end_time = int(time.time() * 1000) bytes_after_download = dut.droid.connectivityGetRxBytesForDevice( subscriber_id, 0, end_time) - self.log.info("Bytes after download %s" % bytes_after_download) + self.log.info("Data usage after download: %s MB" % + (bytes_after_download/self.convert_byte_to_mb)) bytes_diff = bytes_after_download - bytes_before_download wutils.stop_wifi_tethering(self.hotspot_device) # verify data usage update is correct bytes_used = bytes_diff/self.convert_byte_to_mb + self.log.info("Data usage on the device increased by %s" % bytes_used) return bytes_used > self.file_size \ and bytes_used < self.file_size + self.data_usage_error @@ -437,9 +469,9 @@ class WifiTetheringTest(base_test.BaseTestClass): def test_wifi_tethering_data_usage_limit(self): """ Steps: - 1. Set the data usage limit to current data usage + 2MB + 1. Set the data usage limit to current data usage + 10MB 2. Start wifi tethering and connect a dut to the SSID - 3. Download 5MB data on tethered device + 3. Download 20MB data on tethered device a. file download should stop b. tethered device will lose internet connectivity c. data usage limit reached message should be displayed @@ -448,7 +480,7 @@ class WifiTetheringTest(base_test.BaseTestClass): """ wutils.toggle_wifi_off_and_on(self.hotspot_device) dut = self.hotspot_device - data_usage_2mb = 2 * self.convert_byte_to_mb + data_usage_inc = 10 * self.convert_byte_to_mb subscriber_id = dut.droid.telephonyGetSubscriberId() self._start_wifi_tethering() @@ -459,11 +491,11 @@ class WifiTetheringTest(base_test.BaseTestClass): old_data_usage = dut.droid.connectivityQuerySummaryForDevice( subscriber_id, 0, end_time) - # set data usage limit to current usage limit + 2MB + # set data usage limit to current usage limit + 10MB dut.droid.connectivitySetDataUsageLimit( - subscriber_id, str(int(old_data_usage + data_usage_2mb))) + subscriber_id, str(int(old_data_usage + data_usage_inc))) - # download file - size 5MB + # download file - size 20MB http_file_download_by_chrome(self.tethered_devices[0], self.download_file, timeout=120) @@ -479,8 +511,10 @@ class WifiTetheringTest(base_test.BaseTestClass): dut.droid.connectivityFactoryResetNetworkPolicies(subscriber_id) wutils.stop_wifi_tethering(self.hotspot_device) - old_data_usage = (old_data_usage+data_usage_2mb)/self.convert_byte_to_mb + old_data_usage = (old_data_usage+data_usage_inc)/self.convert_byte_to_mb new_data_usage = new_data_usage/self.convert_byte_to_mb + self.log.info("Expected data usage: %s MB" % old_data_usage) + self.log.info("Actual data usage: %s MB" % new_data_usage) return (new_data_usage-old_data_usage) < self.data_usage_error diff --git a/acts/tests/google/wifi/aware/functional/DataPathTest.py b/acts/tests/google/wifi/aware/functional/DataPathTest.py index b12d7261e6..7e77c79a29 100644 --- a/acts/tests/google/wifi/aware/functional/DataPathTest.py +++ b/acts/tests/google/wifi/aware/functional/DataPathTest.py @@ -769,23 +769,6 @@ class DataPathTest(AwareBaseTest): ########################################################################## - def attach_with_identity(self, dut): - """Start an Aware session (attach) and wait for confirmation and identity - information (mac address). - - Args: - dut: Device under test - Returns: - id: Aware session ID. - mac: Discovery MAC address of this device. - """ - id = dut.droid.wifiAwareAttach(True) - autils.wait_for_event(dut, aconsts.EVENT_CB_ON_ATTACHED) - event = autils.wait_for_event(dut, aconsts.EVENT_CB_ON_IDENTITY_CHANGED) - mac = event["data"]["mac"] - - return id, mac - def wait_for_request_responses(self, dut, req_keys, aware_ifs): """Wait for network request confirmation for all request keys. @@ -806,6 +789,7 @@ class DataPathTest(AwareBaseTest): self.log.info("Received an unexpected connectivity, the revoked " "network request probably went through -- %s", event) + @test_tracker_info(uuid="2e325e2b-d552-4890-b470-20b40284395d") def test_multiple_identical_networks(self): """Validate that creating multiple networks between 2 devices, each network with identical configuration is supported over a single NDP. @@ -826,9 +810,9 @@ class DataPathTest(AwareBaseTest): # Initiator+Responder: attach and wait for confirmation & identity # create 10 sessions to be used in the different (but identical) NDPs for i in range(N + M): - id, init_mac = self.attach_with_identity(init_dut) + id, init_mac = autils.attach_with_identity(init_dut) init_ids.append(id) - id, resp_mac = self.attach_with_identity(resp_dut) + id, resp_mac = autils.attach_with_identity(resp_dut) resp_ids.append(id) # wait for for devices to synchronize with each other - there are no other @@ -924,3 +908,140 @@ class DataPathTest(AwareBaseTest): resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) for init_req_key in init_req_keys: init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) + + ######################################################################## + + def run_multiple_ndi(self, sec_configs): + """Validate that the device can create and use multiple NDIs. + + The security configuration can be: + - None: open + - String: passphrase + - otherwise: PMK (byte array) + + Args: + sec_configs: list of security configurations + """ + init_dut = self.android_devices[0] + init_dut.pretty_name = "Initiator" + resp_dut = self.android_devices[1] + resp_dut.pretty_name = "Responder" + + asserts.skip_if(init_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] + < len(sec_configs) or + resp_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] + < len(sec_configs), + "Initiator or Responder do not support multiple NDIs") + + init_id, init_mac = autils.attach_with_identity(init_dut) + resp_id, resp_mac = autils.attach_with_identity(resp_dut) + + # wait for for devices to synchronize with each other - there are no other + # mechanisms to make sure this happens for OOB discovery (except retrying + # to execute the data-path request) + time.sleep(autils.WAIT_FOR_CLUSTER) + + resp_req_keys = [] + init_req_keys = [] + resp_aware_ifs = [] + init_aware_ifs = [] + + for sec in sec_configs: + # Responder: request network + resp_req_key = autils.request_network(resp_dut, + autils.get_network_specifier( + resp_dut, resp_id, + aconsts.DATA_PATH_RESPONDER, + init_mac, sec)) + resp_req_keys.append(resp_req_key) + + # Initiator: request network + init_req_key = autils.request_network(init_dut, + autils.get_network_specifier( + init_dut, init_id, + aconsts.DATA_PATH_INITIATOR, + resp_mac, sec)) + init_req_keys.append(init_req_key) + + # Wait for network + init_net_event = autils.wait_for_event_with_keys( + init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT, + (cconsts.NETWORK_CB_KEY_EVENT, + cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), + (cconsts.NETWORK_CB_KEY_ID, init_req_key)) + resp_net_event = autils.wait_for_event_with_keys( + resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT, + (cconsts.NETWORK_CB_KEY_EVENT, + cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), + (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) + + resp_aware_ifs.append( + resp_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]) + init_aware_ifs.append( + init_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]) + + # check that we are using 2 NDIs + init_aware_ifs = list(set(init_aware_ifs)) + resp_aware_ifs = list(set(resp_aware_ifs)) + + self.log.info("Interface names: I=%s, R=%s", init_aware_ifs, resp_aware_ifs) + self.log.info("Initiator requests: %s", init_req_keys) + self.log.info("Responder requests: %s", resp_req_keys) + + asserts.assert_equal( + len(init_aware_ifs), len(sec_configs), "Multiple initiator interfaces") + asserts.assert_equal( + len(resp_aware_ifs), len(sec_configs), "Multiple responder interfaces") + + for i in range(len(sec_configs)): + if_name = "%s%d" % (aconsts.AWARE_NDI_PREFIX, i) + init_ipv6 = autils.get_ipv6_addr(init_dut, if_name) + resp_ipv6 = autils.get_ipv6_addr(resp_dut, if_name) + + asserts.assert_equal( + init_ipv6 is None, if_name not in init_aware_ifs, + "Initiator interface %s in unexpected state" % if_name) + asserts.assert_equal( + resp_ipv6 is None, if_name not in resp_aware_ifs, + "Responder interface %s in unexpected state" % if_name) + + # release requests + for resp_req_key in resp_req_keys: + resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) + for init_req_key in init_req_keys: + init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) + + @test_tracker_info(uuid="2d728163-11cc-46ba-a973-c8e1e71397fc") + def test_multiple_ndi_open_passphrase(self): + """Verify that can between 2 DUTs can create 2 NDPs with different security + configuration (one open, one using passphrase). The result should use two + different NDIs""" + self.run_multiple_ndi([None, self.PASSPHRASE]) + + @test_tracker_info(uuid="5f2c32aa-20b2-41f0-8b1e-d0b68df73ada") + def test_multiple_ndi_open_pmk(self): + """Verify that can between 2 DUTs can create 2 NDPs with different security + configuration (one open, one using pmk). The result should use two + different NDIs""" + self.run_multiple_ndi([None, self.PMK]) + + @test_tracker_info(uuid="34467659-bcfb-40cd-ba25-7e50560fca63") + def test_multiple_ndi_passphrase_pmk(self): + """Verify that can between 2 DUTs can create 2 NDPs with different security + configuration (one using passphrase, one using pmk). The result should use + two different NDIs""" + self.run_multiple_ndi([self.PASSPHRASE, self.PMK]) + + @test_tracker_info(uuid="d9194ce6-45b6-41b1-9cc8-ada79968966d") + def test_multiple_ndi_passphrases(self): + """Verify that can between 2 DUTs can create 2 NDPs with different security + configuration (using different passphrases). The result should use two + different NDIs""" + self.run_multiple_ndi([self.PASSPHRASE, self.PASSPHRASE2]) + + @test_tracker_info(uuid="879df795-62d2-40d4-a862-bd46d8f7e67f") + def test_multiple_ndi_pmks(self): + """Verify that can between 2 DUTs can create 2 NDPs with different security + configuration (using different PMKS). The result should use two different + NDIs""" + self.run_multiple_ndi([self.PMK, self.PMK2]) diff --git a/acts/tests/google/wifi/aware/performance/ThroughputTest.py b/acts/tests/google/wifi/aware/performance/ThroughputTest.py index 5b59d92b26..6cf1046566 100644 --- a/acts/tests/google/wifi/aware/performance/ThroughputTest.py +++ b/acts/tests/google/wifi/aware/performance/ThroughputTest.py @@ -32,6 +32,9 @@ class ThroughputTest(AwareBaseTest): SERVICE_NAME = "GoogleTestServiceXYZ" + PASSPHRASE = "This is some random passphrase - very very secure!!" + PASSPHRASE2 = "This is some random passphrase - very very secure - but diff!!" + def __init__(self, controllers): AwareBaseTest.__init__(self, controllers) @@ -209,7 +212,7 @@ class ThroughputTest(AwareBaseTest): self.log.info("iPerf3: Sent = %d bps Received = %d bps", results[i]["tx_rate"], results[i]["rx_rate"]) - ######################################################################## + ######################################################################## def test_iperf_single_ndp_aware_only_ib(self): """Measure throughput using iperf on a single NDP, with Aware enabled and @@ -235,3 +238,141 @@ class ThroughputTest(AwareBaseTest): self.run_iperf_max_ndp_aware_only(results=results) asserts.explicit_pass( "test_iperf_max_ndp_aware_only_oob passes", extras=results) + + ######################################################################## + + def run_iperf_max_ndi_aware_only(self, sec_configs, results): + """Measure iperf performance on multiple NDPs between 2 devices using + different security configurations (and hence different NDIs). Test with + Aware enabled and no infrastructure connection - i.e. device is not + associated to an AP. + + The security configuration can be: + - None: open + - String: passphrase + - otherwise: PMK (byte array) + + Args: + sec_configs: list of security configurations + results: Dictionary into which to place test results. + """ + init_dut = self.android_devices[0] + init_dut.pretty_name = "Initiator" + resp_dut = self.android_devices[1] + resp_dut.pretty_name = "Responder" + + asserts.skip_if(init_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] + < len(sec_configs) or + resp_dut.aware_capabilities[aconsts.CAP_MAX_NDI_INTERFACES] + < len(sec_configs), + "Initiator or Responder do not support multiple NDIs") + + + init_id, init_mac = autils.attach_with_identity(init_dut) + resp_id, resp_mac = autils.attach_with_identity(resp_dut) + + # wait for for devices to synchronize with each other - there are no other + # mechanisms to make sure this happens for OOB discovery (except retrying + # to execute the data-path request) + time.sleep(autils.WAIT_FOR_CLUSTER) + + resp_req_keys = [] + init_req_keys = [] + resp_aware_ifs = [] + init_aware_ifs = [] + resp_aware_ipv6s = [] + init_aware_ipv6s = [] + + for sec in sec_configs: + # Responder: request network + resp_req_key = autils.request_network(resp_dut, + autils.get_network_specifier( + resp_dut, resp_id, + aconsts.DATA_PATH_RESPONDER, + init_mac, sec)) + resp_req_keys.append(resp_req_key) + + # Initiator: request network + init_req_key = autils.request_network(init_dut, + autils.get_network_specifier( + init_dut, init_id, + aconsts.DATA_PATH_INITIATOR, + resp_mac, sec)) + init_req_keys.append(init_req_key) + + # Wait for network + init_net_event = autils.wait_for_event_with_keys( + init_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT, + (cconsts.NETWORK_CB_KEY_EVENT, + cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), + (cconsts.NETWORK_CB_KEY_ID, init_req_key)) + resp_net_event = autils.wait_for_event_with_keys( + resp_dut, cconsts.EVENT_NETWORK_CALLBACK, autils.EVENT_TIMEOUT, + (cconsts.NETWORK_CB_KEY_EVENT, + cconsts.NETWORK_CB_LINK_PROPERTIES_CHANGED), + (cconsts.NETWORK_CB_KEY_ID, resp_req_key)) + + resp_aware_ifs.append( + resp_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]) + init_aware_ifs.append( + init_net_event["data"][cconsts.NETWORK_CB_KEY_INTERFACE_NAME]) + + resp_aware_ipv6s.append( + autils.get_ipv6_addr(resp_dut, resp_aware_ifs[-1])) + init_aware_ipv6s.append( + autils.get_ipv6_addr(init_dut, init_aware_ifs[-1])) + + self.log.info("Initiator interfaces/ipv6: %s / %s", init_aware_ifs, + init_aware_ipv6s) + self.log.info("Responder interfaces/ipv6: %s / %s", resp_aware_ifs, + resp_aware_ipv6s) + + # create threads, start them, and wait for all to finish + base_port = 5000 + q = queue.Queue() + threads = [] + for i in range(len(sec_configs)): + threads.append( + threading.Thread( + target=self.run_iperf, + args=(q, init_dut, resp_dut, resp_aware_ifs[i], init_aware_ipv6s[ + i], base_port + i))) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + # release requests + for resp_req_key in resp_req_keys: + resp_dut.droid.connectivityUnregisterNetworkCallback(resp_req_key) + for init_req_key in init_req_keys: + init_dut.droid.connectivityUnregisterNetworkCallback(init_req_key) + + + # collect data + for i in range(len(sec_configs)): + results[i] = {} + result, data = q.get() + asserts.assert_true(result, + "Failure starting/running iperf3 in client mode") + self.log.debug(pprint.pformat(data)) + data_json = json.loads("".join(data)) + if "error" in data_json: + asserts.fail( + "iperf run failed: %s" % data_json["error"], extras=data_json) + results[i]["tx_rate"] = data_json["end"]["sum_sent"]["bits_per_second"] + results[i]["rx_rate"] = data_json["end"]["sum_received"][ + "bits_per_second"] + self.log.info("iPerf3: Sent = %d bps Received = %d bps", + results[i]["tx_rate"], results[i]["rx_rate"]) + + def test_iperf_max_ndi_aware_only_passphrases(self): + """Test throughput for multiple NDIs configured with different passphrases. + """ + results = {} + self.run_iperf_max_ndi_aware_only( + [self.PASSPHRASE, self.PASSPHRASE2], results=results) + asserts.explicit_pass( + "test_iperf_max_ndi_aware_only_passphrases passes", extras=results) diff --git a/acts/tests/sample/OtaSampleTest.py b/acts/tests/sample/OtaSampleTest.py new file mode 100644 index 0000000000..aeb735e808 --- /dev/null +++ b/acts/tests/sample/OtaSampleTest.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import base_test +from acts.libs.ota import ota_updater + + +class OtaSampleTest(base_test.BaseTestClass): + """Demonstrates an example OTA Update test.""" + + def setup_class(self): + ota_updater.initialize(self.user_params, self.android_devices) + self.dut = self.android_devices[0] + + def test_my_test(self): + self.pre_ota() + ota_updater.update(self.dut) + self.post_ota() + + def pre_ota(self): + pass + + def post_ota(self): + pass |