diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2019-02-11 15:38:01 -0800 |
---|---|---|
committer | android-build-merger <android-build-merger@google.com> | 2019-02-11 15:38:01 -0800 |
commit | 050f7a6a0a20dadf40595b177cae750be2c5d4bb (patch) | |
tree | 925cef4d647b718511ae0d9513f53273a968cb9c | |
parent | e570fab62d8605e06dc26f7f1ea7fa685c096bfc (diff) | |
parent | 4056348436c09f43811485606e72c2759b2cca95 (diff) | |
download | platform_tools_test_connectivity-050f7a6a0a20dadf40595b177cae750be2c5d4bb.tar.gz platform_tools_test_connectivity-050f7a6a0a20dadf40595b177cae750be2c5d4bb.tar.bz2 platform_tools_test_connectivity-050f7a6a0a20dadf40595b177cae750be2c5d4bb.zip |
Snap for 5180536 from ebad5b93affe18483ffa706502c187ab741060fe to pi-platform-release
am: 4056348436
Change-Id: I2f601da6d7ee9c0b20eecb5bb31dbd3f487d9d5f
50 files changed, 4672 insertions, 799 deletions
diff --git a/acts/framework/acts/controllers/__init__.py b/acts/framework/acts/controllers/__init__.py index 41e48dfc31..78014d7b17 100644 --- a/acts/framework/acts/controllers/__init__.py +++ b/acts/framework/acts/controllers/__init__.py @@ -25,5 +25,5 @@ def destroy(objs): """This is a list of all the top level controller modules""" __all__ = [ "android_device", "attenuator", "monsoon", "access_point", "iperf_server", - "packet_sender", "arduino_wifi_dongle" + "packet_sender", "arduino_wifi_dongle", "packet_capture" ] diff --git a/acts/framework/acts/controllers/adb.py b/acts/framework/acts/controllers/adb.py index 562cbb8eb5..23dbb12327 100644 --- a/acts/framework/acts/controllers/adb.py +++ b/acts/framework/acts/controllers/adb.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2016 - The Android Open Source Project # @@ -20,6 +20,7 @@ import logging import re import shellescape +from acts import error from acts.libs.proc import job DEFAULT_ADB_TIMEOUT = 60 @@ -28,6 +29,10 @@ DEFAULT_ADB_PULL_TIMEOUT = 180 # (N and above add the serial to the error msg). DEVICE_NOT_FOUND_REGEX = re.compile('^error: device (?:\'.*?\' )?not found') DEVICE_OFFLINE_REGEX = re.compile('^error: device offline') +# Raised when adb forward commands fail to forward a port. +CANNOT_BIND_LISTENER_REGEX = re.compile('^error: cannot bind listener:') +# Expected output is "Android Debug Bridge version 1.0.XX +ADB_VERSION_REGEX = re.compile('Android Debug Bridge version 1.0.(\d+)') ROOT_USER_ID = '0' SHELL_USER_ID = '2000' @@ -46,7 +51,7 @@ def parsing_parcel_output(output): return re.sub(r'[.\s]', '', output) -class AdbError(Exception): +class AdbError(error.ActsError): """Raised when there is an error in adb operations.""" def __init__(self, cmd, stdout, stderr, ret_code): @@ -70,8 +75,6 @@ class AdbProxy(object): >> adb.devices() # will return the console output of "adb devices". """ - _SERVER_LOCAL_PORT = None - def __init__(self, serial="", ssh_connection=None): """Construct an instance of AdbProxy. @@ -81,11 +84,12 @@ class AdbProxy(object): connected to a remote host that we can reach via SSH. """ self.serial = serial + self._server_local_port = None adb_path = self._exec_cmd("which adb") adb_cmd = [adb_path] if serial: adb_cmd.append("-s %s" % serial) - if ssh_connection is not None and not AdbProxy._SERVER_LOCAL_PORT: + if ssh_connection is not None: # Kill all existing adb processes on the remote host (if any) # Note that if there are none, then pkill exits with non-zero status ssh_connection.run("pkill adb", ignore_status=True) @@ -98,9 +102,9 @@ class AdbProxy(object): ssh_connection.run(remote_adb_cmd) # Proxy a local port to the adb server port local_port = ssh_connection.create_ssh_tunnel(5037) - AdbProxy._SERVER_LOCAL_PORT = local_port + self._server_local_port = local_port - if AdbProxy._SERVER_LOCAL_PORT: + if self._server_local_port: adb_cmd.append("-P %d" % local_port) self.adb_str = " ".join(adb_cmd) self._ssh_connection = ssh_connection @@ -169,7 +173,8 @@ class AdbProxy(object): return parsing_parcel_output(out) if ignore_status: return out or err - if ret == 1 and DEVICE_NOT_FOUND_REGEX.match(err): + if ret == 1 and (DEVICE_NOT_FOUND_REGEX.match(err) or + CANNOT_BIND_LISTENER_REGEX.match(err)): raise AdbError(cmd=cmd, stdout=out, stderr=err, ret_code=ret) else: return out @@ -207,10 +212,17 @@ class AdbProxy(object): # 2) Setup forwarding between that remote port and the requested # device port remote_port = self._ssh_connection.find_free_port() - self._ssh_connection.create_ssh_tunnel( + local_port = self._ssh_connection.create_ssh_tunnel( remote_port, local_port=host_port) host_port = remote_port - return self.forward("tcp:%d tcp:%d" % (host_port, device_port)) + output = self.forward("tcp:%d tcp:%d" % (host_port, device_port)) + # If hinted_port is 0, the output will be the selected port. + # Otherwise, there will be no output upon successfully + # forwarding the hinted port. + if output: + return int(output) + else: + return local_port def remove_tcp_forward(self, host_port): """Stop tcp forwarding a port from localhost to this android device. @@ -268,3 +280,19 @@ class AdbProxy(object): return self._exec_adb_cmd(clean_name, arg_str, **kwargs) return adb_call + + def get_version_number(self): + """Returns the version number of ADB as an int (XX in 1.0.XX). + + Raises: + AdbError if the version number is not found/parsable. + """ + version_output = self.version() + match = re.search(ADB_VERSION_REGEX, + version_output) + + if not match: + logging.error('Unable to capture ADB version from adb version ' + 'output: %s' % version_output) + raise AdbError('adb version', version_output, '', '') + return int(match.group(1)) diff --git a/acts/framework/acts/controllers/ap_lib/ap_get_interface.py b/acts/framework/acts/controllers/ap_lib/ap_get_interface.py index b80add5ac1..641ed46020 100644 --- a/acts/framework/acts/controllers/ap_lib/ap_get_interface.py +++ b/acts/framework/acts/controllers/ap_lib/ap_get_interface.py @@ -151,8 +151,7 @@ class ApInterfaces(object): Returns: lan: the only one running LAN interface of the devices - Raises: - ApInterfacesError: no running LAN can be found + None, if nothing was found. """ lan = None interfaces_phy = self.get_physical_interface() @@ -166,11 +165,7 @@ class ApInterfaces(object): if 'RUNNING' in output.stdout: lan = iface break - if lan: - return lan - - raise ApInterfacesError( - 'No running LAN interface available, check connection') + return lan def check_ping(self, iface): """Check the ping status on specific interface to determine the WAN. diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py b/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py index 845f3d3c9e..9120725bca 100644 --- a/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py +++ b/acts/framework/acts/controllers/ap_lib/hostapd_ap_preset.py @@ -15,17 +15,17 @@ from acts.controllers.ap_lib import hostapd_config from acts.controllers.ap_lib import hostapd_constants - -def create_ap_preset(profile_name, +def create_ap_preset(profile_name='whirlwind', + iface_wlan_2g=None, + iface_wlan_5g=None, channel=None, dtim=2, frequency=None, security=None, ssid=None, + hidden=False, vht_bandwidth=80, - bss_settings=[], - iface_wlan_2g=hostapd_constants.WLAN0_STRING, - iface_wlan_5g=hostapd_constants.WLAN1_STRING): + bss_settings=[]): """AP preset config generator. This a wrapper for hostapd_config but but supplies the default settings for the preset that is selected. @@ -49,6 +49,15 @@ def create_ap_preset(profile_name, Returns: A hostapd_config object that can be used by the hostapd object. """ + if not iface_wlan_2g or not iface_wlan_5g: + raise ValueError('WLAN interface for 2G and/or 5G is missing.') + + # The Onhub uses wlan0, wlan1 as the WAN interfaces, while the Gale uses + # wlan-2400mhz, wlan-5000mhz. + if iface_wlan_2g not in hostapd_constants.INTERFACE_2G_LIST or \ + iface_wlan_5g not in hostapd_constants.INTERFACE_5G_LIST: + raise ValueError('Incorrect interface name was passed.') + force_wmm = None force_wmm = None beacon_interval = None @@ -82,6 +91,7 @@ def create_ap_preset(profile_name, ] config = hostapd_config.HostapdConfig( ssid=ssid, + hidden=hidden, security=security, interface=interface, mode=mode, @@ -129,6 +139,7 @@ def create_ap_preset(profile_name, ] config = hostapd_config.HostapdConfig( ssid=ssid, + hidden=hidden, security=security, interface=interface, mode=mode, diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_config.py b/acts/framework/acts/controllers/ap_lib/hostapd_config.py index bfc0ca3afc..2d39875292 100644 --- a/acts/framework/acts/controllers/ap_lib/hostapd_config.py +++ b/acts/framework/acts/controllers/ap_lib/hostapd_config.py @@ -455,7 +455,7 @@ class HostapdConfig(object): self._scenario_name = scenario_name self._min_streams = min_streams - self._bss_lookup = {} + self._bss_lookup = collections.OrderedDict() for bss in bss_settings: if bss.name in self._bss_lookup: raise ValueError('Cannot have multiple bss settings with the' diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_constants.py b/acts/framework/acts/controllers/ap_lib/hostapd_constants.py index a190e71213..8dd766275b 100755 --- a/acts/framework/acts/controllers/ap_lib/hostapd_constants.py +++ b/acts/framework/acts/controllers/ap_lib/hostapd_constants.py @@ -18,9 +18,11 @@ import itertools BAND_2G = '2g' BAND_5G = '5g' +WEP = 0 WPA1 = 1 WPA2 = 2 MIXED = 3 +ENT = 4 # get the correct constant MAX_WPA_PSK_LENGTH = 64 MIN_WPA_PSK_LENGTH = 8 WPA_STRICT_REKEY = 1 @@ -31,10 +33,18 @@ WPA_STRICT_REKEY_DEFAULT = True WPA_STRING = 'wpa' WPA2_STRING = 'wpa2' WPA_MIXED_STRING = 'wpa/wpa2' +ENT_STRING = 'ent' +ENT_KEY_MGMT = 'WPA-EAP' +IEEE8021X = 1 WLAN0_STRING = 'wlan0' WLAN1_STRING = 'wlan1' WLAN2_STRING = 'wlan2' WLAN3_STRING = 'wlan3' +WLAN0_GALE = 'wlan-2400mhz' +WLAN1_GALE = 'wlan-5000mhz' +WEP_STRING = 'wep' +WEP_DEFAULT_KEY = 0 +WEP_HEX_LENGTH = [10, 26, 32, 58] AP_DEFAULT_CHANNEL_2G = 6 AP_DEFAULT_CHANNEL_5G = 36 AP_DEFAULT_MAX_SSIDS_2G = 8 @@ -43,6 +53,8 @@ AP_SSID_LENGTH_2G = 8 AP_PASSPHRASE_LENGTH_2G = 10 AP_SSID_LENGTH_5G = 8 AP_PASSPHRASE_LENGTH_5G = 10 +INTERFACE_2G_LIST = [WLAN0_STRING, WLAN0_GALE] +INTERFACE_5G_LIST = [WLAN1_STRING, WLAN1_GALE] # A mapping of frequency to channel number. This includes some # frequencies used outside the US. diff --git a/acts/framework/acts/controllers/ap_lib/hostapd_security.py b/acts/framework/acts/controllers/ap_lib/hostapd_security.py index 9733e99913..47d41fe003 100644 --- a/acts/framework/acts/controllers/ap_lib/hostapd_security.py +++ b/acts/framework/acts/controllers/ap_lib/hostapd_security.py @@ -13,6 +13,7 @@ # limitations under the License. import collections +import string from acts.controllers.ap_lib import hostapd_constants @@ -28,13 +29,17 @@ class Security(object): wpa_cipher=hostapd_constants.WPA_DEFAULT_CIPHER, wpa2_cipher=hostapd_constants.WPA2_DEFAULT_CIPER, wpa_group_rekey=hostapd_constants.WPA_GROUP_KEY_ROTATION_TIME, - wpa_strict_rekey=hostapd_constants.WPA_STRICT_REKEY_DEFAULT): + wpa_strict_rekey=hostapd_constants.WPA_STRICT_REKEY_DEFAULT, + wep_default_key=hostapd_constants.WEP_DEFAULT_KEY, + radius_server_ip=None, + radius_server_port=None, + radius_server_secret=None): """Gather all of the security settings for WPA-PSK. This could be expanded later. Args: security_mode: Type of security modes. - Options: wpa, wpa2, wpa/wpa2 + Options: wep, wpa, wpa2, wpa/wpa2 password: The PSK or passphrase for the security mode. wpa_cipher: The cipher to be used for wpa. Options: TKIP, CCMP, TKIP CCMP @@ -50,51 +55,83 @@ class Security(object): leaves the network or not. Options: True, False Default: True + wep_default_key: The wep key number to use when transmitting. + radius_server_ip: Radius server IP for Enterprise auth. + radius_server_port: Radius server port for Enterprise auth. + radius_server_secret: Radius server secret for Enterprise auth. """ self.wpa_cipher = wpa_cipher self.wpa2_cipher = wpa2_cipher self.wpa_group_rekey = wpa_group_rekey self.wpa_strict_rekey = wpa_strict_rekey + self.wep_default_key = wep_default_key + self.radius_server_ip = radius_server_ip + self.radius_server_port = radius_server_port + self.radius_server_secret = radius_server_secret if security_mode == hostapd_constants.WPA_STRING: security_mode = hostapd_constants.WPA1 elif security_mode == hostapd_constants.WPA2_STRING: security_mode = hostapd_constants.WPA2 elif security_mode == hostapd_constants.WPA_MIXED_STRING: security_mode = hostapd_constants.MIXED + elif security_mode == hostapd_constants.WEP_STRING: + security_mode = hostapd_constants.WEP + elif security_mode == hostapd_constants.ENT_STRING: + security_mode = hostapd_constants.ENT else: security_mode = None self.security_mode = security_mode if password: - if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len( - password) > hostapd_constants.MAX_WPA_PSK_LENGTH: - raise ValueError( - 'Password must be a minumum of %s characters and a maximum of %s' - % (hostapd_constants.MIN_WPA_PSK_LENGTH, - hostapd_constants.MAX_WPA_PSK_LENGTH)) + if security_mode == hostapd_constants.WEP: + if len(password) in hostapd_constants.WEP_HEX_LENGTH and all( + c in string.hexdigits for c in password): + self.password = password + else: + raise ValueError( + 'WEP key must be a hex string of %s characters' + % hostapd_constants.WEP_HEX_LENGTH) else: - self.password = password + if len(password) < hostapd_constants.MIN_WPA_PSK_LENGTH or len( + password) > hostapd_constants.MAX_WPA_PSK_LENGTH: + raise ValueError( + 'Password must be a minumum of %s characters and a maximum of %s' + % (hostapd_constants.MIN_WPA_PSK_LENGTH, + hostapd_constants.MAX_WPA_PSK_LENGTH)) + else: + self.password = password def generate_dict(self): """Returns: an ordered dictionary of settings""" settings = collections.OrderedDict() - if self.security_mode: - settings['wpa'] = self.security_mode - if len(self.password) == hostapd_constants.MAX_WPA_PSK_LENGTH: - settings['wpa_psk'] = self.password + if self.security_mode != None: + if self.security_mode == hostapd_constants.WEP: + settings['wep_default_key'] = self.wep_default_key + settings['wep_key' + str(self.wep_default_key)] = self.password + elif self.security_mode == hostapd_constants.ENT: + settings['auth_server_addr'] = self.radius_server_ip + settings['auth_server_port'] = self.radius_server_port + settings['auth_server_shared_secret'] = self.radius_server_secret + settings['wpa_key_mgmt'] = hostapd_constants.ENT_KEY_MGMT + settings['ieee8021x'] = hostapd_constants.IEEE8021X + settings['wpa'] = hostapd_constants.WPA2 else: - settings['wpa_passphrase'] = self.password + settings['wpa'] = self.security_mode + if len(self.password) == hostapd_constants.MAX_WPA_PSK_LENGTH: + settings['wpa_psk'] = self.password + else: + settings['wpa_passphrase'] = self.password - if self.security_mode == hostapd_constants.MIXED: - settings['wpa_pairwise'] = self.wpa_cipher - settings['rsn_pairwise'] = self.wpa2_cipher - elif self.security_mode == hostapd_constants.WPA1: - settings['wpa_pairwise'] = self.wpa_cipher - elif self.security_mode == hostapd_constants.WPA2: - settings['rsn_pairwise'] = self.wpa2_cipher + if self.security_mode == hostapd_constants.MIXED: + settings['wpa_pairwise'] = self.wpa_cipher + settings['rsn_pairwise'] = self.wpa2_cipher + elif self.security_mode == hostapd_constants.WPA1: + settings['wpa_pairwise'] = self.wpa_cipher + elif self.security_mode == hostapd_constants.WPA2: + settings['rsn_pairwise'] = self.wpa2_cipher - if self.wpa_group_rekey: - settings['wpa_group_rekey'] = self.wpa_group_rekey - if self.wpa_strict_rekey: - settings[ - 'wpa_strict_rekey'] = hostapd_constants.WPA_STRICT_REKEY + if self.wpa_group_rekey: + settings['wpa_group_rekey'] = self.wpa_group_rekey + if self.wpa_strict_rekey: + settings[ + 'wpa_strict_rekey'] = hostapd_constants.WPA_STRICT_REKEY return settings diff --git a/acts/framework/acts/controllers/attenuator_lib/minicircuits/http.py b/acts/framework/acts/controllers/attenuator_lib/minicircuits/http.py new file mode 100644 index 0000000000..b68d79d624 --- /dev/null +++ b/acts/framework/acts/controllers/attenuator_lib/minicircuits/http.py @@ -0,0 +1,146 @@ +#!/usr/bin/env python3 + +# Copyright 2016- The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Class for HTTP control of Mini-Circuits RCDAT series attenuators + +This class provides a wrapper to the MC-RCDAT attenuator modules for purposes +of simplifying and abstracting control down to the basic necessities. It is +not the intention of the module to expose all functionality, but to allow +interchangeable HW to be used. + +See http://www.minicircuits.com/softwaredownload/Prog_Manual-6-Programmable_Attenuator.pdf +""" + +import urllib +from acts.controllers import attenuator + + +class AttenuatorInstrument(attenuator.AttenuatorInstrument): + """A specific HTTP-controlled implementation of AttenuatorInstrument for + Mini-Circuits RC-DAT attenuators. + + With the exception of HTTP-specific commands, all functionality is defined + by the AttenuatorInstrument class. + """ + + def __init__(self, num_atten=1): + super(AttenuatorInstrument, self).__init__(num_atten) + self._ip_address = None + self._port = None + self._timeout = None + + def open(self, host, port=80, timeout=2): + """Initializes the AttenuatorInstrument and queries basic information. + + Args: + host: A valid hostname (IP address or DNS-resolvable name) to an + MC-DAT attenuator instrument. + port: An optional port number (defaults to http default 80) + timeout: An optional timeout for http requests + """ + self._ip_address = host + self._port = port + self._timeout = timeout + + att_req = urllib.request.urlopen('http://{}:{}/MN?'.format( + self._ip_address, self._port)) + config_str = att_req.read().decode('utf-8') + if not config_str.startswith('MN='): + raise attenuator.InvalidDataError( + 'Attenuator returned invalid data. Attenuator returned: {}'. + format(config_str)) + + config_str = config_str[len('MN='):] + self.properties = dict( + zip(['model', 'max_freq', 'max_atten'], config_str.split('-', 2))) + self.max_atten = float(self.properties['max_atten']) + + def is_open(self): + """Returns True if the AttenuatorInstrument has an open connection. + + Since this controller is based on HTTP requests, there is no connection + required and the attenuator is always ready to accept requests. + """ + return True + + def close(self): + """Closes the connection to the attenuator. + + Since this controller is based on HTTP requests, there is no connection + teardowns required. + """ + pass + + def set_atten(self, idx, value): + """This function sets the attenuation of an attenuator given its index + in the instrument. + + Args: + idx: A zero-based index that identifies a particular attenuator in + an instrument. For instruments that only have one channel, this + is ignored by the device. + value: A floating point value for nominal attenuation to be set. + + Raises: + InvalidDataError if the attenuator does not respond with the + expected output. + """ + if not (0 <= idx < self.num_atten): + raise IndexError('Attenuator index out of range!', self.num_atten, + idx) + + if value > self.max_atten: + raise ValueError('Attenuator value out of range!', self.max_atten, + value) + # The actual device uses one-based index for channel numbers. + att_req = urllib.request.urlopen( + 'http://{}:{}/CHAN:{}:SETATT:{}'.format( + self._ip_address, self._port, idx + 1, value), + timeout=self._timeout) + att_resp = att_req.read().decode('utf-8') + if att_resp != '1': + raise attenuator.InvalidDataError( + 'Attenuator returned invalid data. Attenuator returned: {}'. + format(att_resp)) + + def get_atten(self, idx): + """Returns the current attenuation of the attenuator at the given index. + + Args: + idx: The index of the attenuator. + + Raises: + InvalidDataError if the attenuator does not respond with the + expected outpu + + Returns: + the current attenuation value as a float + """ + if not (0 <= idx < self.num_atten): + raise IndexError('Attenuator index out of range!', self.num_atten, + idx) + att_req = urllib.request.urlopen( + 'http://{}:{}/CHAN:{}:ATT?'.format(self._ip_address, self.port, + idx + 1), + timeout=self._timeout) + att_resp = att_req.read().decode('utf-8') + try: + atten_val = float(att_resp) + except: + raise attenuator.InvalidDataError( + 'Attenuator returned invalid data. Attenuator returned: {}'. + format(att_resp)) + return atten_val diff --git a/acts/framework/acts/controllers/iperf_server.py b/acts/framework/acts/controllers/iperf_server.py index 9b952c5555..127b6faa0b 100755 --- a/acts/framework/acts/controllers/iperf_server.py +++ b/acts/framework/acts/controllers/iperf_server.py @@ -82,7 +82,7 @@ class IPerfResult(object): iperf_output = iperf_output[0: iperf_output.index("}\n") + 1] iperf_string = ''.join(iperf_output) - iperf_string = iperf_string.replace("-nan", '0') + iperf_string = iperf_string.replace("nan", '0') self.result = json.loads(iperf_string) except ValueError: with open(result_path, 'r') as f: diff --git a/acts/framework/acts/controllers/packet_capture.py b/acts/framework/acts/controllers/packet_capture.py new file mode 100755 index 0000000000..df3a989968 --- /dev/null +++ b/acts/framework/acts/controllers/packet_capture.py @@ -0,0 +1,350 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import logger +from acts.controllers.ap_lib.hostapd_constants import AP_DEFAULT_CHANNEL_2G +from acts.controllers.ap_lib.hostapd_constants import AP_DEFAULT_CHANNEL_5G +from acts.controllers.utils_lib.ssh import connection +from acts.controllers.utils_lib.ssh import settings + +import os +import threading +import time + +ACTS_CONTROLLER_CONFIG_NAME = 'PacketCapture' +ACTS_CONTROLLER_REFERENCE_NAME = 'packet_capture' +BSS = 'BSS' +BSSID = 'BSSID' +FREQ = 'freq' +FREQUENCY = 'frequency' +LEVEL = 'level' +MON_2G = 'mon0' +MON_5G = 'mon1' +BAND_IFACE = {'2G' : MON_2G, '5G': MON_5G} +SCAN_IFACE = 'wlan2' +SCAN_TIMEOUT = 120 +SEP = ':' +SIGNAL = 'signal' +SSID = 'SSID' + + +def create(configs): + return [PacketCapture(c) for c in configs] + +def destroy(pcaps): + for pcap in pcaps: + pcap.close() + +def get_info(pcaps): + return [pcap.ssh_settings.hostname for pcap in pcaps] + + +class PcapProperties(object): + """Class to maintain packet capture properties after starting tcpdump. + + Attributes: + pid: proccess id of tcpdump + pcap_dir: tmp dir location where pcap files are saved + pcap_file: pcap file name + pcap_thread: thread used to push files to logpath + """ + def __init__(self, pid, pcap_dir, pcap_file, pcap_thread): + """Initialize object.""" + self.pid = pid + self.pcap_dir = pcap_dir + self.pcap_file = pcap_file + self.pcap_thread = pcap_thread + + +class PacketCaptureError(Exception): + """Error related to Packet capture.""" + + +class PacketCapture(object): + """Class representing packet capturer. + + An instance of this class creates and configures two interfaces for monitor + mode; 'mon0' for 2G and 'mon1' for 5G and one interface for scanning for + wifi networks; 'wlan2' which is a dual band interface. + + Attributes: + pcap: dict that specifies packet capture properties for a band. + tmp_dirs: list of tmp directories created for pcap files. + """ + def __init__(self, configs): + """Initialize objects. + + Args: + configs: config for the packet capture. + """ + self.ssh_settings = settings.from_config(configs['ssh_config']) + self.ssh = connection.SshConnection(self.ssh_settings) + self.log = logger.create_logger(lambda msg: '[%s|%s] %s' % ( + ACTS_CONTROLLER_CONFIG_NAME, self.ssh_settings.hostname, msg)) + + self._create_interface(MON_2G, 'monitor') + self._create_interface(MON_5G, 'monitor') + self._create_interface(SCAN_IFACE, 'managed') + + self.pcap_properties = dict() + self._pcap_stop_lock = threading.Lock() + self.tmp_dirs = [] + + def _create_interface(self, iface, mode): + """Create interface of monitor/managed mode. + + Create mon0/mon1 for 2G/5G monitor mode and wlan2 for managed mode. + """ + self.ssh.run('iw dev %s del' % iface, ignore_status=True) + self.ssh.run('iw phy%s interface add %s type %s' + % (iface[-1], iface, mode), ignore_status=True) + self.ssh.run('ip link set %s up' % iface, ignore_status=True) + result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) + if result.stderr or iface not in result.stdout: + raise PacketCaptureError('Failed to configure interface %s' % iface) + + def _cleanup_interface(self, iface): + """Clean up monitor mode interfaces.""" + self.ssh.run('iw dev %s del' % iface, ignore_status=True) + result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) + if not result.stderr or 'No such device' not in result.stderr: + raise PacketCaptureError('Failed to cleanup monitor mode for %s' + % iface) + + def _parse_scan_results(self, scan_result): + """Parses the scan dump output and returns list of dictionaries. + + Args: + scan_result: scan dump output from scan on mon interface. + + Returns: + Dictionary of found network in the scan. + The attributes returned are + a.) SSID - SSID of the network. + b.) LEVEL - signal level. + c.) FREQUENCY - WiFi band the network is on. + d.) BSSID - BSSID of the network. + """ + scan_networks = [] + network = {} + for line in scan_result.splitlines(): + if SEP not in line: + continue + if BSS in line: + network[BSSID] = line.split('(')[0].split()[-1] + field, value = line.lstrip().rstrip().split(SEP)[0:2] + value = value.lstrip() + if SIGNAL in line: + network[LEVEL] = int(float(value.split()[0])) + elif FREQ in line: + network[FREQUENCY] = int(value) + elif SSID in line: + network[SSID] = value + scan_networks.append(network) + network = {} + return scan_networks + + def _check_if_tcpdump_started(self, pcap_log): + """Check if tcpdump started. + + This method ensures that tcpdump has started successfully. + We look for 'listening on' from the stdout indicating that tcpdump + is started. + + Args: + pcap_log: log file that has redirected output of starting tcpdump. + + Returns: + True/False if tcpdump is started or not. + """ + curr_time = time.time() + timeout = 3 + find_str = 'listening on' + while time.time() < curr_time + timeout: + result = self.ssh.run('grep "%s" %s' % (find_str, pcap_log), + ignore_status=True) + if result.stdout and find_str in result.stdout: + return True + time.sleep(1) + return False + + def _pull_pcap(self, band, pcap_file, log_path): + """Pulls pcap files to test log path from onhub. + + Called by start_packet_capture(). This method moves a pcap file to log + path once it has reached 50MB. + + Args: + index: param that indicates if the tcpdump is stopped. + pcap_file: pcap file to move. + log_path: log path to move the pcap file to. + """ + curr_no = 0 + while True: + next_no = curr_no + 1 + curr_fno = '%02i' % curr_no + next_fno = '%02i' % next_no + curr_file = '%s%s' % (pcap_file, curr_fno) + next_file = '%s%s' % (pcap_file, next_fno) + + result = self.ssh.run('ls %s' % next_file, ignore_status=True) + if not result.stderr and next_file in result.stdout: + self.ssh.pull_file(log_path, curr_file) + self.ssh.run('rm -rf %s' % curr_file, ignore_status=True) + curr_no += 1 + continue + + with self._pcap_stop_lock: + if band not in self.pcap_properties: + self.ssh.pull_file(log_path, curr_file) + break + time.sleep(2) # wait before looking for file again + + def get_wifi_scan_results(self): + """Starts a wifi scan on wlan2 interface. + + Returns: + List of dictionaries each representing a found network. + """ + result = self.ssh.run('iw dev %s scan' % SCAN_IFACE) + if result.stderr: + raise PacketCaptureError('Failed to get scan dump') + if not result.stdout: + return [] + return self._parse_scan_results(result.stdout) + + def start_scan_and_find_network(self, ssid): + """Start a wifi scan on wlan2 interface and find network. + + Args: + ssid: SSID of the network. + + Returns: + True/False if the network if found or not. + """ + curr_time = time.time() + while time.time() < curr_time + SCAN_TIMEOUT: + found_networks = self.get_wifi_scan_results() + for network in found_networks: + if network[SSID] == ssid: + return True + time.sleep(3) # sleep before next scan + return False + + def configure_monitor_mode(self, band, channel): + """Configure monitor mode. + + Args: + band: band to configure monitor mode for. + channel: channel to set for the interface. + + Returns: + True if configure successful. + False if not successful. + """ + band = band.upper() + if band not in BAND_IFACE: + self.log.error('Invalid band. Must be 2g/2G or 5g/5G') + return False + + iface = BAND_IFACE[band] + self.ssh.run('iw dev %s set channel %s' % + (iface, channel), ignore_status=True) + result = self.ssh.run('iw dev %s info' % iface, ignore_status=True) + if result.stderr or 'channel %s' % channel not in result.stdout: + self.log.error("Failed to configure monitor mode for %s" % band) + return False + return True + + def start_packet_capture(self, band, log_path, pcap_file): + """Start packet capture for band. + + band = 2G starts tcpdump on 'mon0' interface. + band = 5G starts tcpdump on 'mon1' interface. + + This method splits the pcap file every 50MB for 100 files. + Since, the size of the pcap file could become large, each split file + is moved to log_path once a new file is generated. This ensures that + there is no crash on the onhub router due to lack of space. + + Args: + band: '2g' or '2G' and '5g' or '5G'. + log_path: test log path to save the pcap file. + pcap_file: name of the pcap file. + + Returns: + pid: process id of the tcpdump. + """ + band = band.upper() + if band not in BAND_IFACE.keys() or band in self.pcap_properties: + self.log.error("Invalid band or packet capture already running") + return None + + pcap_dir = self.ssh.run('mktemp -d', ignore_status=True).stdout.rstrip() + self.tmp_dirs.append(pcap_dir) + pcap_file = os.path.join(pcap_dir, "%s_%s.pcap" % (pcap_file, band)) + pcap_log = os.path.join(pcap_dir, "%s.log" % pcap_file) + + cmd = 'tcpdump -i %s -W 100 -C 50 -w %s > %s 2>&1 & echo $!' % ( + BAND_IFACE[band], pcap_file, pcap_log) + result = self.ssh.run(cmd, ignore_status=True) + if not self._check_if_tcpdump_started(pcap_log): + self.log.error("Failed to start packet capture") + return None + + pcap_thread = threading.Thread(target=self._pull_pcap, + args=(band, pcap_file, log_path)) + pcap_thread.start() + + pid = int(result.stdout) + self.pcap_properties[band] = PcapProperties( + pid, pcap_dir, pcap_file, pcap_thread) + return pid + + def stop_packet_capture(self, pid): + """Stop the packet capture. + + Args: + pid: process id of tcpdump to kill. + """ + for key,val in self.pcap_properties.items(): + if val.pid == pid: + break + try: + key + except NameError: + self.log.error("Failed to stop tcpdump. Invalid PID %s" % pid) + return + + pcap_dir = val.pcap_dir + pcap_thread = val.pcap_thread + self.ssh.run('kill %s' % pid, ignore_status=True) + with self._pcap_stop_lock: + del self.pcap_properties[key] + pcap_thread.join() + self.ssh.run('rm -rf %s' % pcap_dir, ignore_status=True) + self.tmp_dirs.remove(pcap_dir) + + def close(self): + """Cleanup. + + Cleans up all the monitor mode interfaces and closes ssh connections. + """ + self._cleanup_interface(MON_2G) + self._cleanup_interface(MON_5G) + for tmp_dir in self.tmp_dirs: + self.ssh.run('rm -rf %s' % tmp_dir, ignore_status=True) + self.ssh.close() diff --git a/acts/framework/acts/controllers/sl4a_lib/__init__.py b/acts/framework/acts/controllers/sl4a_lib/__init__.py index 9727988ed9..9006087305 100644 --- a/acts/framework/acts/controllers/sl4a_lib/__init__.py +++ b/acts/framework/acts/controllers/sl4a_lib/__init__.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # diff --git a/acts/framework/acts/controllers/sl4a_lib/error_reporter.py b/acts/framework/acts/controllers/sl4a_lib/error_reporter.py index 96aea2afbf..40ec38f6ce 100644 --- a/acts/framework/acts/controllers/sl4a_lib/error_reporter.py +++ b/acts/framework/acts/controllers/sl4a_lib/error_reporter.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # diff --git a/acts/framework/acts/controllers/sl4a_lib/event_dispatcher.py b/acts/framework/acts/controllers/sl4a_lib/event_dispatcher.py index 91141c0f81..b1a3ae6561 100644 --- a/acts/framework/acts/controllers/sl4a_lib/event_dispatcher.py +++ b/acts/framework/acts/controllers/sl4a_lib/event_dispatcher.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2016- The Android Open Source Project # @@ -80,7 +80,8 @@ class EventDispatcher: """ while self._started: try: - event_obj = self._rpc_client.eventWait(50000) + # 60000 in ms, timeout in second + event_obj = self._rpc_client.eventWait(60000, timeout=120) except rpc_client.Sl4aConnectionError as e: if self._rpc_client.is_alive: self.log.warning('Closing due to closed session.') diff --git a/acts/framework/acts/controllers/sl4a_lib/rpc_client.py b/acts/framework/acts/controllers/sl4a_lib/rpc_client.py index ee12cef5c3..1bbabc736f 100644 --- a/acts/framework/acts/controllers/sl4a_lib/rpc_client.py +++ b/acts/framework/acts/controllers/sl4a_lib/rpc_client.py @@ -1,18 +1,35 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. import json +import socket import threading - import time from concurrent import futures +from acts import error from acts import logger +# The default timeout value when no timeout is set. SOCKET_TIMEOUT = 60 # The Session UID when a UID has not been received yet. UNKNOWN_UID = -1 -class Sl4aException(Exception): +class Sl4aException(error.ActsError): """The base class for all SL4A exceptions.""" @@ -35,10 +52,14 @@ class Sl4aProtocolError(Sl4aException): MISMATCHED_API_ID = 'Mismatched API id.' -class MissingSl4AError(Sl4aException): +class Sl4aIsMissingError(Sl4aException): """An error raised when an Sl4aClient is created without SL4A installed.""" +class Sl4aRpcTimeoutError(Sl4aException): + """An error raised when an SL4A RPC has timed out.""" + + class RpcClient(object): """An RPC client capable of processing multiple RPCs concurrently. @@ -154,8 +175,8 @@ class RpcClient(object): self._working_connections.append(client) return client - client_count = ( - len(self._free_connections) + len(self._working_connections)) + client_count = (len(self._free_connections) + + len(self._working_connections)) if client_count < self.max_connections: with self._lock: client_count = (len(self._free_connections) + @@ -180,7 +201,7 @@ class RpcClient(object): self._working_connections.remove(connection) self._free_connections.append(connection) - def rpc(self, method, *args, timeout=None, retries=1): + def rpc(self, method, *args, timeout=None, retries=3): """Sends an rpc to sl4a. Sends an rpc call to sl4a over this RpcClient's corresponding session. @@ -200,7 +221,7 @@ class RpcClient(object): """ connection = self._get_free_connection() ticket = connection.get_new_ticket() - + timed_out = False if timeout: connection.set_timeout(timeout) data = {'id': ticket, 'method': method, 'params': args} @@ -209,10 +230,8 @@ class RpcClient(object): try: for i in range(1, retries + 1): connection.send_request(request) - self._log.debug('Sent: %s' % request) response = connection.get_response() - self._log.debug('Received: %s', response) if not response: if i < retries: self._log.warning( @@ -220,6 +239,9 @@ class RpcClient(object): method, i) continue else: + self._log.exception( + 'No response for RPC method %s on iteration %s', + method, i) self.on_error(connection) raise Sl4aProtocolError( Sl4aProtocolError.NO_RESPONSE_FROM_SERVER) @@ -227,22 +249,40 @@ class RpcClient(object): break except BrokenPipeError as e: if self.is_alive: - self._log.error('Exception %s happened while communicating to ' - 'SL4A.', e) + self._log.exception('Exception %s happened for sl4a call %s', + e, method) self.on_error(connection) else: self._log.warning('The connection was killed during cleanup:') self._log.warning(e) raise Sl4aConnectionError(e) + except socket.timeout as err: + # If a socket connection has timed out, the socket can no longer be + # used. Close it out and remove the socket from the connection pool. + timed_out = True + self._log.warning('RPC "%s" (id: %s) timed out after %s seconds.', + method, ticket, timeout or SOCKET_TIMEOUT) + self._log.debug( + 'Closing timed out connection over %s' % connection.ports) + connection.close() + self._working_connections.remove(connection) + # Re-raise the error as an SL4A Error so end users can process it. + raise Sl4aRpcTimeoutError(err) finally: - if timeout: - connection.set_timeout(SOCKET_TIMEOUT) - self._release_working_connection(connection) + if not timed_out: + if timeout: + connection.set_timeout(SOCKET_TIMEOUT) + self._release_working_connection(connection) result = json.loads(str(response, encoding='utf8')) if result['error']: + error_object = result['error'] + if (error_object.get('code', None) and + error_object.get('message', None)): + self._log.error('Received Sl4aError %s:%s' % ( + error_object['code'], error_object['message'])) err_msg = 'RPC call %s to device failed with error %s' % ( - method, result['error']) + method, error_object.get('data', error_object)) self._log.error(err_msg) raise Sl4aApiError(err_msg) if result['id'] != ticket: diff --git a/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py b/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py index 69a7d652f0..387678971b 100644 --- a/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py +++ b/acts/framework/acts/controllers/sl4a_lib/rpc_connection.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # @@ -124,10 +124,12 @@ class RpcConnection(object): """Sends a request over the connection.""" self._socket_file.write(request.encode('utf8') + b'\n') self._socket_file.flush() + self.log.debug('Sent: ' + request) def get_response(self): """Returns the first response sent back to the client.""" data = self._socket_file.readline() + self.log.debug('Received: ' + data.decode('utf8', errors='replace')) return data def close(self): diff --git a/acts/framework/acts/controllers/sl4a_lib/sl4a_manager.py b/acts/framework/acts/controllers/sl4a_lib/sl4a_manager.py index 4b4efeac68..254a605050 100644 --- a/acts/framework/acts/controllers/sl4a_lib/sl4a_manager.py +++ b/acts/framework/acts/controllers/sl4a_lib/sl4a_manager.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # @@ -216,7 +216,7 @@ class Sl4aManager(object): if not self._started: self._started = True if not self.is_sl4a_installed(): - raise rpc_client.MissingSl4AError( + raise rpc_client.Sl4aIsMissingError( 'SL4A is not installed on device %s' % self.adb.serial) if self.adb.shell( 'ps | grep "S com.googlecode.android_scripting"'): @@ -279,6 +279,10 @@ class Sl4aManager(object): self.sessions[session.uid] = session return session + def stop_service(self): + """Stops The SL4A Service.""" + self._started = False + def terminate_all_sessions(self): """Terminates all SL4A sessions gracefully.""" self.error_reporter.finalize_reports() diff --git a/acts/framework/acts/controllers/sl4a_lib/sl4a_ports.py b/acts/framework/acts/controllers/sl4a_lib/sl4a_ports.py index aeb49698c4..5435aed50f 100644 --- a/acts/framework/acts/controllers/sl4a_lib/sl4a_ports.py +++ b/acts/framework/acts/controllers/sl4a_lib/sl4a_ports.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # diff --git a/acts/framework/acts/controllers/sl4a_lib/sl4a_session.py b/acts/framework/acts/controllers/sl4a_lib/sl4a_session.py index 074d9d8591..e48143e937 100644 --- a/acts/framework/acts/controllers/sl4a_lib/sl4a_session.py +++ b/acts/framework/acts/controllers/sl4a_lib/sl4a_session.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2018 - The Android Open Source Project # @@ -19,10 +19,12 @@ import threading import errno from acts import logger +from acts.controllers.adb import AdbError from acts.controllers.sl4a_lib import event_dispatcher from acts.controllers.sl4a_lib import rpc_connection from acts.controllers.sl4a_lib import rpc_client from acts.controllers.sl4a_lib import sl4a_ports +from acts.controllers.sl4a_lib.rpc_client import Sl4aStartError SOCKET_TIMEOUT = 60 @@ -102,6 +104,37 @@ class Sl4aSession(object): def is_alive(self): return not self._terminated + def _create_forwarded_port(self, server_port, hinted_port=0): + """Creates a forwarded port to the specified server port. + + Args: + server_port: (int) The port to forward to. + hinted_port: (int) The port to use for forwarding, if available. + Otherwise, the chosen port will be random. + Returns: + The chosen forwarded port. + + Raises AdbError if the version of ADB is too old, or the command fails. + """ + if self.adb.get_version_number() < 37 and hinted_port == 0: + self.log.error( + 'The current version of ADB does not automatically provide a ' + 'port to forward. Please upgrade ADB to version 1.0.37 or ' + 'higher.') + raise Sl4aStartError('Unable to forward a port to the device.') + else: + try: + return self.adb.tcp_forward(hinted_port, server_port) + except AdbError as e: + if 'cannot bind listener' in e.stderr: + self.log.warning( + 'Unable to use %s to forward to device port %s due to: ' + '"%s". Attempting to choose a random port instead.' % + (hinted_port, server_port, e.stderr)) + # Call this method again, but this time with no hinted port. + return self._create_forwarded_port(server_port) + raise e + def _create_rpc_connection(self, ports=None, uid=UNKNOWN_UID): """Creates an RPC Connection with the specified ports. @@ -123,7 +156,7 @@ class Sl4aSession(object): ports.server_port = self.obtain_server_port(ports.server_port) self.server_port = ports.server_port # Forward the device port to the host. - ports.forwarded_port = int(self.adb.tcp_forward(0, ports.server_port)) + ports.forwarded_port = self._create_forwarded_port(ports.server_port) client_socket, fd = self._create_client_side_connection(ports) client = rpc_connection.RpcConnection( self.adb, ports, client_socket, fd, uid=uid) @@ -199,10 +232,21 @@ class Sl4aSession(object): with self._terminate_lock: if not self._terminated: self.log.debug('Terminating Session.') - self.rpc_client.closeSl4aSession() + try: + self.rpc_client.closeSl4aSession() + except Exception as e: + if "SL4A session has already been terminated" not in str( + e): + self.log.warning(e) # Must be set after closeSl4aSession so the rpc_client does not # think the session has closed. self._terminated = True if self._event_dispatcher: - self._event_dispatcher.close() - self.rpc_client.terminate() + try: + self._event_dispatcher.close() + except Exception as e: + self.log.warning(e) + try: + self.rpc_client.terminate() + except Exception as e: + self.log.warning(e) diff --git a/acts/framework/acts/controllers/sl4a_lib/sl4a_types.py b/acts/framework/acts/controllers/sl4a_lib/sl4a_types.py index 4fac844cff..ec9eb21580 100644 --- a/acts/framework/acts/controllers/sl4a_lib/sl4a_types.py +++ b/acts/framework/acts/controllers/sl4a_lib/sl4a_types.py @@ -1,4 +1,4 @@ -#!/usr/bin/env python3.4 +#!/usr/bin/env python3 # # Copyright 2016 - The Android Open Source Project # diff --git a/acts/framework/acts/controllers/utils_lib/ssh/connection.py b/acts/framework/acts/controllers/utils_lib/ssh/connection.py index b969e00816..b89a08cbbe 100644 --- a/acts/framework/acts/controllers/utils_lib/ssh/connection.py +++ b/acts/framework/acts/controllers/utils_lib/ssh/connection.py @@ -396,6 +396,16 @@ class SshConnection(object): user_host = self._formatter.format_host_name(self._settings) job.run('scp %s %s:%s' % (local_path, user_host, remote_path)) + def pull_file(self, local_path, remote_path): + """Send a file from remote host to local host + + Args: + local_path: string path of file to recv on local host + remote_path: string path to copy file from on remote host. + """ + user_host = self._formatter.format_host_name(self._settings) + job.run('scp %s:%s %s' % (user_host, remote_path, local_path)) + def find_free_port(self, interface_name='localhost'): """Find a unused port on the remote host. diff --git a/acts/framework/acts/error.py b/acts/framework/acts/error.py new file mode 100644 index 0000000000..8f94feb29a --- /dev/null +++ b/acts/framework/acts/error.py @@ -0,0 +1,57 @@ +"""This class is where error information will be stored. +""" + +import json + + +class ActsError(Exception): + """Base Acts Error""" + def __init__(self, *args, **kwargs): + class_name = self.__class__.__name__ + self.message = self.__class__.__doc__ + self.error_code = getattr(ActsErrorCode, class_name) + self.extra = kwargs + if len(args) > 0: + self.extra['details'] = args + + def json_str(self): + """Converts this error to a string in json format. + + Format of the json string is: + { + "ErrorCode": int + "Message": str + "Extra": any + } + + Returns: + A json-format string representing the errors + """ + d = {} + d['ErrorCode'] = self.error_code + d['Message'] = self.message + d['Extras'] = self.extra + json_str = json.dumps(d, indent=4, sort_keys=True) + return json_str + + +class ActsErrorCode: + # Framework Errors 0-999 + + # This error code is used to implement unittests for this class. + ActsError = 100 + AndroidDeviceError = 101 + + # Controllers Errors 1000-3999 + + Sl4aStartError = 1001 + Sl4aApiError = 1002 + Sl4aConnectionError = 1003 + Sl4aProtocolError = 1004 + Sl4aIsMissingError = 1005 + Sl4aRpcTimeoutError = 1006 + + # Util Errors 4000-9999 + + FastbootError = 9000 + AdbError = 9001 diff --git a/acts/framework/acts/keys.py b/acts/framework/acts/keys.py index a0aa52af46..66bfcafd74 100644 --- a/acts/framework/acts/keys.py +++ b/acts/framework/acts/keys.py @@ -48,6 +48,7 @@ class Config(enum.Enum): key_monsoon = "Monsoon" key_sniffer = "Sniffer" key_arduino_wifi_dongle = "ArduinoWifiDongle" + key_packet_capture = "PacketCapture" # Internal keys, used internally, not exposed to user's config files. ikey_user_param = "user_params" ikey_testbed_name = "testbed_name" @@ -66,6 +67,7 @@ class Config(enum.Enum): m_key_packet_sender = "packet_sender" m_key_sniffer = "sniffer" m_key_arduino_wifi_dongle = "arduino_wifi_dongle" + m_key_packet_capture = "packet_capture" # A list of keys whose values in configs should not be passed to test # classes without unpacking first. @@ -84,6 +86,7 @@ class Config(enum.Enum): key_sniffer, key_chameleon_device, key_arduino_wifi_dongle, + key_packet_capture, ] # Keys that are file or folder paths. diff --git a/acts/framework/acts/logger.py b/acts/framework/acts/logger.py index 2f94f83b7a..65b4c42a57 100755 --- a/acts/framework/acts/logger.py +++ b/acts/framework/acts/logger.py @@ -29,9 +29,9 @@ log_line_format = "%(asctime)s.%(msecs).03d %(levelname)s %(message)s" # The micro seconds are added by the format string above, # so the time format does not include ms. log_line_time_format = "%Y-%m-%d %H:%M:%S" -log_line_timestamp_len = 18 +log_line_timestamp_len = 23 -logline_timestamp_re = re.compile("\d\d-\d\d \d\d:\d\d:\d\d.\d\d\d") +logline_timestamp_re = re.compile("\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d.\d\d\d") def _parse_logline_timestamp(t): diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py index 78c5090599..470b7645bb 100644 --- a/acts/framework/acts/test_utils/power/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py @@ -30,6 +30,7 @@ from acts.test_utils.wifi import wifi_power_test_utils as wputils SETTINGS_PAGE = 'am start -n com.android.settings/.Settings' SCROLL_BOTTOM = 'input swipe 0 2000 0 0' UNLOCK_SCREEN = 'input keyevent 82' +SET_BATTERY_LEVEL = 'dumpsys battery set level 100' SCREENON_USB_DISABLE = 'dumpsys battery unplug' RESET_BATTERY_STATS = 'dumpsys batterystats --reset' AOD_OFF = 'settings put secure doze_always_on 0' @@ -252,6 +253,7 @@ class PowerBaseTest(base_test.BaseTestClass): self.dut.adb.shell(ASSIST_GESTURE) self.dut.adb.shell(ASSIST_GESTURE_ALERT) self.dut.adb.shell(ASSIST_GESTURE_WAKE) + self.dut.adb.shell(SET_BATTERY_LEVEL) self.dut.adb.shell(SCREENON_USB_DISABLE) self.dut.adb.shell(UNLOCK_SCREEN) self.dut.adb.shell(SETTINGS_PAGE) @@ -494,3 +496,4 @@ class PowerBaseTest(base_test.BaseTestClass): self.dut.reboot() self.dut.adb.root() self.dut.adb.remount() + self.dut.adb.shell(SET_BATTERY_LEVEL) diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py index 4b6dc8c50e..7db5cb6bc7 100755 --- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py +++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py @@ -44,12 +44,13 @@ class WifiBaseTest(BaseTestClass): for attenuator in self.attenuators: attenuator.set_atten(0) - def get_wpa2_network( + def get_psk_network( self, mirror_ap, reference_networks, hidden=False, same_ssid=False, + security_mode=hostapd_constants.WPA2_STRING, ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, @@ -73,8 +74,8 @@ class WifiBaseTest(BaseTestClass): """ network_dict_2g = {} network_dict_5g = {} - ref_5g_security = hostapd_constants.WPA2_STRING - ref_2g_security = hostapd_constants.WPA2_STRING + ref_5g_security = security_mode + ref_2g_security = security_mode if same_ssid: ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) @@ -90,32 +91,19 @@ class WifiBaseTest(BaseTestClass): ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) - if hidden: - network_dict_2g = { - "SSID": ref_2g_ssid, - "security": ref_2g_security, - "password": ref_2g_passphrase, - "hiddenSSID": True - } - - network_dict_5g = { - "SSID": ref_5g_ssid, - "security": ref_5g_security, - "password": ref_5g_passphrase, - "hiddenSSID": True - } - else: - network_dict_2g = { - "SSID": ref_2g_ssid, - "security": ref_2g_security, - "password": ref_2g_passphrase - } - - network_dict_5g = { - "SSID": ref_5g_ssid, - "security": ref_5g_security, - "password": ref_5g_passphrase - } + network_dict_2g = { + "SSID": ref_2g_ssid, + "security": ref_2g_security, + "password": ref_2g_passphrase, + "hiddenSSID": hidden + } + + network_dict_5g = { + "SSID": ref_5g_ssid, + "security": ref_5g_security, + "password": ref_5g_passphrase, + "hiddenSSID": hidden + } ap = 0 for ap in range(MAX_AP_COUNT): @@ -159,32 +147,91 @@ class WifiBaseTest(BaseTestClass): open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) - if hidden: - network_dict_2g = { + network_dict_2g = { "SSID": open_2g_ssid, "security": 'none', - "hiddenSSID": True - } + "hiddenSSID": hidden + } - network_dict_5g = { + network_dict_5g = { "SSID": open_5g_ssid, "security": 'none', - "hiddenSSID": True - } + "hiddenSSID": hidden + } + + ap = 0 + for ap in range(MAX_AP_COUNT): + open_network.append({ + "2g": copy.copy(network_dict_2g), + "5g": copy.copy(network_dict_5g) + }) + if not mirror_ap: + break + return {"2g": network_dict_2g, "5g": network_dict_5g} + + def get_wep_network( + self, + mirror_ap, + networks, + hidden=False, + same_ssid=False, + ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, + ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, + passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, + passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G): + """Generates SSID and passphrase for a WEP network using random + generator. + + Args: + mirror_ap: Boolean, determines if both APs use the same hostapd + config or different configs. + networks: List of WEP networks. + same_ssid: Boolean, determines if both bands on AP use the same + SSID. + ssid_length_2gecond AP Int, number of characters to use for 2G SSID. + ssid_length_5g: Int, number of characters to use for 5G SSID. + passphrase_length_2g: Int, length of password for 2G network. + passphrase_length_5g: Int, length of password for 5G network. + + Returns: A dict of 2G and 5G network lists for hostapd configuration. + + """ + network_dict_2g = {} + network_dict_5g = {} + ref_5g_security = hostapd_constants.WEP_STRING + ref_2g_security = hostapd_constants.WEP_STRING + + if same_ssid: + ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_5g_ssid = ref_2g_ssid + + ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g) + ref_5g_passphrase = ref_2g_passphrase + else: - network_dict_2g = { - "SSID": open_2g_ssid, - "security": 'none' - } + ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_2g_passphrase = utils.rand_hex_str(passphrase_length_2g) - network_dict_5g = { - "SSID": open_5g_ssid, - "security": 'none' - } + ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + ref_5g_passphrase = utils.rand_hex_str(passphrase_length_5g) + + network_dict_2g = { + "SSID": ref_2g_ssid, + "security": ref_2g_security, + "wepKeys": [ref_2g_passphrase] * 4, + "hiddenSSID": hidden + } + + network_dict_5g = { + "SSID": ref_5g_ssid, + "security": ref_5g_security, + "wepKeys": [ref_2g_passphrase] * 4, + "hiddenSSID": hidden + } ap = 0 for ap in range(MAX_AP_COUNT): - open_network.append({ + networks.append({ "2g": copy.copy(network_dict_2g), "5g": copy.copy(network_dict_5g) }) @@ -208,7 +255,15 @@ class WifiBaseTest(BaseTestClass): # TODO:(bamahadev) Change all occurances of reference_networks # to wpa_networks. self.reference_networks[ap_instance][band]["bssid"] = bssid - + if network["security"] == hostapd_constants.WPA_STRING: + self.wpa_networks[ap_instance][band]["bssid"] = bssid + if network["security"] == hostapd_constants.WEP_STRING: + self.wep_networks[ap_instance][band]["bssid"] = bssid + if network["security"] == hostapd_constants.ENT_STRING: + if "bssid" not in self.ent_networks[ap_instance][band]: + self.ent_networks[ap_instance][band]["bssid"] = bssid + else: + self.ent_networks_pwd[ap_instance][band]["bssid"] = bssid if network["security"] == 'none': self.open_network[ap_instance][band]["bssid"] = bssid @@ -251,6 +306,13 @@ class WifiBaseTest(BaseTestClass): hidden=False, same_ssid=False, mirror_ap=True, + wpa_network=False, + wep_network=False, + ent_network=False, + radius_conf_2g=None, + radius_conf_5g=None, + ent_network_pwd=False, + radius_conf_pwd=None, ap_count=1): asserts.assert_true( len(self.user_params["AccessPoint"]) == 2, @@ -271,6 +333,14 @@ class WifiBaseTest(BaseTestClass): self.user_params["reference_networks"] = [] self.user_params["open_network"] = [] + if wpa_network: + self.user_params["wpa_networks"] = [] + if wep_network: + self.user_params["wep_networks"] = [] + if ent_network: + self.user_params["ent_networks"] = [] + if ent_network_pwd: + self.user_params["ent_networks_pwd"] = [] for count in range(config_count): @@ -283,7 +353,7 @@ class WifiBaseTest(BaseTestClass): network_list_2g.append({"channel": channel_2g}) network_list_5g.append({"channel": channel_5g}) - networks_dict = self.get_wpa2_network( + networks_dict = self.get_psk_network( mirror_ap, self.user_params["reference_networks"], hidden=hidden, @@ -309,6 +379,59 @@ class WifiBaseTest(BaseTestClass): network_list_2g.append(networks_dict["2g"]) network_list_5g.append(networks_dict["5g"]) + if wpa_network: + networks_dict = self.get_psk_network( + mirror_ap, + self.user_params["wpa_networks"], + hidden=hidden, + same_ssid=same_ssid, + security_mode=hostapd_constants.WPA_STRING) + self.wpa_networks = self.user_params["wpa_networks"] + + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + + if wep_network: + networks_dict = self.get_wep_network( + mirror_ap, + self.user_params["wep_networks"], + hidden=hidden, + same_ssid=same_ssid) + self.wep_networks = self.user_params["wep_networks"] + + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + + if ent_network: + networks_dict = self.get_open_network( + mirror_ap, + self.user_params["ent_networks"], + hidden=hidden, + same_ssid=same_ssid) + networks_dict["2g"]["security"] = hostapd_constants.ENT_STRING + networks_dict["2g"].update(radius_conf_2g) + networks_dict["5g"]["security"] = hostapd_constants.ENT_STRING + networks_dict["5g"].update(radius_conf_5g) + self.ent_networks = self.user_params["ent_networks"] + + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + + if ent_network_pwd: + networks_dict = self.get_open_network( + mirror_ap, + self.user_params["ent_networks_pwd"], + hidden=hidden, + same_ssid=same_ssid) + networks_dict["2g"]["security"] = hostapd_constants.ENT_STRING + networks_dict["2g"].update(radius_conf_pwd) + networks_dict["5g"]["security"] = hostapd_constants.ENT_STRING + networks_dict["5g"].update(radius_conf_pwd) + self.ent_networks_pwd = self.user_params["ent_networks_pwd"] + + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + orig_network_list_5g = copy.copy(network_list_5g) orig_network_list_2g = copy.copy(network_list_2g) @@ -331,6 +454,8 @@ class WifiBaseTest(BaseTestClass): def _generate_legacy_ap_config(self, network_list): bss_settings = [] + wlan_2g = self.access_points[AP_1].wlan_2g + wlan_5g = self.access_points[AP_1].wlan_5g ap_settings = network_list.pop(0) # TODO:(bmahadev) This is a bug. We should not have to pop the first # network in the list and treat it as a separate case. Instead, @@ -338,47 +463,90 @@ class WifiBaseTest(BaseTestClass): # build config based on the bss_Settings alone. hostapd_config_settings = network_list.pop(0) for network in network_list: - if "password" in network and "hiddenSSID" in network: + if "password" in network: bss_settings.append( hostapd_bss_settings.BssSettings( name=network["SSID"], ssid=network["SSID"], - hidden=True, + hidden=network["hiddenSSID"], security=hostapd_security.Security( security_mode=network["security"], password=network["password"]))) - elif "password" in network and not "hiddenSSID" in network: + elif "wepKeys" in network: bss_settings.append( hostapd_bss_settings.BssSettings( name=network["SSID"], ssid=network["SSID"], + hidden=network["hiddenSSID"], security=hostapd_security.Security( security_mode=network["security"], - password=network["password"]))) - elif not "password" in network and "hiddenSSID" in network: + password=network["wepKeys"][0]))) + elif network["security"] == hostapd_constants.ENT_STRING: bss_settings.append( hostapd_bss_settings.BssSettings( name=network["SSID"], ssid=network["SSID"], - hidden=True)) - elif not "password" in network and not "hiddenSSID" in network: + hidden=network["hiddenSSID"], + security=hostapd_security.Security( + security_mode=network["security"], + radius_server_ip=network["radius_server_ip"], + radius_server_port=network["radius_server_port"], + radius_server_secret=network["radius_server_secret"]))) + else: bss_settings.append( hostapd_bss_settings.BssSettings( name=network["SSID"], - ssid=network["SSID"])) + ssid=network["SSID"], + hidden=network["hiddenSSID"])) if "password" in hostapd_config_settings: config = hostapd_ap_preset.create_ap_preset( + iface_wlan_2g=wlan_2g, + iface_wlan_5g=wlan_5g, channel=ap_settings["channel"], ssid=hostapd_config_settings["SSID"], + hidden=hostapd_config_settings["hiddenSSID"], security=hostapd_security.Security( security_mode=hostapd_config_settings["security"], password=hostapd_config_settings["password"]), - bss_settings=bss_settings, - profile_name='whirlwind') + bss_settings=bss_settings) + elif "wepKeys" in hostapd_config_settings: + config = hostapd_ap_preset.create_ap_preset( + iface_wlan_2g=wlan_2g, + iface_wlan_5g=wlan_5g, + channel=ap_settings["channel"], + ssid=hostapd_config_settings["SSID"], + hidden=hostapd_config_settings["hiddenSSID"], + security=hostapd_security.Security( + security_mode=hostapd_config_settings["security"], + password=hostapd_config_settings["wepKeys"][0]), + bss_settings=bss_settings) else: config = hostapd_ap_preset.create_ap_preset( + iface_wlan_2g=wlan_2g, + iface_wlan_5g=wlan_5g, channel=ap_settings["channel"], ssid=hostapd_config_settings["SSID"], - bss_settings=bss_settings, - profile_name='whirlwind') + hidden=hostapd_config_settings["hiddenSSID"], + bss_settings=bss_settings) return config + + def configure_packet_capture( + self, + channel_5g=hostapd_constants.AP_DEFAULT_CHANNEL_5G, + channel_2g=hostapd_constants.AP_DEFAULT_CHANNEL_2G): + """Configure packet capture for 2G and 5G bands. + + Args: + channel_5g: Channel to set the monitor mode to for 5G band. + channel_2g: Channel to set the monitor mode to for 2G band. + """ + self.packet_capture = self.packet_capture[0] + result = self.packet_capture.configure_monitor_mode( + hostapd_constants.BAND_2G, channel_2g) + if not result: + raise ValueError("Failed to configure channel for 2G band") + + result = self.packet_capture.configure_monitor_mode( + hostapd_constants.BAND_5G, channel_5g) + if not result: + raise ValueError("Failed to configure channel for 5G band.") diff --git a/acts/framework/acts/test_utils/wifi/rpm_controller_utils.py b/acts/framework/acts/test_utils/wifi/rpm_controller_utils.py new file mode 100644 index 0000000000..6aa8e3ec58 --- /dev/null +++ b/acts/framework/acts/test_utils/wifi/rpm_controller_utils.py @@ -0,0 +1,183 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from acts.controllers.attenuator_lib._tnhelper import _ascii_string + +import logging +import telnetlib + +ID = '.A' +LOGIN_PWD = 'admn' +ON = 'On' +OFF = 'Off' +PASSWORD = 'Password: ' +PORT = 23 +RPM_PROMPT = 'Switched CDU: ' +SEPARATOR = '\n' +TIMEOUT = 3 +USERNAME = 'Username: ' + + +class RpmControllerError(Exception): + """Error related to RPM switch.""" + +class RpmController(object): + """Class representing telnet to RPM switch. + + Each object represents a telnet connection to the RPM switch's IP. + + Attributes: + tn: represents a connection to RPM switch. + host: IP address of the RPM controller. + """ + def __init__(self, host): + """Initializes the RPM controller object. + + Establishes a telnet connection and login to the switch. + """ + self.host = host + logging.info('RPM IP: %s' % self.host) + + self.tn = telnetlib.Telnet(self.host) + self.tn.open(self.host, PORT, TIMEOUT) + self.run(USERNAME, LOGIN_PWD) + result = self.run(PASSWORD, LOGIN_PWD) + if RPM_PROMPT not in result: + raise RpmControllerError('Failed to login to rpm controller %s' + % self.host) + + def run(self, prompt, cmd_str): + """Method to run commands on the RPM. + + This method simply runs a command and returns output in decoded format. + The calling methods should take care of parsing the expected result + from this output. + + Args: + prompt: Expected prompt before running a command. + cmd_str: Command to run on RPM. + + Returns: + Decoded text returned by the command. + """ + cmd_str = '%s%s' % (cmd_str, SEPARATOR) + res = self.tn.read_until(_ascii_string(prompt), TIMEOUT) + + self.tn.write(_ascii_string(cmd_str)) + idx, val, txt = self.tn.expect( + [_ascii_string('\S+%s' % SEPARATOR)], TIMEOUT) + + return txt.decode() + + def set_rpm_port_state(self, rpm_port, state): + """Method to turn on/off rpm port. + + Args: + rpm_port: port number of the switch to turn on. + state: 'on' or 'off' + + Returns: + True: if the state is set to the expected value + """ + port = '%s%s' % (ID, rpm_port) + logging.info('Turning %s port: %s' % (state, port)) + self.run(RPM_PROMPT, '%s %s' % (state.lower(), port)) + result = self.run(RPM_PROMPT, 'status %s' % port) + if port not in result: + raise RpmControllerError('Port %s doesn\'t exist' % port) + return state in result + + def turn_on(self, rpm_port): + """Method to turn on a port on the RPM switch. + + Args: + rpm_port: port number of the switch to turn on. + + Returns: + True if the port is turned on. + False if not turned on. + """ + return self.set_rpm_port_state(rpm_port, ON) + + def turn_off(self, rpm_port): + """Method to turn off a port on the RPM switch. + + Args: + rpm_port: port number of the switch to turn off. + + Returns: + True if the port is turned off. + False if not turned off. + """ + return self.set_rpm_port_state(rpm_port, OFF) + + def __del__(self): + """Close the telnet connection. """ + self.tn.close() + + +def create_telnet_session(ip): + """Returns telnet connection object to RPM's IP.""" + return RpmController(ip) + +def turn_on_ap(pcap, ssid, rpm_port, rpm_ip=None, rpm=None): + """Turn on the AP. + + This method turns on the RPM port the AP is connected to, + verify the SSID of the AP is found in the scan result through the + packet capturer. + + Either IP addr of the RPM switch or the existing telnet connection + to the RPM is required. Multiple APs might be connected to the same RPM + switch. Instead of connecting/terminating telnet for each AP, the test + can maintain a single telnet connection for all the APs. + + Args: + pcap: packet capture object. + ssid: SSID of the wifi network. + rpm_port: Port number on the RPM switch the AP is connected to. + rpm_ip: IP address of the RPM switch. + rpm: telnet connection object to the RPM switch. + """ + if not rpm and not rpm_ip: + logging.error("Failed to turn on AP. Need telnet object or RPM IP") + return False + elif not rpm: + rpm = create_telnet_session(rpm_ip) + + return rpm.turn_on(rpm_port) and pcap.start_scan_and_find_network(ssid) + +def turn_off_ap(rpm_port, rpm_ip=None, rpm=None): + """ Turn off AP. + + This method turns off the RPM port the AP is connected to. + + Either IP addr of the RPM switch or the existing telnet connection + to the RPM is required. + + Args: + rpm_port: Port number on the RPM switch the AP is connected to. + rpm_ip: IP address of the RPM switch. + rpm: telnet connection object to the RPM switch. + """ + if not rpm and not rpm_ip: + logging.error("Failed to turn off AP. Need telnet object or RPM IP") + return False + elif not rpm: + rpm = create_telnet_session(rpm_ip) + + return rpm.turn_off(rpm_port) diff --git a/acts/framework/acts/test_utils/wifi/wifi_constants.py b/acts/framework/acts/test_utils/wifi/wifi_constants.py index 9494057325..6fa014ec05 100644 --- a/acts/framework/acts/test_utils/wifi/wifi_constants.py +++ b/acts/framework/acts/test_utils/wifi/wifi_constants.py @@ -29,3 +29,8 @@ CONNECT_BY_NETID_SUCCESS = 'WifiManagerConnectByNetIdOnSuccess' AP_MAIN = "main_AP" AP_AUX = "aux_AP" SSID = "SSID" + +# cnss_diag property related constants +DEVICES_USING_LEGACY_PROP = ["sailfish", "marlin", "walleye", "taimen", "muskie"] +CNSS_DIAG_PROP = "persist.vendor.sys.cnss.diag_txt" +LEGACY_CNSS_DIAG_PROP = "persist.sys.cnss.diag_txt" diff --git a/acts/framework/acts/test_utils/wifi/wifi_datastore_utils.py b/acts/framework/acts/test_utils/wifi/wifi_datastore_utils.py new file mode 100755 index 0000000000..3c045db8b6 --- /dev/null +++ b/acts/framework/acts/test_utils/wifi/wifi_datastore_utils.py @@ -0,0 +1,149 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import pprint +import requests +import time + +from acts import asserts +from acts import signals +from acts import utils +from acts.test_utils.wifi import wifi_constants + +"""This file consists of all the helper methods needed to interact with the + Datastore @ https://chaos-188802.appspot.com/ used for Android Interop + testing. +""" + +DATASTORE_HOST = "https://chaos-188802.appspot.com" + +# The Datastore defines the following paths for operating methods. +ADD_DEVICE = "devices/new" +REMOVE_DEVICE = "devices/delete" +LOCK_DEVICE = "devices/lock" +UNLOCK_DEVICE = "devices/unlock" +SHOW_DEVICE = "devices/" +GET_DEVICES = "devices/" + +# HTTP content type. JSON encoded with UTF-8 character encoding. +HTTP_HEADER = {'content-type': 'application/json'} + +def add_device(name, ap_label, lab_label): + """Add a device(AP or Packet Capturer) in datastore. + + Args: + name: string, hostname of the device. + ap_label: string, AP brand name. + lab_label: string, lab label for AP. + Returns: + True if device was added successfully; 0 otherwise. + """ + request = DATASTORE_HOST + '/' + ADD_DEVICE + logging.debug("Request = %s" % request) + response = requests.post(request, + headers=HTTP_HEADER, + data=json.dumps({"hostname":name, + "ap_label":ap_label, + "lab_label":lab_label})) + if response.json()['result'] == 'success': + logging.info("Added device %s to datastore" % name) + return True + return False + +def remove_device(name): + """Delete a device(AP or Packet Capturer) in datastore. + + Args: + name: string, hostname of the device to delete. + Returns: + True if device was deleted successfully; 0 otherwise. + """ + request = DATASTORE_HOST + '/' + REMOVE_DEVICE + logging.debug("Request = %s" % request) + response = requests.put(request, + headers=HTTP_HEADER, + data=json.dumps({"hostname":name})) + result_str = "%s deleted." % name + if result_str in response.text: + logging.info("Removed device %s from datastore" % name) + return True + return False + +def lock_device(name): + """Lock a device(AP or Packet Capturer) in datastore. + + Args: + name: string, hostname of the device in datastore. + Returns: + True if operation was successful; 0 otherwise. + """ + request = DATASTORE_HOST + '/' + LOCK_DEVICE + logging.debug("Request = %s" % request) + response = requests.put(request, + headers=HTTP_HEADER, + data=json.dumps({"hostname":name, "locked_by":"admin"})) + if response.json()['result']: + logging.info("Locked device %s in datastore" % name) + return True + return False + +def unlock_device(name): + """Un-lock a device(AP or Packet Capturer) in datastore. + + Args: + name: string, hostname of the device in datastore. + Returns: + True if operation was successful; 0 otherwise. + """ + request = DATASTORE_HOST + '/' + UNLOCK_DEVICE + logging.debug("Request = %s" % request) + response = requests.put(request, + headers=HTTP_HEADER, + data=json.dumps({"hostname":name})) + if response.json()['result']: + logging.info("Finished un-locking AP %s in datastore" % name) + return True + return False + +def show_device(name): + """Show device properties for a given device(AP or Packet Capturer). + + Args: + name: string, hostname of the device in datastore to fetch info. + Returns: dict of device name:value properties if successful; + None otherwise. + """ + request = DATASTORE_HOST + '/' + SHOW_DEVICE + name + logging.debug("Request = %s" % request) + response = requests.get(request) + if 'null' in response.text: + return None + return response.json() + +def get_devices(): + """Get a list of all devices in the datastore. + + Returns: dict of all devices' name:value properties if successful; + None otherwise. + """ + request = DATASTORE_HOST + '/' + GET_DEVICES + logging.debug("Request = %s" % request) + response = requests.get(request) + if 'error' in response.text: + return None + return response.json() diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py index b7b9fe861b..e0abdf7a87 100644 --- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py @@ -206,6 +206,7 @@ def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=10): ad.reboot() # Wait for auto-wifi feature to start time.sleep(20) + ad.adb.shell('dumpsys battery set level 100') ad.log.info('DTIM updated and device back from reboot') return 1 diff --git a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py index c72cefeefb..6efa6af248 100644 --- a/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py +++ b/acts/framework/acts/test_utils/wifi/wifi_retail_ap.py @@ -13,11 +13,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import fcntl -import logging import selenium import splinter import time +from acts import logger +from acts.controllers import access_point +from acts.controllers.ap_lib import bridge_interface +from acts.controllers.ap_lib import hostapd_security +from acts.controllers.ap_lib import hostapd_ap_preset BROWSER_WAIT_SHORT = 1 BROWSER_WAIT_MED = 3 @@ -36,7 +41,8 @@ def create(configs): ("Netgear", "R7000"): "NetgearR7000AP", ("Netgear", "R7500"): "NetgearR7500AP", ("Netgear", "R7800"): "NetgearR7800AP", - ("Netgear", "R8000"): "NetgearR8000AP" + ("Netgear", "R8000"): "NetgearR8000AP", + ("Google", "Wifi"): "GoogleWifiAP" } objs = [] for config in configs: @@ -70,6 +76,7 @@ class BlockingBrowser(splinter.driver.webdriver.chrome.WebDriver): headless: boolean to control visible/headless browser operation timeout: maximum time allowed to launch browser """ + self.log = logger.create_tagged_trace_logger("ChromeDriver") self.chrome_options = splinter.driver.webdriver.chrome.Options() self.chrome_options.add_argument("--no-proxy-server") self.chrome_options.add_argument("--no-sandbox") @@ -119,7 +126,11 @@ class BlockingBrowser(splinter.driver.webdriver.chrome.WebDriver): self.quit() self.__enter__() - def visit_persistent(self, url, page_load_timeout, num_tries): + def visit_persistent(self, + url, + page_load_timeout, + num_tries, + backup_url="about:blank"): """Method to visit webpages and retry upon failure. The function visits a web page and checks the the resulting URL matches @@ -129,20 +140,22 @@ class BlockingBrowser(splinter.driver.webdriver.chrome.WebDriver): url: the intended url page_load_timeout: timeout for page visits num_tries: number of tries before url is declared unreachable + backup_url: url to visit if first url is not reachable. This can be + use to simply refresh the browser and try again or to re-login to + the AP """ self.driver.set_page_load_timeout(page_load_timeout) for idx in range(num_tries): try: self.visit(url) + if self.url.split("/")[-1] == url.split("/")[-1]: + break + else: + self.visit(backup_url) except: - try: - self.visit("about:blank") - except: - self.restart() - if self.url.split("/")[-1] == url.split("/")[-1]: - break + self.restart() if idx == num_tries - 1: - logging.error("URL unreachable. Current URL: {}".format( + self.log.error("URL unreachable. Current URL: {}".format( self.url)) raise RuntimeError("URL unreachable.") @@ -181,7 +194,7 @@ class WifiRetailAP(object): assumed_ap_settings = self.ap_settings.copy() actual_ap_settings = self.read_ap_settings() if assumed_ap_settings != actual_ap_settings: - logging.warning( + self.log.warning( "Discrepancy in AP settings. Some settings may have been overwritten." ) @@ -203,7 +216,7 @@ class WifiRetailAP(object): Args: region: string indicating AP region """ - logging.warning("Updating region may overwrite wireless settings.") + self.log.warning("Updating region may overwrite wireless settings.") setting_to_update = {"region": region} self.update_ap_settings(setting_to_update) @@ -276,7 +289,15 @@ class WifiRetailAP(object): } self.update_ap_settings(setting_to_update) - def update_ap_settings(self, *dict_settings, **named_settings): + def set_rate(self): + """Function that configures rate used by AP. + + Function implementation is not supported by most APs and thus base + class raises exception if function not implemented in child class. + """ + raise NotImplementedError + + def update_ap_settings(self, dict_settings={}, **named_settings): """Function to update settings of existing AP. Function copies arguments into ap_settings and calls configure_retail_ap @@ -287,28 +308,23 @@ class WifiRetailAP(object): **named_settings accepts named settings to update Note: dict and named_settings cannot contain the same settings. """ - settings_to_update = {} - if (len(dict_settings) == 1) and (type(dict_settings[0]) == dict): - for key, value in dict_settings[0].items(): - if key in named_settings: - raise KeyError("{} was passed twice.".format(key)) - else: - settings_to_update[key] = value - elif len(dict_settings) > 1: - raise TypeError("Wrong number of positional arguments given") - return - - for key, value in named_settings.items(): - settings_to_update[key] = value + settings_to_update = dict(dict_settings, **named_settings) + if len(settings_to_update) != len(dict_settings) + len(named_settings): + raise KeyError("The following keys were passed twice: {}".format( + (set(dict_settings.keys()).intersection( + set(named_settings.keys()))))) + if not set(settings_to_update.keys()).issubset( + set(self.ap_settings.keys())): + raise KeyError( + "The following settings are invalid for this AP: {}".format( + set(settings_to_update.keys()).difference( + set(self.ap_settings.keys())))) updates_requested = False - for key, value in settings_to_update.items(): - if (key in self.ap_settings): - if self.ap_settings[key] != value: - self.ap_settings[key] = value - updates_requested = True - else: - raise KeyError("Invalid setting passed to AP configuration.") + for setting, value in settings_to_update.items(): + if self.ap_settings[setting] != value: + self.ap_settings[setting] = value + updates_requested = True if updates_requested: self.configure_ap() @@ -332,6 +348,8 @@ class NetgearR7000AP(WifiRetailAP): def __init__(self, ap_settings): self.ap_settings = ap_settings.copy() + self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format( + self.ap_settings["ip_address"])) self.init_gui_data() # Read and update AP settings self.read_ap_settings() @@ -349,8 +367,9 @@ class NetgearR7000AP(WifiRetailAP): self.config_page_nologin = "{}://{}:{}/WLG_wireless_dual_band_r10.htm".format( self.ap_settings["protocol"], self.ap_settings["ip_address"], self.ap_settings["port"]) - self.config_page_advanced = "{}://{}:{}/WLG_adv_dual_band2.htm".format( - self.ap_settings["protocol"], self.ap_settings["ip_address"], + self.config_page_advanced = "{}://{}:{}@{}:{}/WLG_adv_dual_band2.htm".format( + self.ap_settings["protocol"], self.ap_settings["admin_username"], + self.ap_settings["admin_password"], self.ap_settings["ip_address"], self.ap_settings["port"]) self.networks = ["2G", "5G_1"] self.channel_band_map = { @@ -426,8 +445,6 @@ class NetgearR7000AP(WifiRetailAP): 600) as browser: # Visit URL browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) - browser.visit_persistent(self.config_page_nologin, - BROWSER_WAIT_MED, 10) for key, value in self.config_page_fields.items(): if "status" in key: @@ -436,7 +453,7 @@ class NetgearR7000AP(WifiRetailAP): config_item = browser.find_by_name(value) self.ap_settings["{}_{}".format(key[1], key[0])] = int( config_item.first.checked) - browser.visit_persistent(self.config_page_nologin, + browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) else: config_item = browser.find_by_name(value) @@ -471,7 +488,7 @@ class NetgearR7000AP(WifiRetailAP): # Visit URL browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) browser.visit_persistent(self.config_page_nologin, - BROWSER_WAIT_MED, 10) + BROWSER_WAIT_MED, 10, self.config_page) # Update region, and power/bandwidth for each network for key, value in self.config_page_fields.items(): @@ -489,7 +506,7 @@ class NetgearR7000AP(WifiRetailAP): self.bw_mode_text[self.ap_settings["{}_{}".format( key[1], key[0])]]) except AttributeError: - logging.warning( + self.log.warning( "Cannot select bandwidth. Keeping AP default.") # Update security settings (passwords updated only if applicable) @@ -522,7 +539,7 @@ class NetgearR7000AP(WifiRetailAP): key[1], key[0])]) time.sleep(BROWSER_WAIT_SHORT) except AttributeError: - logging.warning( + self.log.warning( "Cannot select channel. Keeping AP default.") try: alert = browser.get_alert() @@ -578,6 +595,8 @@ class NetgearR7500AP(WifiRetailAP): def __init__(self, ap_settings): self.ap_settings = ap_settings.copy() + self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format( + self.ap_settings["ip_address"])) self.init_gui_data() # Read and update AP settings self.read_ap_settings() @@ -592,11 +611,9 @@ class NetgearR7500AP(WifiRetailAP): self.ap_settings["protocol"], self.ap_settings["admin_username"], self.ap_settings["admin_password"], self.ap_settings["ip_address"], self.ap_settings["port"]) - self.config_page_nologin = "{}://{}:{}/index.htm".format( - self.ap_settings["protocol"], self.ap_settings["ip_address"], - self.ap_settings["port"]) - self.config_page_advanced = "{}://{}:{}/adv_index.htm".format( - self.ap_settings["protocol"], self.ap_settings["ip_address"], + self.config_page_advanced = "{}://{}:{}@{}:{}/adv_index.htm".format( + self.ap_settings["protocol"], self.ap_settings["admin_username"], + self.ap_settings["admin_password"], self.ap_settings["ip_address"], self.ap_settings["port"]) self.networks = ["2G", "5G_1"] self.channel_band_map = { @@ -747,7 +764,7 @@ class NetgearR7500AP(WifiRetailAP): try: config_item.select_by_text(channel_string) except AttributeError: - logging.warning( + self.log.warning( "Cannot select channel. Keeping AP default.") elif key == ("2G", "bandwidth"): config_item = iframe.find_by_name(value).first @@ -756,7 +773,7 @@ class NetgearR7500AP(WifiRetailAP): str(self.bw_mode_text_2g[self.ap_settings[ "{}_{}".format(key[1], key[0])]])) except AttributeError: - logging.warning( + self.log.warning( "Cannot select bandwidth. Keeping AP default.") elif key == ("5G_1", "bandwidth"): config_item = iframe.find_by_name(value).first @@ -765,7 +782,7 @@ class NetgearR7500AP(WifiRetailAP): str(self.bw_mode_text_5g[self.ap_settings[ "{}_{}".format(key[1], key[0])]])) except AttributeError: - logging.warning( + self.log.warning( "Cannot select bandwidth. Keeping AP default.") # Update passwords for WPA2-PSK protected networks # (Must be done after security type is selected) @@ -807,7 +824,7 @@ class NetgearR7500AP(WifiRetailAP): browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) browser.visit_persistent(self.config_page_advanced, BROWSER_WAIT_MED, 10) - time.sleep(BROWSER_WAIT_SHORT) + time.sleep(BROWSER_WAIT_MED) wireless_button = browser.find_by_id("advanced_bt").first wireless_button.click() time.sleep(BROWSER_WAIT_SHORT) @@ -845,6 +862,7 @@ class NetgearR7500AP(WifiRetailAP): browser.visit_persistent(self.config_page, BROWSER_WAIT_MED, 10) browser.visit_persistent(self.config_page_advanced, BROWSER_WAIT_MED, 10) + time.sleep(BROWSER_WAIT_MED) wireless_button = browser.find_by_id("advanced_bt").first wireless_button.click() time.sleep(BROWSER_WAIT_SHORT) @@ -870,6 +888,8 @@ class NetgearR7800AP(NetgearR7500AP): def __init__(self, ap_settings): self.ap_settings = ap_settings.copy() + self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format( + self.ap_settings["ip_address"])) self.init_gui_data() # Overwrite minor differences from R7500 AP self.bw_mode_text_2g["VHT20"] = "Up to 347 Mbps" @@ -890,6 +910,8 @@ class NetgearR8000AP(NetgearR7000AP): def __init__(self, ap_settings): self.ap_settings = ap_settings.copy() + self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format( + self.ap_settings["ip_address"])) self.init_gui_data() # Overwrite minor differences from R7000 AP self.config_page = "{}://{}:{}@{}:{}/WLG_wireless_dual_band_r8000.htm".format( @@ -899,8 +921,9 @@ class NetgearR8000AP(NetgearR7000AP): self.config_page_nologin = "{}://{}:{}/WLG_wireless_dual_band_r8000.htm".format( self.ap_settings["protocol"], self.ap_settings["ip_address"], self.ap_settings["port"]) - self.config_page_advanced = "{}://{}:{}/WLG_adv_dual_band2_r8000.htm".format( - self.ap_settings["protocol"], self.ap_settings["ip_address"], + self.config_page_advanced = "{}://{}:{}@{}:{}/WLG_adv_dual_band2_r8000.htm".format( + self.ap_settings["protocol"], self.ap_settings["admin_username"], + self.ap_settings["admin_password"], self.ap_settings["ip_address"], self.ap_settings["port"]) self.networks = ["2G", "5G_1", "5G_2"] self.channel_band_map = { @@ -935,3 +958,221 @@ class NetgearR8000AP(NetgearR7000AP): return else: self.update_ap_settings(ap_settings) + + +class GoogleWifiAP(WifiRetailAP): + """ Class that implements Google Wifi AP. + + This class is a work in progress + """ + + def __init__(self, ap_settings): + self.ap_settings = ap_settings.copy() + self.log = logger.create_tagged_trace_logger("AccessPoint|{}".format( + self.ap_settings["ssh_config"]["host"])) + if self.ap_settings["status_2G"] and self.ap_settings["status_5G_1"]: + raise ValueError("Error initializing Google Wifi AP. " + "Only one interface can be enabled at a time.") + self.channel_band_map = { + "2G": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], + "5G_1": [36, 40, 44, 48, 149, 153, 157, 161, 165] + } + self.BW_MODE_MAP = { + "legacy": 20, + "VHT20": 20, + "VHT40": 40, + "VHT80": 80 + } + self.default_settings = { + "region": "United States", + "brand": "Google", + "model": "Wifi", + "status_2G": 0, + "status_5G_1": 0, + "ssid_2G": "GoogleWifi_2G", + "ssid_5G_1": "GoogleWifi_5G", + "channel_2G": 11, + "channel_5G_1": 149, + "bandwidth_2G": "VHT20", + "bandwidth_5G_1": "VHT20", + "power_2G": "auto", + "power_5G_1": "auto", + "security_type_2G": "Open", + "security_type_5G_1": "Open", + "subnet_2G": "192.168.1.0/24", + "subnet_5G_1": "192.168.9.0/24", + "password_2G": "password", + "password_5G_1": "password" + } + + for setting in self.default_settings.keys(): + if setting not in self.ap_settings: + self.log.warning( + "{0} not found during init. Setting {0} = {1}".format( + setting, self.default_settings[setting])) + self.ap_settings[setting] = self.default_settings[setting] + init_settings = self.ap_settings.copy() + init_settings["ap_subnet"] = { + "2g": self.ap_settings["subnet_2G"], + "5g": self.ap_settings["subnet_5G_1"] + } + self.access_point = access_point.AccessPoint(init_settings) + self.configure_ap() + + def read_ap_settings(self): + """Function that reads current ap settings.""" + return self.ap_settings.copy() + + def update_ap_settings(self, dict_settings={}, **named_settings): + """Function to update settings of existing AP. + + Function copies arguments into ap_settings and calls configure_ap + to apply them. + + Args: + dict_settings: single dictionary of settings to update + **named_settings: named settings to update + Note: dict and named_settings cannot contain the same settings. + """ + settings_to_update = dict(dict_settings, **named_settings) + if len(settings_to_update) != len(dict_settings) + len(named_settings): + raise KeyError("The following keys were passed twice: {}".format( + (set(dict_settings.keys()).intersection( + set(named_settings.keys()))))) + if not set(settings_to_update.keys()).issubset( + set(self.ap_settings.keys())): + raise KeyError( + "The following settings are invalid for this AP: {}".format( + set(settings_to_update.keys()).difference( + set(self.ap_settings.keys())))) + + updating_2G = any(["2G" in x for x in settings_to_update.keys()]) + updating_5G_1 = any(["5G_1" in x for x in settings_to_update.keys()]) + if updating_2G and updating_5G_1: + raise ValueError( + "Error updating Google WiFi AP. " + "One interface can be activated and updated at a time") + elif updating_2G: + # If updating an interface and not explicitly setting its status, + # it is assumed that the interface is to be ENABLED and updated + if "status_2G" not in settings_to_update: + settings_to_update["status_2G"] = 1 + settings_to_update["status_5G_1"] = 0 + elif updating_5G_1: + if "status_5G_1" not in settings_to_update: + settings_to_update["status_2G"] = 0 + settings_to_update["status_5G_1"] = 1 + + updates_requested = False + for setting, value in settings_to_update.items(): + if self.ap_settings[setting] != value: + self.ap_settings[setting] = value + updates_requested = True + + if updates_requested: + self.configure_ap() + + def configure_ap(self): + """Function to configure Google Wifi.""" + self.log.info("Stopping Google Wifi interfaces.") + self.access_point.stop_all_aps() + + if self.ap_settings["status_2G"] == 1: + network = "2G" + self.log.info("Bringing up 2.4 GHz network.") + elif self.ap_settings["status_5G_1"] == 1: + network = "5G_1" + self.log.info("Bringing up 5 GHz network.") + else: + return + + bss_settings = [] + ssid = self.ap_settings["ssid_{}".format(network)] + if "WPA" in self.ap_settings["security_type_{}".format(network)]: + password = self.ap_settings["password_{}".format(network)] + security = hostapd_security.Security( + security_mode="wpa", password=password) + else: + security = hostapd_security.Security( + security_mode=None, password=None) + channel = int(self.ap_settings["channel_{}".format(network)]) + bandwidth = self.BW_MODE_MAP[self.ap_settings["bandwidth_{}".format( + network)]] + config = hostapd_ap_preset.create_ap_preset( + channel=channel, + ssid=ssid, + security=security, + bss_settings=bss_settings, + vht_bandwidth=bandwidth, + profile_name='whirlwind', + iface_wlan_2g=self.access_point.wlan_2g, + iface_wlan_5g=self.access_point.wlan_5g) + config_bridge = self.access_point.generate_bridge_configs(channel) + brconfigs = bridge_interface.BridgeInterfaceConfigs( + config_bridge[0], config_bridge[1], config_bridge[2]) + self.access_point.bridge.startup(brconfigs) + self.access_point.start_ap(config) + self.set_power(network, self.ap_settings["power_{}".format(network)]) + self.log.info("AP started on channel {} with SSID {}".format( + channel, ssid)) + + def set_power(self, network, power): + """Function that sets network transmit power. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + power: power level in dBm + """ + if power == "auto": + power_string = "auto" + else: + if not float(power).is_integer(): + self.log.info( + "Power in dBm must be an integer. Setting to {}".format( + int(power))) + power = int(power) + power_string = "fixed {}".format(int(power) * 100) + + if "2G" in network: + interface = self.access_point.wlan_2g + self.ap_settings["power_2G"] = power + elif "5G_1" in network: + interface = self.access_point.wlan_5g + self.ap_settings["power_5G_1"] = power + self.access_point.ssh.run("iw dev {} set txpower {}".format( + interface, power_string)) + + def set_rate(self, + network, + mode=None, + num_streams=None, + rate=None, + short_gi=0): + """Function that sets rate. + + Args: + network: string containing network identifier (2G, 5G_1, 5G_2) + mode: string indicating the WiFi standard to use + num_streams: number of MIMO streams. used only for VHT + rate: data rate of MCS index to use + short_gi: boolean controlling the use of short guard interval + """ + if "2G" in network: + interface = self.access_point.wlan_2g + interface_short = "2.4" + elif "5G_1" in network: + interface = self.access_point.wlan_5g + interface_short = "5" + + if "legacy" in mode.lower(): + cmd_string = "iw dev {0} set bitrates legacy-{1} {2} ht-mcs-{1} vht-mcs-{1}".format( + interface, interface_short, rate) + elif "vht" in mode.lower(): + cmd_string = "iw dev {0} set bitrates legacy-{1} ht-mcs-{1} vht-mcs-{1} {2}:{3}".format( + interface, interface_short, num_streams, rate) + if short_gi: + cmd_string = cmd_string + " sgi-interface_short" + elif "ht" in mode.lower(): + cmd_string = "iw dev {0} set bitrates legacy-{1} ht-mcs-{1} {2} vht-mcs-{1}".format( + interface, interface_short, rate) + self.access_point.ssh.run(cmd_string) diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py index 0c96fed835..0f86c7e611 100755 --- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py @@ -15,8 +15,9 @@ # limitations under the License. import logging -import time +import os import pprint +import time from enum import IntEnum from queue import Empty @@ -25,6 +26,8 @@ from acts import asserts from acts import signals from acts import utils from acts.controllers import attenuator +from acts.controllers.ap_lib.hostapd_constants import BAND_2G +from acts.controllers.ap_lib.hostapd_constants import BAND_5G from acts.test_utils.wifi import wifi_constants from acts.test_utils.tel import tel_defines @@ -1678,21 +1681,13 @@ def trigger_roaming_and_validate(dut, attenuator, attn_val_name, expected_con): set_attns(attenuator, attn_val_name) logging.info("Wait %ss for roaming to finish.", ROAMING_TIMEOUT) time.sleep(ROAMING_TIMEOUT) - try: - # Wakeup device and verify connection. - dut.droid.wakeLockAcquireBright() - dut.droid.wakeUpNow() - cur_con = dut.droid.wifiGetConnectionInfo() - verify_wifi_connection_info(dut, expected_con) - expected_bssid = expected_con[WifiEnums.BSSID_KEY] - logging.info("Roamed to %s successfully", expected_bssid) - if not validate_connection(dut): - raise signals.TestFailure("Fail to connect to internet on %s" % - expected_ssid) - finally: - dut.droid.wifiLockRelease() - dut.droid.goToSleepNow() + verify_wifi_connection_info(dut, expected_con) + expected_bssid = expected_con[WifiEnums.BSSID_KEY] + logging.info("Roamed to %s successfully", expected_bssid) + if not validate_connection(dut): + raise signals.TestFailure("Fail to connect to internet on %s" % + expected_bssid) def create_softap_config(): """Create a softap config with random ssid and password.""" @@ -1704,3 +1699,94 @@ def create_softap_config(): WifiEnums.PWD_KEY: ap_password, } return config + +def start_pcap(pcap, wifi_band, log_path, test_name): + """Start packet capture in monitor mode. + + Args: + pcap: packet capture object + wifi_band: '2g' or '5g' or 'dual' + log_path: current test log path + test_name: test name to be used for pcap file name + + Returns: + Dictionary with pid of the tcpdump process as key and log path + of the file name as the value + """ + log_dir = os.path.join(log_path, test_name) + utils.create_dir(log_dir) + if wifi_band == 'dual': + bands = [BAND_2G, BAND_5G] + else: + bands = [wifi_band] + pids = {} + for band in bands: + pid = pcap.start_packet_capture(band, log_dir, test_name) + pids[pid] = os.path.join(log_dir, test_name) + return pids + +def stop_pcap(pcap, pids, test_status=None): + """Stop packet capture in monitor mode. + + Since, the pcap logs in monitor mode can be very large, we will + delete them if they are not required. 'test_status' if True, will delete + the pcap files. If False, we will keep them. + + Args: + pcap: packet capture object + pids: dictionary returned by start_pcap + test_status: status of the test case + """ + for pid, fname in pids.items(): + pcap.stop_packet_capture(pid) + + if test_status: + os.system('rm -rf %s' % os.path.dirname(fname)) + +def start_cnss_diags(ads): + for ad in ads: + start_cnss_diag(ad) + +def start_cnss_diag(ad): + """Start cnss_diag to record extra wifi logs + + Args: + ad: android device object. + """ + if ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP: + prop = wifi_constants.LEGACY_CNSS_DIAG_PROP + else: + prop = wifi_constants.CNSS_DIAG_PROP + if ad.adb.getprop(prop) != 'true': + ad.adb.shell("find /data/vendor/wifi/cnss_diag/wlan_logs/ -type f -delete") + ad.adb.shell("setprop %s true" % prop, ignore_status=True) + +def stop_cnss_diags(ads): + for ad in ads: + stop_cnss_diag(ad) + +def stop_cnss_diag(ad): + """Stops cnss_diag + + Args: + ad: android device object. + """ + if ad.model in wifi_constants.DEVICES_USING_LEGACY_PROP: + prop = wifi_constants.LEGACY_CNSS_DIAG_PROP + else: + prop = wifi_constants.CNSS_DIAG_PROP + ad.adb.shell("setprop %s false" % prop, ignore_status=True) + +def get_cnss_diag_log(ad, test_name=""): + """Pulls the cnss_diag logs in the wlan_logs dir + Args: + ad: android device object. + test_name: test case name + """ + logs = ad.get_file_names("/data/vendor/wifi/cnss_diag/wlan_logs/") + if logs: + ad.log.info("Pulling cnss_diag logs %s", logs) + log_path = os.path.join(ad.log_path, test_name, + "CNSS_DIAG_%s" % ad.serial) + utils.create_dir(log_path) + ad.pull_files(logs, log_path) diff --git a/acts/framework/acts/utils.py b/acts/framework/acts/utils.py index e6b943a677..edc38509cb 100755 --- a/acts/framework/acts/utils.py +++ b/acts/framework/acts/utils.py @@ -296,6 +296,19 @@ def rand_ascii_str(length): return ''.join(letters) +def rand_hex_str(length): + """Generates a random string of specified length, composed of hex digits + + Args: + length: The number of characters in the string. + + Returns: + The random string generated. + """ + letters = [random.choice(string.hexdigits) for i in range(length)] + return ''.join(letters) + + # Thead/Process related functions. def concurrent_exec(func, param_list): """Executes a function with different parameters pseudo-concurrently. @@ -466,6 +479,8 @@ def sync_device_time(ad): Args: ad: The android device to sync time on. """ + ad.adb.shell("settings global put auto_time 0", ignore_status=True) + ad.adb.shell("settings global put auto_time_zone 0", ignore_status=True) droid = ad.droid droid.setTimeZone(get_timezone_olson_id()) droid.setTime(get_current_epoch_time()) @@ -812,7 +827,7 @@ def parse_ping_ouput(ad, count, out, loss_tolerance=20): packet_rcvd = int(stats[1].split()[0]) min_packet_xmit_rcvd = (100 - loss_tolerance) * 0.01 - if (packet_loss >= loss_tolerance + if (packet_loss > loss_tolerance or packet_xmit < count * min_packet_xmit_rcvd or packet_rcvd < count * min_packet_xmit_rcvd): ad.log.error( diff --git a/acts/framework/setup.py b/acts/framework/setup.py index 66e705253c..5573275a0b 100755 --- a/acts/framework/setup.py +++ b/acts/framework/setup.py @@ -32,10 +32,11 @@ install_requires = [ 'pyserial', 'shellescape', 'protobuf', + 'requests', 'roman', 'scapy-python3', 'pylibftdi', - 'xlsxwriter' + 'xlsxwriter', ] if sys.version_info < (3, ): diff --git a/acts/tests/google/wifi/WifiChaosTest.py b/acts/tests/google/wifi/WifiChaosTest.py new file mode 100755 index 0000000000..9c7c24f884 --- /dev/null +++ b/acts/tests/google/wifi/WifiChaosTest.py @@ -0,0 +1,241 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import sys +import time + +import acts.controllers.packet_capture as packet_capture +import acts.signals as signals +import acts.test_utils.wifi.rpm_controller_utils as rutils +import acts.test_utils.wifi.wifi_datastore_utils as dutils +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.controllers.ap_lib import hostapd_constants +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest + +WifiEnums = wutils.WifiEnums + +WAIT_BEFORE_CONNECTION = 1 +SINGLE_BAND = 1 +DUAL_BAND = 2 + +TIMEOUT = 1 +PING_ADDR = 'www.google.com' + + +class WifiChaosTest(WifiBaseTest): + """ Tests for wifi IOT + + Test Bed Requirement: + * One Android device + * Wi-Fi IOT networks visible to the device + """ + + def __init__(self, configs): + BaseTestClass.__init__(self, configs) + self.generate_interop_tests() + + def generate_interop_testcase(self, base_test, testcase_name, ssid_dict): + """Generates a single test case from the given data. + + Args: + base_test: The base test case function to run. + testcase_name: The name of the test case. + ssid_dict: The information about the network under test. + """ + ssid = testcase_name + test_tracker_uuid = ssid_dict[testcase_name]['uuid'] + hostname = ssid_dict[testcase_name]['host'] + if not testcase_name.startswith('test_'): + testcase_name = 'test_%s' % testcase_name + test_case = test_tracker_info(uuid=test_tracker_uuid)( + lambda: base_test(ssid, hostname)) + setattr(self, testcase_name, test_case) + self.tests.append(testcase_name) + + def generate_interop_tests(self): + for ssid_dict in self.user_params['interop_ssid']: + testcase_name = list(ssid_dict)[0] + self.generate_interop_testcase(self.interop_base_test, + testcase_name, ssid_dict) + + def setup_class(self): + self.dut = self.android_devices[0] + wutils.wifi_test_device_init(self.dut) + # Set country code explicitly to "US". + self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + + asserts.assert_true( + self.lock_pcap(), + "Could not lock a Packet Capture. Aborting Interop test.") + + wutils.wifi_toggle_state(self.dut, True) + + def lock_pcap(self): + """Lock a Packet Capturere to use for the test.""" + + # Get list of devices from the datastore. + locked_pcap = False + devices = dutils.get_devices() + + for device in devices: + + device_name = device['hostname'] + device_type = device['ap_label'] + if device_type == 'PCAP'and dutils.lock_device(device_name): + host = device['ip_address'] + self.log.info("Locked Packet Capture device: %s" % device_name) + locked_pcap = True + break + + elif device_type == 'PCAP': + self.log.warning("Failed to lock %s PCAP.") + + if not locked_pcap: + return False + + pcap_config = {'ssh_config':{'user':'root'} } + pcap_config['ssh_config']['host'] = host + + self.pcap = packet_capture.PacketCapture(pcap_config) + return True + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + + def teardown_test(self): + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + wutils.reset_wifi(self.dut) + + + """Helper Functions""" + + def scan_and_connect_by_id(self, network, net_id): + """Scan for network and connect using network id. + + Args: + net_id: Integer specifying the network id of the network. + + """ + ssid = network[WifiEnums.SSID_KEY] + wutils.start_wifi_connection_scan_and_ensure_network_found(self.dut, + ssid) + wutils.wifi_connect_by_id(self.dut, net_id) + + def run_ping(self, sec): + """Run ping for given number of seconds. + + Args: + sec: Time in seconds to run teh ping traffic. + + """ + self.log.info("Running ping for %d seconds" % sec) + result = self.dut.adb.shell("ping -w %d %s" % (sec, PING_ADDR), + timeout=sec + 1) + self.log.debug("Ping Result = %s" % result) + if "100% packet loss" in result: + raise signals.TestFailure("100% packet loss during ping") + + def run_connect_disconnect(self, network): + """Run connect/disconnect to a given network in loop. + + Args: + network: dict, network information. + + Raises: TestFailure if the network connection fails. + + """ + for attempt in range(1): + # TODO:(bmahadev) Change it to 5 or more attempts later. + try: + begin_time = time.time() + ssid = network[WifiEnums.SSID_KEY] + net_id = self.dut.droid.wifiAddNetwork(network) + asserts.assert_true(net_id != -1, "Add network %s failed" % network) + self.log.info("Connecting to %s" % ssid) + self.scan_and_connect_by_id(network, net_id) + self.run_ping(1) + wutils.wifi_forget_network(self.dut, ssid) + time.sleep(WAIT_BEFORE_CONNECTION) + except: + self.log.error("Connection to %s network failed on the %d " + "attempt." % (ssid, attempt)) + # TODO:(bmahadev) Uncomment after scan issue is fixed. + # self.dut.take_bug_report(ssid, begin_time) + # self.dut.cat_adb_log(ssid, begin_time) + raise signals.TestFailure("Failed to connect to %s" % ssid) + + def interop_base_test(self, ssid, hostname): + """Base test for all the connect-disconnect interop tests. + + Args: + ssid: string, SSID of the network to connect to. + hostname: string, hostname of the AP. + + Steps: + 1. Lock AP in datstore. + 2. Turn on AP on the rpm switch. + 3. Run connect-disconnect in loop. + 4. Turn off AP on the rpm switch. + 5. Unlock AP in datastore. + + """ + network = {} + network['password'] = 'password' + network['SSID'] = ssid + wutils.reset_wifi(self.dut) + + # Lock AP in datastore. + self.log.info("Lock AP in datastore") + if not dutils.lock_device(hostname): + self.log.warning("Failed to lock %s AP. Unlock AP in datastore" + " and try again.") + raise signals.TestFailure("Failed to lock AP") + + ap_info = dutils.show_device(hostname) + + band = SINGLE_BAND + if ('ssid_2g' in ap_info) and ('ssid_5g' in ap_info): + band = DUAL_BAND + + # Get AP RPM attributes and Turn ON AP. + rpm_ip = ap_info['rpm_ip'] + rpm_port = ap_info['rpm_port'] + + rutils.turn_on_ap(self.pcap, ssid, rpm_port, rpm_ip=rpm_ip) + self.log.info("Finished turning ON AP.") + # Experimental to check if 2G connects better. + time.sleep(30) + + self.run_connect_disconnect(network) + + # Un-lock only if it's a single band AP or we are running the last band. + if (band == SINGLE_BAND) or ( + band == DUAL_BAND and hostapd_constants.BAND_5G in \ + sys._getframe().f_code.co_name): + + # Un-Lock AP in datastore. + self.log.debug("Un-lock AP in datastore") + if not dutils.unlock_device(hostname): + self.log.warning("Failed to unlock %s AP. Check AP in datastore.") + + # Turn OFF AP from the RPM port. + rutils.turn_off_ap(rpm_port, rpm_ip) diff --git a/acts/tests/google/wifi/WifiCrashStressTest.py b/acts/tests/google/wifi/WifiCrashStressTest.py new file mode 100755 index 0000000000..73f6460a52 --- /dev/null +++ b/acts/tests/google/wifi/WifiCrashStressTest.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.signals as signals +import acts.test_utils.wifi.wifi_test_utils as wutils +from acts import asserts +from acts import utils +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +from acts.test_utils.tel.tel_test_utils import disable_qxdm_logger + +WifiEnums = wutils.WifiEnums + +class WifiCrashStressTest(WifiBaseTest): + """Crash Tests for wifi stack. + + Test Bed Requirement: + * Two Android device + * One Wi-Fi network visible to the device. + """ + + def __init__(self, controllers): + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + self.dut_client = self.android_devices[1] + wutils.wifi_test_device_init(self.dut) + wutils.wifi_test_device_init(self.dut_client) + if not self.dut.is_apk_installed("com.google.mdstest"): + raise signals.TestSkipClass("mdstest is not installed") + req_params = ["dbs_supported_models", "stress_count"] + opt_param = ["reference_networks"] + self.unpack_userparams( + req_param_names=req_params, opt_param_names=opt_param) + + if "AccessPoint" in self.user_params: + self.legacy_configure_ap_and_start() + + asserts.assert_true( + len(self.reference_networks) > 0, + "Need at least one reference network with psk.") + self.network = self.reference_networks[0]["2g"] + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + wutils.wifi_toggle_state(self.dut, True) + self.dut_client.droid.wakeLockAcquireBright() + self.dut_client.droid.wakeUpNow() + wutils.wifi_toggle_state(self.dut_client, True) + + def teardown_test(self): + if self.dut.droid.wifiIsApEnabled(): + wutils.stop_wifi_tethering(self.dut) + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + wutils.reset_wifi(self.dut) + self.dut_client.droid.wakeLockRelease() + self.dut_client.droid.goToSleepNow() + wutils.reset_wifi(self.dut_client) + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + self.dut.cat_adb_log(test_name, begin_time) + self.dut_client.take_bug_report(test_name, begin_time) + self.dut_client.cat_adb_log(test_name, begin_time) + + def teardown_class(self): + if "AccessPoint" in self.user_params: + del self.user_params["reference_networks"] + + """Helper Functions""" + def trigger_wifi_firmware_crash(self, ad, timeout=30): + pre_timestamp = ad.adb.getprop("vendor.debug.ssrdump.timestamp") + ad.adb.shell( + "setprop persist.vendor.sys.modem.diag.mdlog false", ignore_status=True) + # Legacy pixels use persist.sys.modem.diag.mdlog. + ad.adb.shell( + "setprop persist.sys.modem.diag.mdlog false", ignore_status=True) + disable_qxdm_logger(ad) + cmd = ('am instrument -w -e request "4b 25 03 b0 00" ' + '"com.google.mdstest/com.google.mdstest.instrument.' + 'ModemCommandInstrumentation"') + ad.log.info("Crash wifi firmware by %s", cmd) + ad.adb.shell(cmd, ignore_status=True) + time.sleep(timeout) # sleep time for firmware restart + subsystem = ad.adb.getprop("vendor.debug.ssrdump.subsys") + timestamp = ad.adb.getprop("vendor.debug.ssrdump.timestamp") + asserts.assert_true(timestamp != pre_timestamp, + "SSR didn't happened %s %s" % (subsystem, timestamp)) + + """Tests""" + @test_tracker_info(uuid="") + def test_firmware_crash_wifi_reconnect_stress(self): + """Firmware crash stress test for station mode + + 1. Turn on Wi-Fi and connect to access point + 2. Trigger firmware crash + 3. Check ssr happened + 4. Check dut can connect to access point + 5. Repeat step 2~4 + """ + wutils.wifi_toggle_state(self.dut, True) + wutils.connect_to_wifi_network(self.dut, self.network) + for count in range(self.stress_count): + self.log.info("%s: %d/%d" % + (self.current_test_name, count + 1, self.stress_count)) + wutils.reset_wifi(self.dut) + self.trigger_wifi_firmware_crash(self.dut) + wutils.connect_to_wifi_network(self.dut, self.network) + + @test_tracker_info(uuid="") + def test_firmware_crash_softap_reconnect_stress(self): + """Firmware crash stress test for softap mode + + 1. Turn off dut's Wi-Fi + 2. Turn on dut's hotspot and connected by dut client + 3. Trigger firmware crash + 4. Check ssr happened + 5. Check the connectivity of hotspot's client + 6. Repeat step 3~5 + """ + wutils.wifi_toggle_state(self.dut, False) + # Setup Soft AP + sap_config = wutils.create_softap_config() + wutils.start_wifi_tethering( + self.dut, sap_config[wutils.WifiEnums.SSID_KEY], + sap_config[wutils.WifiEnums.PWD_KEY], wutils.WifiEnums.WIFI_CONFIG_APBAND_2G) + config = { + "SSID": sap_config[wutils.WifiEnums.SSID_KEY], + "password": sap_config[wutils.WifiEnums.PWD_KEY] + } + # DUT client connects to softap + wutils.wifi_toggle_state(self.dut_client, True) + wutils.connect_to_wifi_network(self.dut_client, config, check_connectivity=False) + # Ping the DUT + dut_addr = self.dut.droid.connectivityGetIPv4Addresses("wlan0")[0] + asserts.assert_true( + utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_addr, timeout=20), + "%s ping %s failed" % (self.dut_client.serial, dut_addr)) + wutils.reset_wifi(self.dut_client) + for count in range(self.stress_count): + self.log.info("%s: %d/%d" % + (self.current_test_name, count + 1, self.stress_count)) + # Trigger firmware crash + self.trigger_wifi_firmware_crash(self.dut) + # Connect DUT to Network + wutils.wifi_toggle_state(self.dut_client, True) + wutils.connect_to_wifi_network(self.dut_client, config, check_connectivity=False) + # Ping the DUT + server_addr = self.dut.droid.connectivityGetIPv4Addresses("wlan0")[0] + asserts.assert_true( + utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_addr, timeout=20), + "%s ping %s failed" % (self.dut_client.serial, dut_addr)) + wutils.stop_wifi_tethering(self.dut) + + @test_tracker_info(uuid="") + def test_firmware_crash_concurrent_reconnect_stress(self): + """Firmware crash stress test for concurrent mode + + 1. Turn on dut's Wi-Fi and connect to access point + 2. Turn on dut's hotspot and connected by dut client + 3. Trigger firmware crash + 4. Check ssr happened + 5. Check dut can connect to access point + 6. Check the connectivity of hotspot's client + 7. Repeat step 3~6 + """ + if self.dut.model not in self.dbs_supported_models: + raise signals.TestSkip("%s does not support dual interfaces" % self.dut.model) + + # Connect DUT to Network + wutils.wifi_toggle_state(self.dut, True) + wutils.connect_to_wifi_network(self.dut, self.network) + # Setup Soft AP + sap_config = wutils.create_softap_config() + wutils.start_wifi_tethering( + self.dut, sap_config[wutils.WifiEnums.SSID_KEY], + sap_config[wutils.WifiEnums.PWD_KEY], wutils.WifiEnums.WIFI_CONFIG_APBAND_2G) + config = { + "SSID": sap_config[wutils.WifiEnums.SSID_KEY], + "password": sap_config[wutils.WifiEnums.PWD_KEY] + } + # Client connects to Softap + wutils.wifi_toggle_state(self.dut_client, True) + wutils.connect_to_wifi_network(self.dut_client, config) + wutils.reset_wifi(self.dut_client) + wutils.reset_wifi(self.dut) + for count in range(self.stress_count): + self.log.info("%s: %d/%d" % + (self.current_test_name, count + 1, self.stress_count)) + # Trigger firmware crash + self.trigger_wifi_firmware_crash(self.dut) + wutils.connect_to_wifi_network(self.dut, self.network) + wutils.connect_to_wifi_network(self.dut_client, config) + wutils.stop_wifi_tethering(self.dut) diff --git a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py index abdf2d4e2c..94918c9f7c 100644 --- a/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py +++ b/acts/tests/google/wifi/WifiEnterpriseRoamingTest.py @@ -18,10 +18,10 @@ import random import time from acts import asserts -from acts import base_test from acts import signals from acts.test_decorators import test_tracker_info from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest WifiEnums = wutils.WifiEnums @@ -33,14 +33,14 @@ EapPhase2 = WifiEnums.EapPhase2 Ent = WifiEnums.Enterprise -class WifiEnterpriseRoamingTest(base_test.BaseTestClass): +class WifiEnterpriseRoamingTest(WifiBaseTest): + def __init__(self, controllers): + WifiBaseTest.__init__(self, controllers) + def setup_class(self): self.dut = self.android_devices[0] wutils.wifi_test_device_init(self.dut) req_params = ( - "ent_roaming_ssid", - "bssid_a", - "bssid_b", "attn_vals", # Expected time within which roaming should finish, in seconds. "roam_interval", @@ -49,8 +49,25 @@ class WifiEnterpriseRoamingTest(base_test.BaseTestClass): "client_key", "eap_identity", "eap_password", - "device_password") + "device_password", + "radius_conf_2g", + "radius_conf_5g") self.unpack_userparams(req_params) + if "AccessPoint" in self.user_params: + self.legacy_configure_ap_and_start( + mirror_ap=True, + ent_network=True, + ap_count=2, + radius_conf_2g=self.radius_conf_2g, + radius_conf_5g=self.radius_conf_5g,) + self.ent_network_2g_a = self.ent_networks[0]["2g"] + self.ent_network_2g_b = self.ent_networks[1]["2g"] + self.bssid_2g_a = self.ent_network_2g_a[WifiEnums.BSSID_KEY.lower()] + self.bssid_2g_b = self.ent_network_2g_b[WifiEnums.BSSID_KEY.lower()] + self.ent_roaming_ssid = self.ent_network_2g_a[WifiEnums.SSID_KEY] + self.bssid_a = self.bssid_2g_a + self.bssid_b = self.bssid_2g_b + self.config_peap = { Ent.EAP: int(EAP.PEAP), Ent.CA_CERT: self.ca_cert, @@ -80,7 +97,7 @@ class WifiEnterpriseRoamingTest(base_test.BaseTestClass): WifiEnums.SSID_KEY: self.ent_roaming_ssid, } self.attn_a = self.attenuators[0] - self.attn_b = self.attenuators[1] + self.attn_b = self.attenuators[2] # Set screen lock password so ConfigStore is unlocked. self.dut.droid.setDevicePassword(self.device_password) self.set_attns("default") diff --git a/acts/tests/google/wifi/WifiEnterpriseTest.py b/acts/tests/google/wifi/WifiEnterpriseTest.py index 41a2512361..fd70e24f2d 100755 --- a/acts/tests/google/wifi/WifiEnterpriseTest.py +++ b/acts/tests/google/wifi/WifiEnterpriseTest.py @@ -19,12 +19,12 @@ import random import time from acts import asserts -from acts import base_test from acts import signals from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest WifiEnums = wutils.WifiEnums @@ -35,9 +35,9 @@ EapPhase2 = WifiEnums.EapPhase2 Ent = WifiEnums.Enterprise -class WifiEnterpriseTest(base_test.BaseTestClass): +class WifiEnterpriseTest(WifiBaseTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + WifiBaseTest.__init__(self, controllers) def setup_class(self): self.dut = self.android_devices[0] @@ -52,12 +52,23 @@ class WifiEnterpriseTest(base_test.BaseTestClass): "passpoint_client_cert", "passpoint_client_key", "eap_identity", "eap_password", "invalid_ca_cert", "invalid_client_cert", "invalid_client_key", "fqdn", "provider_friendly_name", "realm", - "ssid_peap0", "ssid_peap1", "ssid_tls", "ssid_ttls", "ssid_pwd", - "ssid_sim", "ssid_aka", "ssid_aka_prime", "ssid_passpoint", - "device_password", "ping_addr") + "device_password", "ping_addr", "radius_conf_2g", "radius_conf_5g", + "radius_conf_pwd") self.unpack_userparams(required_userparam_names, roaming_consortium_ids=None, plmn=None) + + if "AccessPoint" in self.user_params: + self.legacy_configure_ap_and_start( + ent_network=True, + radius_conf_2g=self.radius_conf_2g, + radius_conf_5g=self.radius_conf_5g, + ent_network_pwd=True, + radius_conf_pwd=self.radius_conf_pwd,) + self.ent_network_2g = self.ent_networks[0]["2g"] + self.ent_network_5g = self.ent_networks[0]["5g"] + self.ent_network_pwd = self.ent_networks_pwd[0]["2g"] + # Default configs for EAP networks. self.config_peap0 = { Ent.EAP: int(EAP.PEAP), @@ -65,14 +76,15 @@ class WifiEnterpriseTest(base_test.BaseTestClass): Ent.IDENTITY: self.eap_identity, Ent.PASSWORD: self.eap_password, Ent.PHASE2: int(EapPhase2.MSCHAPV2), - WifiEnums.SSID_KEY: self.ssid_peap0 + WifiEnums.SSID_KEY: self.ent_network_5g[WifiEnums.SSID_KEY], } self.config_peap1 = dict(self.config_peap0) - self.config_peap1[WifiEnums.SSID_KEY] = self.ssid_peap1 + self.config_peap1[WifiEnums.SSID_KEY] = \ + self.ent_network_2g[WifiEnums.SSID_KEY] self.config_tls = { Ent.EAP: int(EAP.TLS), Ent.CA_CERT: self.ca_cert, - WifiEnums.SSID_KEY: self.ssid_tls, + WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY], Ent.CLIENT_CERT: self.client_cert, Ent.PRIVATE_KEY_ID: self.client_key, Ent.IDENTITY: self.eap_identity, @@ -83,25 +95,25 @@ class WifiEnterpriseTest(base_test.BaseTestClass): Ent.IDENTITY: self.eap_identity, Ent.PASSWORD: self.eap_password, Ent.PHASE2: int(EapPhase2.MSCHAPV2), - WifiEnums.SSID_KEY: self.ssid_ttls + WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY], } self.config_pwd = { Ent.EAP: int(EAP.PWD), Ent.IDENTITY: self.eap_identity, Ent.PASSWORD: self.eap_password, - WifiEnums.SSID_KEY: self.ssid_pwd + WifiEnums.SSID_KEY: self.ent_network_pwd[WifiEnums.SSID_KEY], } self.config_sim = { Ent.EAP: int(EAP.SIM), - WifiEnums.SSID_KEY: self.ssid_sim, + WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY], } self.config_aka = { Ent.EAP: int(EAP.AKA), - WifiEnums.SSID_KEY: self.ssid_aka, + WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY], } self.config_aka_prime = { Ent.EAP: int(EAP.AKA_PRIME), - WifiEnums.SSID_KEY: self.ssid_aka_prime, + WifiEnums.SSID_KEY: self.ent_network_2g[WifiEnums.SSID_KEY], } # Base config for passpoint networks. @@ -142,14 +154,10 @@ class WifiEnterpriseTest(base_test.BaseTestClass): self.dut.droid.wakeUpNow() wutils.reset_wifi(self.dut) self.dut.ed.clear_all_events() - self.tcpdump_pid = start_adb_tcpdump(self.dut, self.test_name, mask='all') + self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) def teardown_test(self): - if self.tcpdump_pid: - stop_adb_tcpdump(self.dut, - self.tcpdump_pid, - pull_tcpdump=True) - self.tcpdump_pid = None + stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) self.dut.droid.wakeLockRelease() self.dut.droid.goToSleepNow() self.dut.droid.wifiStopTrackingStateChange() diff --git a/acts/tests/google/wifi/WifiManagerTest.py b/acts/tests/google/wifi/WifiManagerTest.py index 5a5dfebfef..30996486cf 100755 --- a/acts/tests/google/wifi/WifiManagerTest.py +++ b/acts/tests/google/wifi/WifiManagerTest.py @@ -40,7 +40,7 @@ class WifiManagerTest(WifiBaseTest): """Tests for APIs in Android's WifiManager class. Test Bed Requirement: - * One Android device + * Two Android device * Several Wi-Fi networks visible to the device, including an open Wi-Fi network. """ @@ -50,39 +50,47 @@ class WifiManagerTest(WifiBaseTest): def setup_class(self): self.dut = self.android_devices[0] + self.dut_client = self.android_devices[1] wutils.wifi_test_device_init(self.dut) + wutils.wifi_test_device_init(self.dut_client) req_params = [] opt_param = [ - "open_network", "reference_networks", "iperf_server_address" + "open_network", "reference_networks", "iperf_server_address", + "wpa_networks", "wep_networks" ] self.unpack_userparams( req_param_names=req_params, opt_param_names=opt_param) if "AccessPoint" in self.user_params: - self.legacy_configure_ap_and_start() + self.legacy_configure_ap_and_start(wpa_network=True, wep_network=True) asserts.assert_true( len(self.reference_networks) > 0, "Need at least one reference network with psk.") wutils.wifi_toggle_state(self.dut, True) + wutils.wifi_toggle_state(self.dut_client, True) if "iperf_server_address" in self.user_params: self.iperf_server = self.iperf_servers[0] self.wpapsk_2g = self.reference_networks[0]["2g"] self.wpapsk_5g = self.reference_networks[0]["5g"] - self.open_network = self.open_network[0]["2g"] + self.open_network_2g = self.open_network[0]["2g"] + self.open_network_5g = self.open_network[0]["5g"] if hasattr(self, 'iperf_server'): self.iperf_server.start() def setup_test(self): - self.dut.droid.wakeLockAcquireBright() - self.dut.droid.wakeUpNow() + for ad in self.android_devices: + ad.droid.wakeLockAcquireBright() + ad.droid.wakeUpNow() wutils.wifi_toggle_state(self.dut, True) def teardown_test(self): - self.dut.droid.wakeLockRelease() - self.dut.droid.goToSleepNow() + for ad in self.android_devices: + ad.droid.wakeLockRelease() + ad.droid.goToSleepNow() self.turn_location_off_and_scan_toggle_off() wutils.reset_wifi(self.dut) + wutils.reset_wifi(self.dut_client) def teardown_class(self): if hasattr(self, 'iperf_server'): @@ -526,9 +534,12 @@ class WifiManagerTest(WifiBaseTest): @test_tracker_info(uuid="71556e06-7fb1-4e2b-9338-b01f1f8e286e") def test_scan(self): """Test wifi connection scan can start and find expected networks.""" - ssid = self.open_network[WifiEnums.SSID_KEY] + ssid = self.open_network_2g[WifiEnums.SSID_KEY] + wutils.start_wifi_connection_scan_and_ensure_network_found( + self.dut, ssid) + ssid = self.open_network_5g[WifiEnums.SSID_KEY] wutils.start_wifi_connection_scan_and_ensure_network_found( - self.dut, ssid); + self.dut, ssid) @test_tracker_info(uuid="3ea09efb-6921-429e-afb1-705ef5a09afa") def test_scan_with_wifi_off_and_location_scan_on(self): @@ -537,9 +548,12 @@ class WifiManagerTest(WifiBaseTest): wutils.wifi_toggle_state(self.dut, False) """Test wifi connection scan can start and find expected networks.""" - ssid = self.open_network[WifiEnums.SSID_KEY] + ssid = self.open_network_2g[WifiEnums.SSID_KEY] + wutils.start_wifi_connection_scan_and_ensure_network_found( + self.dut, ssid) + ssid = self.open_network_5g[WifiEnums.SSID_KEY] wutils.start_wifi_connection_scan_and_ensure_network_found( - self.dut, ssid); + self.dut, ssid) @test_tracker_info(uuid="770caebe-bcb1-43ac-95b6-5dd52dd90e80") def test_scan_with_wifi_off_and_location_scan_off(self): @@ -559,8 +573,8 @@ class WifiManagerTest(WifiBaseTest): @test_tracker_info(uuid="a4ad9930-a8fa-4868-81ed-a79c7483e502") def test_add_network(self): """Test wifi connection scan.""" - ssid = self.open_network[WifiEnums.SSID_KEY] - nId = self.dut.droid.wifiAddNetwork(self.open_network) + ssid = self.open_network_2g[WifiEnums.SSID_KEY] + nId = self.dut.droid.wifiAddNetwork(self.open_network_2g) asserts.assert_true(nId > -1, "Failed to add network.") configured_networks = self.dut.droid.wifiGetConfiguredNetworks() self.log.debug( @@ -571,8 +585,8 @@ class WifiManagerTest(WifiBaseTest): @test_tracker_info(uuid="aca85551-10ba-4007-90d9-08bcdeb16a60") def test_forget_network(self): - ssid = self.open_network[WifiEnums.SSID_KEY] - nId = self.dut.droid.wifiAddNetwork(self.open_network) + ssid = self.open_network_2g[WifiEnums.SSID_KEY] + nId = self.dut.droid.wifiAddNetwork(self.open_network_2g) asserts.assert_true(nId > -1, "Failed to add network.") configured_networks = self.dut.droid.wifiGetConfiguredNetworks() self.log.debug( @@ -837,5 +851,113 @@ class WifiManagerTest(WifiBaseTest): Connect to a wifi network, then the same as test_energy_info. """ - wutils.wifi_connect(self.dut, self.open_network) + wutils.wifi_connect(self.dut, self.open_network_2g) self.get_energy_info() + + @test_tracker_info(uuid="2622c253-defc-4a35-93a6-ca9d29a8238c") + def test_connect_to_wep_2g(self): + """Verify DUT can connect to 2GHz WEP network + + Steps: + 1. Ensure the 2GHz WEP network is visible in scan result. + 2. Connect to the network and validate internet connection. + """ + wutils.connect_to_wifi_network(self.dut, self.wep_networks[0]["2g"]) + + @test_tracker_info(uuid="1f2d17a2-e92d-43af-966b-3421c0db8620") + def test_connect_to_wep_5g(self): + """Verify DUT can connect to 5GHz WEP network + + Steps: + 1. Ensure the 5GHz WEP network is visible in scan result. + 2. Connect to the network and validate internet connection. + """ + wutils.connect_to_wifi_network(self.dut, self.wep_networks[0]["5g"]) + + @test_tracker_info(uuid="4a957952-289d-4657-9882-e1475274a7ff") + def test_connect_to_wpa_2g(self): + """Verify DUT can connect to 2GHz WPA-PSK network + + Steps: + 1. Ensure the 2GHz WPA-PSK network is visible in scan result. + 2. Connect to the network and validate internet connection. + """ + wutils.connect_to_wifi_network(self.dut, self.wpa_networks[0]["2g"]) + + @test_tracker_info(uuid="612c3c31-a4c5-4014-9a2d-3f4bcc20c0d7") + def test_connect_to_wpa_5g(self): + """Verify DUT can connect to 5GHz WPA-PSK network + + Steps: + 1. Ensure the 5GHz WPA-PSK network is visible in scan result. + 2. Connect to the network and validate internet connection. + """ + wutils.connect_to_wifi_network(self.dut, self.wpa_networks[0]["5g"]) + + @test_tracker_info(uuid="2a617fb4-1d8e-44e9-a500-a5456e1df83f") + def test_connect_to_2g_can_be_pinged(self): + """Verify DUT can be pinged by another device when it connects to 2GHz AP + + Steps: + 1. Ensure the 2GHz WPA-PSK network is visible in scan result. + 2. Connect to the network and validate internet connection. + 3. Check DUT can be pinged by another device + """ + wutils.connect_to_wifi_network(self.dut, self.wpa_networks[0]["2g"]) + wutils.connect_to_wifi_network(self.dut_client, self.wpa_networks[0]["2g"]) + dut_address = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] + asserts.assert_true( + acts.utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_address, timeout=20), + "%s ping %s failed" % (self.dut_client.serial, dut_address)) + + @test_tracker_info(uuid="94bdd657-649b-4a2c-89c3-3ec6ba18e08e") + def test_connect_to_5g_can_be_pinged(self): + """Verify DUT can be pinged by another device when it connects to 5GHz AP + + Steps: + 1. Ensure the 5GHz WPA-PSK network is visible in scan result. + 2. Connect to the network and validate internet connection. + 3. Check DUT can be pinged by another device + """ + wutils.connect_to_wifi_network(self.dut, self.wpa_networks[0]["5g"]) + wutils.connect_to_wifi_network(self.dut_client, self.wpa_networks[0]["5g"]) + dut_address = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] + asserts.assert_true( + acts.utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_address, timeout=20), + "%s ping %s failed" % (self.dut_client.serial, dut_address)) + + @test_tracker_info(uuid="d87359aa-c4da-4554-b5de-8e3fa852a6b0") + def test_sta_turn_off_screen_can_be_pinged(self): + """Verify DUT can be pinged by another device after idle for a while + + Steps: + 1. Ensure the 2GHz WPA-PSK network is visible in scan result. + 2. DUT and DUT_Client connect to the network and validate internet connection. + 3. Let DUT sleep for 5 minutes + 4. Check DUT can be pinged by DUT_Client + """ + # DUT connect to AP + wutils.connect_to_wifi_network(self.dut, self.wpa_networks[0]["2g"]) + wutils.connect_to_wifi_network(self.dut_client, self.wpa_networks[0]["2g"]) + # Check DUT and DUT_Client can ping each other successfully + dut_address = self.dut.droid.connectivityGetIPv4Addresses('wlan0')[0] + dut_client_address = self.dut_client.droid.connectivityGetIPv4Addresses('wlan0')[0] + asserts.assert_true( + acts.utils.adb_shell_ping(self.dut, count=10, dest_ip=dut_client_address, timeout=20), + "ping DUT %s failed" % dut_client_address) + asserts.assert_true( + acts.utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_address, timeout=20), + "ping DUT %s failed" % dut_address) + # DUT turn off screen and go sleep for 5 mins + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + # TODO(hsiuchangchen): find a way to check system already suspended + # instead of waiting 5 mins + self.log.info("Sleep for 5 minutes") + time.sleep(300) + # Verify DUT_Client can ping DUT when DUT sleeps + asserts.assert_true( + acts.utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_address, timeout=20), + "ping DUT %s failed" % dut_address) + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() diff --git a/acts/tests/google/wifi/WifiNetworkSelectorTest.py b/acts/tests/google/wifi/WifiNetworkSelectorTest.py index 948f961236..ffeb6b5027 100644 --- a/acts/tests/google/wifi/WifiNetworkSelectorTest.py +++ b/acts/tests/google/wifi/WifiNetworkSelectorTest.py @@ -56,9 +56,12 @@ class WifiNetworkSelectorTest(WifiBaseTest): self.unpack_userparams( req_param_names=req_params, opt_param_names=opt_param) - if "AccessPoint" in self.user_params: + if hasattr(self, 'access_points'): self.legacy_configure_ap_and_start(ap_count=2) + if hasattr(self, 'packet_capture'): + self.configure_packet_capture() + def setup_test(self): #reset and clear all saved networks on the DUT wutils.reset_wifi(self.dut) @@ -70,12 +73,22 @@ class WifiNetworkSelectorTest(WifiBaseTest): self.dut.droid.wakeUpNow() self.dut.ed.clear_all_events() + if hasattr(self, 'packet_capture'): + self.pcap_pids = wutils.start_pcap( + self.packet_capture, 'dual', self.log_path, self.test_name) + def teardown_test(self): #turn off the screen self.dut.droid.wakeLockRelease() self.dut.droid.goToSleepNow() + def on_pass(self, test_name, begin_time): + if hasattr(self, 'packet_capture'): + wutils.stop_pcap(self.packet_capture, self.pcap_pids, True) + def on_fail(self, test_name, begin_time): + if hasattr(self, 'packet_capture'): + wutils.stop_pcap(self.packet_capture, self.pcap_pids, False) self.dut.take_bug_report(test_name, begin_time) self.dut.cat_adb_log(test_name, begin_time) diff --git a/acts/tests/google/wifi/WifiP2pManagerTest.py b/acts/tests/google/wifi/WifiP2pManagerTest.py new file mode 100644 index 0000000000..b3eb686a82 --- /dev/null +++ b/acts/tests/google/wifi/WifiP2pManagerTest.py @@ -0,0 +1,139 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts import utils +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest + +DEFAULT_TIMEOUT = 30 +DEFAULT_SLEEPTIME = 5 + +class WifiP2pManagerTest(WifiBaseTest): + """Tests for APIs in Android's WifiP2pManager class. + + Test Bed Requirement: + * Two Android devices + """ + + def __init__(self, controllers): + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + self.dut_client = self.android_devices[1] + + wutils.wifi_test_device_init(self.dut) + self.dut.droid.wifiP2pInitialize() + asserts.assert_true(self.dut.droid.wifiP2pIsEnabled(), + "DUT's p2p should be initialized but it didn't") + self.dut_name = "Android_" + utils.rand_ascii_str(4) + self.dut.droid.wifiP2pSetDeviceName(self.dut_name) + wutils.wifi_test_device_init(self.dut_client) + self.dut_client.droid.wifiP2pInitialize() + asserts.assert_true(self.dut_client.droid.wifiP2pIsEnabled(), + "Peer's p2p should be initialized but it didn't") + self.dut_client_name = "Android_" + utils.rand_ascii_str(4) + self.dut_client.droid.wifiP2pSetDeviceName(self.dut_client_name) + + def teardown_class(self): + self.dut.droid.wifiP2pClose() + self.dut_client.droid.wifiP2pClose() + + def setup_test(self): + for ad in self.android_devices: + ad.droid.wakeLockAcquireBright() + ad.droid.wakeUpNow() + + def teardown_test(self): + # Clear p2p group info + for ad in self.android_devices: + ad.droid.wifiP2pRequestPersistentGroupInfo() + event = ad.ed.pop_event("WifiP2pOnPersistentGroupInfoAvailable", DEFAULT_TIMEOUT) + for network in event['data']: + ad.droid.wifiP2pDeletePersistentGroup(network['NetworkId']) + ad.droid.wakeLockRelease() + ad.droid.goToSleepNow() + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + ad.cat_adb_log(test_name, begin_time) + + """Helper Functions""" + + def _is_discovered(self, event, device_name): + for device in event['data']['Peers']: + if device['Name'] == device_name: + return True + return False + + """Test Cases""" + @test_tracker_info(uuid="28ddb16c-2ce4-44da-92f9-701d0dacc321") + def test_p2p_discovery(self): + """Verify the p2p discovery functionality + + Steps: + 1. Discover the target device + """ + # Discover the target device + self.log.info("Device discovery") + self.dut.droid.wifiP2pDiscoverPeers() + self.dut_client.droid.wifiP2pDiscoverPeers() + dut_event = self.dut.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT) + peer_event = self.dut_client.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT) + asserts.assert_true(self._is_discovered(dut_event, self.dut_client_name), + "DUT didn't discovered peer device") + asserts.assert_true(self._is_discovered(peer_event, self.dut_name), + "Peer didn't discovered DUT device") + + @test_tracker_info(uuid="708af645-6562-41da-9cd3-bdca428ac308") + def test_p2p_connect(self): + """Verify the p2p connect functionality + + Steps: + 1. Discover the target device + 2. Request the connection + 3. Disconnect + """ + # Discover the target device + self.log.info("Device discovery") + self.dut.droid.wifiP2pDiscoverPeers() + self.dut_client.droid.wifiP2pDiscoverPeers() + dut_event = self.dut.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT) + peer_event = self.dut_client.ed.pop_event("WifiP2pOnPeersAvailable", DEFAULT_TIMEOUT) + asserts.assert_true(self._is_discovered(dut_event, self.dut_client_name), + "DUT didn't discovered peer device") + asserts.assert_true(self._is_discovered(peer_event, self.dut_name), + "Peer didn't discovered DUT device") + + # Request the connection + self.log.info("Create p2p connection") + self.dut.droid.wifiP2pConnect(self.dut_client_name) + time.sleep(DEFAULT_SLEEPTIME) + self.dut_client.droid.wifiP2pAcceptConnection() + self.dut.ed.pop_event("WifiP2pConnectOnSuccess", DEFAULT_TIMEOUT) + + # Disconnect + self.log.info("Disconnect") + self.dut.droid.wifiP2pRemoveGroup() + self.dut.droid.wifiP2pRequestConnectionInfo() + event = self.dut.ed.pop_event("WifiP2pOnConnectionInfoAvailable", DEFAULT_TIMEOUT) + asserts.assert_false(event['data']['groupFormed'], + "P2P connection should be disconnected but it didn't") diff --git a/acts/tests/google/wifi/WifiPingTest.py b/acts/tests/google/wifi/WifiPingTest.py new file mode 100644 index 0000000000..3153c2fed1 --- /dev/null +++ b/acts/tests/google/wifi/WifiPingTest.py @@ -0,0 +1,515 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import logging +import os +import statistics +import time +from acts import asserts +from acts import base_test +from acts import utils +from acts.metrics.loggers.blackbox import BlackboxMetricLogger +from acts.test_utils.wifi import wifi_power_test_utils as wputils +from acts.test_utils.wifi import wifi_retail_ap as retail_ap +from acts.test_utils.wifi import wifi_test_utils as wutils + + +class WifiPingTest(base_test.BaseTestClass): + """Class for ping-based Wifi performance tests. + + This class implements WiFi ping performance tests such as range and RTT. + The class setups up the AP in the desired configurations, configures + and connects the phone to the AP, and runs For an example config file to + run this test class see example_connectivity_performance_ap_sta.json. + """ + + TEST_TIMEOUT = 10 + SHORT_SLEEP = 1 + MED_SLEEP = 5 + + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.ping_range_metric = BlackboxMetricLogger.for_test_case( + metric_name='ping_range') + self.ping_rtt_metric = BlackboxMetricLogger.for_test_case( + metric_name='ping_rtt') + self.tests = ( + "test_ping_range_ch1_VHT20", "test_fast_ping_rtt_ch1_VHT20", + "test_slow_ping_rtt_ch1_VHT20", "test_ping_range_ch6_VHT20", + "test_fast_ping_rtt_ch6_VHT20", "test_slow_ping_rtt_ch6_VHT20", + "test_ping_range_ch11_VHT20", "test_fast_ping_rtt_ch11_VHT20", + "test_slow_ping_rtt_ch11_VHT20", "test_ping_range_ch36_VHT20", + "test_fast_ping_rtt_ch36_VHT20", "test_slow_ping_rtt_ch36_VHT20", + "test_ping_range_ch36_VHT40", "test_fast_ping_rtt_ch36_VHT40", + "test_slow_ping_rtt_ch36_VHT40", "test_ping_range_ch36_VHT80", + "test_fast_ping_rtt_ch36_VHT80", "test_slow_ping_rtt_ch36_VHT80", + "test_ping_range_ch40_VHT20", "test_ping_range_ch44_VHT20", + "test_ping_range_ch44_VHT40", "test_ping_range_ch48_VHT20", + "test_ping_range_ch149_VHT20", "test_fast_ping_rtt_ch149_VHT20", + "test_slow_ping_rtt_ch149_VHT20", "test_ping_range_ch149_VHT40", + "test_fast_ping_rtt_ch149_VHT40", "test_slow_ping_rtt_ch149_VHT40", + "test_ping_range_ch149_VHT80", "test_fast_ping_rtt_ch149_VHT80", + "test_slow_ping_rtt_ch149_VHT80", "test_ping_range_ch153_VHT20", + "test_ping_range_ch157_VHT20", "test_ping_range_ch157_VHT40", + "test_ping_range_ch161_VHT20") + + def setup_class(self): + self.client_dut = self.android_devices[-1] + req_params = [ + "ping_test_params", "testbed_params", "main_network", + "RetailAccessPoints" + ] + opt_params = ["golden_files_list"] + self.unpack_userparams(req_params, opt_params) + self.test_params = self.ping_test_params + self.num_atten = self.attenuators[0].instrument.num_atten + # iperf server doubles as ping server to reduce config parameters + self.iperf_server = self.iperf_servers[0] + self.access_points = retail_ap.create(self.RetailAccessPoints) + self.access_point = self.access_points[0] + self.log.info("Access Point Configuration: {}".format( + self.access_point.ap_settings)) + self.log_path = os.path.join(logging.log_path, "results") + utils.create_dir(self.log_path) + if not hasattr(self, "golden_files_list"): + self.golden_files_list = [ + os.path.join(self.testbed_params["golden_results_path"], + file) for file in os.listdir( + self.testbed_params["golden_results_path"]) + ] + self.testclass_results = [] + + # Turn WiFi ON + for dev in self.android_devices: + wutils.wifi_toggle_state(dev, True) + + def pass_fail_check_ping_rtt(self, ping_range_result): + """Check the test result and decide if it passed or failed. + + The function computes RTT statistics and fails any tests in which the + tail of the ping latency results exceeds the threshold defined in the + configuration file. + + Args: + ping_range_result: dict containing ping results and other meta data + """ + ignored_fraction = self.test_params[ + "rtt_ignored_interval"] / self.test_params["rtt_ping_duration"] + sorted_rtt = [ + sorted(x["rtt"][round(ignored_fraction * len(x["rtt"])):]) + for x in ping_range_result["ping_results"] + ] + mean_rtt = [statistics.mean(x) for x in sorted_rtt] + std_rtt = [statistics.stdev(x) for x in sorted_rtt] + rtt_at_test_percentile = [ + x[int( + len(x) * + ((100 - self.test_params["rtt_test_percentile"]) / 100))] + for x in sorted_rtt + ] + # Set blackbox metric + self.ping_rtt_metric.metric_value = max(rtt_at_test_percentile) + # Evaluate test pass/fail + test_failed = False + for idx, rtt in enumerate(rtt_at_test_percentile): + if rtt > self.test_params["rtt_threshold"] * 1000: + test_failed = True + self.log.info( + "RTT Failed. Test %ile RTT = {}ms. Mean = {}ms. Stdev = {}". + format(rtt, mean_rtt[idx], std_rtt[idx])) + if test_failed: + asserts.fail("RTT above threshold") + else: + asserts.explicit_pass( + "Test Passed. RTTs at test percentile = {}".format( + rtt_at_test_percentile)) + + def pass_fail_check_ping_range(self, ping_range_result): + """Check the test result and decide if it passed or failed. + + Checks whether the attenuation at which ping packet losses begin to + exceed the threshold matches the range derived from golden + rate-vs-range result files. The test fails is ping range is + range_gap_threshold worse than RvR range. + + Args: + ping_range_result: dict containing ping results and meta data + """ + try: + rvr_range = self.get_range_from_rvr() + except: + rvr_range = float("nan") + + ping_loss_over_att = [ + x["packet_loss_percentage"] + for x in ping_range_result["ping_results"] + ] + ping_loss_above_threshold = [ + int(x < self.test_params["range_ping_loss_threshold"]) + for x in ping_loss_over_att + ] + attenuation_at_range = self.atten_range[ping_loss_above_threshold.index( + 0) - 1] + ping_range_result["fixed_attenuation"] + # Set Blackbox metric + self.ping_range_metric.metric_value = attenuation_at_range + # Evaluate test pass/fail + if attenuation_at_range - rvr_range < -self.test_params["range_gap_threshold"]: + asserts.fail( + "Attenuation at range is {}dB. Golden range is {}dB".format( + attenuation_at_range, rvr_range)) + else: + asserts.explicit_pass( + "Attenuation at range is {}dB. Golden range is {}dB".format( + attenuation_at_range, rvr_range)) + + def post_process_ping_results(self, ping_range_result): + """Saves and plots ping results. + + Args: + ping_range_result: dict containing ping results and metadata + """ + results_file_path = "{}/{}.json".format(self.log_path, + self.current_test_name) + with open(results_file_path, 'w') as results_file: + json.dump(ping_range_result, results_file, indent=4) + + x_data = [ + list(range(len(x["rtt"]))) + for x in ping_range_result["ping_results"] if len(x["rtt"]) > 1 + ] + rtt_data = [ + x["rtt"] for x in ping_range_result["ping_results"] + if len(x["rtt"]) > 1 + ] + #legend = ["Round Trip Time" for x in ping_range_result["ping_results"]] + legend = [ + "RTT @ {}dB".format(att) + for att in ping_range_result["attenuation"] + ] + + data_sets = [x_data, rtt_data] + fig_property = { + "title": self.current_test_name, + "x_label": 'Sample Index', + "y_label": 'Round Trip Time (ms)', + "linewidth": 3, + "markersize": 0 + } + output_file_path = "{}/{}.html".format(self.log_path, + self.current_test_name) + wputils.bokeh_plot( + data_sets, + legend, + fig_property, + shaded_region=None, + output_file_path=output_file_path) + + def get_range_from_rvr(self): + """Function gets range from RvR golden results + + The function fetches the attenuation at which the RvR throughput goes + to zero. + + Returns: + range: range derived from looking at rvr curves + """ + # Fetch the golden RvR results + test_name = self.current_test_name + rvr_golden_file_name = "test_rvr_TCP_DL_" + "_".join( + test_name.split("_")[3:]) + golden_path = [ + file_name for file_name in self.golden_files_list + if rvr_golden_file_name in file_name + ] + with open(golden_path[0], 'r') as golden_file: + golden_results = json.load(golden_file) + # Get 0 Mbps attenuation and backoff by low_rssi_backoff_from_range + atten_idx = golden_results["throughput_receive"].index(0) + rvr_range = golden_results["attenuation"][atten_idx - + 1] + golden_results["fixed_attenuation"] + return rvr_range + + def get_ping_stats(self, ping_from_dut, ping_duration, ping_interval, + ping_size): + """Run ping to or from the DUT. + + The function computes either pings the DUT or pings a remote ip from + DUT. + + Args: + ping_from_dut: boolean set to true if pinging from the DUT + ping_duration: timeout to set on the the ping process (in seconds) + ping_interval: time between pings (in seconds) + ping_size: size of ping packet payload + Returns: + ping_result: dict containing ping results and other meta data + """ + ping_cmd = "ping -w {} -i {} -s {}".format( + ping_duration, + ping_interval, + ping_size, + ) + if ping_from_dut: + ping_cmd = "{} {}".format( + ping_cmd, self.testbed_params["outgoing_ping_address"]) + ping_output = self.client_dut.adb.shell( + ping_cmd, + timeout=ping_duration + self.TEST_TIMEOUT, + ignore_status=True) + else: + ping_cmd = "sudo {} {}".format(ping_cmd, self.dut_ip) + ping_output = self.iperf_server.ssh_session.run( + ping_cmd, ignore_status=True).stdout + ping_output = ping_output.splitlines() + + if len(ping_output) == 1: + ping_result = {"connected": 0} + else: + packet_loss_line = [line for line in ping_output if "loss" in line] + packet_loss_percentage = int( + packet_loss_line[0].split("%")[0].split(" ")[-1]) + if packet_loss_percentage == 100: + rtt = [float("nan")] + else: + rtt = [ + line.split("time=")[1] for line in ping_output + if "time=" in line + ] + rtt = [float(line.split(" ")[0]) for line in rtt] + ping_result = { + "connected": 1, + "rtt": rtt, + "packet_loss_percentage": packet_loss_percentage + } + return ping_result + + def ping_test(self, channel, mode, atten_levels, ping_duration, + ping_interval, ping_size): + """Main function to test ping. + + The function sets up the AP in the correct channel and mode + configuration and calls get_ping_stats while sweeping attenuation + + Args: + channel: Specifies AP's channel + mode: Specifies AP's bandwidth/mode (11g, VHT20, VHT40, VHT80) + atten_levels: array of attenuation levels to run ping test at + ping_duration: timeout to set on the the ping process (in seconds) + ping_interval: time between pings (in seconds) + ping_size: size of ping packet payload + Returns: + test_result: dict containing ping results and other meta data + """ + band = self.access_point.band_lookup_by_channel(channel) + if "2G" in band: + frequency = wutils.WifiEnums.channel_2G_to_freq[channel] + else: + frequency = wutils.WifiEnums.channel_5G_to_freq[channel] + if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: + self.access_point.set_region(self.testbed_params["DFS_region"]) + else: + self.access_point.set_region(self.testbed_params["default_region"]) + self.access_point.set_channel(band, channel) + self.access_point.set_bandwidth(band, mode) + self.log.info("Access Point Configuration: {}".format( + self.access_point.ap_settings)) + + # Set attenuator to 0 dB + for atten in atten_levels: + for attenuator in self.attenuators: + attenuator.set_atten(0) + # Resest, configure, and connect DUT + wutils.reset_wifi(self.client_dut) + self.client_dut.droid.wifiSetCountryCode( + self.test_params["country_code"]) + self.main_network[band]["channel"] = channel + wutils.wifi_connect( + self.client_dut, self.main_network[band], num_of_tries=5) + self.dut_ip = self.client_dut.droid.connectivityGetIPv4Addresses( + 'wlan0')[0] + time.sleep(self.MED_SLEEP) + + test_result = {"ping_results": []} + test_result["test_name"] = self.current_test_name + test_result["ap_config"] = self.access_point.ap_settings.copy() + test_result["attenuation"] = atten_levels + test_result["fixed_attenuation"] = self.testbed_params[ + "fixed_attenuation"][str(channel)] + for atten in atten_levels: + for attenuator in self.attenuators: + attenuator.set_atten(atten) + time.sleep(self.SHORT_SLEEP) + current_ping_stats = self.get_ping_stats(0, ping_duration, + ping_interval, ping_size) + if current_ping_stats["connected"]: + self.log.info( + "Attenuation = {0}dB Packet Loss Rate = {1}%. Avg Ping RTT = {2:.2f}ms". + format(atten, current_ping_stats["packet_loss_percentage"], + statistics.mean(current_ping_stats["rtt"]))) + else: + self.log.info( + "Attenuation = {}dB. Disconnected.".format(atten)) + test_result["ping_results"].append(current_ping_stats) + return test_result + + def _test_ping_rtt(self): + """ Function that gets called for each RTT test case + + The function gets called in each RTT test case. The function customizes + the RTT test based on the test name of the test that called it + """ + test_params = self.current_test_name.split("_") + self.channel = int(test_params[4][2:]) + self.mode = test_params[5] + self.atten_range = self.test_params["rtt_test_attenuation"] + ping_range_result = self.ping_test( + self.channel, self.mode, self.atten_range, + self.test_params["rtt_ping_duration"], + self.test_params["rtt_ping_interval"][test_params[1]], + self.test_params["ping_size"]) + self.post_process_ping_results(ping_range_result) + self.pass_fail_check_ping_rtt(ping_range_result) + + def _test_ping_range(self): + """ Function that gets called for each range test case + + The function gets called in each range test case. It customizes the + range test based on the test name of the test that called it + """ + test_params = self.current_test_name.split("_") + self.channel = int(test_params[3][2:]) + self.mode = test_params[4] + num_atten_steps = int((self.test_params["range_atten_stop"] - + self.test_params["range_atten_start"]) / + self.test_params["range_atten_step"]) + self.atten_range = [ + self.test_params["range_atten_start"] + + x * self.test_params["range_atten_step"] + for x in range(0, num_atten_steps) + ] + ping_range_result = self.ping_test( + self.channel, self.mode, self.atten_range, + self.test_params["range_ping_duration"], + self.test_params["range_ping_interval"], + self.test_params["ping_size"]) + self.post_process_ping_results(ping_range_result) + self.pass_fail_check_ping_range(ping_range_result) + + def test_ping_range_ch1_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch6_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch11_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch36_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch36_VHT40(self): + self._test_ping_range() + + def test_ping_range_ch36_VHT80(self): + self._test_ping_range() + + def test_ping_range_ch40_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch44_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch44_VHT40(self): + self._test_ping_range() + + def test_ping_range_ch48_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch149_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch149_VHT40(self): + self._test_ping_range() + + def test_ping_range_ch149_VHT80(self): + self._test_ping_range() + + def test_ping_range_ch153_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch157_VHT20(self): + self._test_ping_range() + + def test_ping_range_ch157_VHT40(self): + self._test_ping_range() + + def test_ping_range_ch161_VHT20(self): + self._test_ping_range() + + def test_fast_ping_rtt_ch1_VHT20(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch1_VHT20(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch6_VHT20(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch6_VHT20(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch11_VHT20(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch11_VHT20(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch36_VHT20(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch36_VHT20(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch36_VHT40(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch36_VHT40(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch36_VHT80(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch36_VHT80(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch149_VHT20(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch149_VHT20(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch149_VHT40(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch149_VHT40(self): + self._test_ping_rtt() + + def test_fast_ping_rtt_ch149_VHT80(self): + self._test_ping_rtt() + + def test_slow_ping_rtt_ch149_VHT80(self): + self._test_ping_rtt() diff --git a/acts/tests/google/wifi/WifiRssiTest.py b/acts/tests/google/wifi/WifiRssiTest.py index 4247344216..8dd0b52b57 100644 --- a/acts/tests/google/wifi/WifiRssiTest.py +++ b/acts/tests/google/wifi/WifiRssiTest.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import json import logging import math @@ -24,6 +25,7 @@ import time from acts import asserts from acts import base_test from acts import utils +from acts.metrics.loggers.blackbox import BlackboxMetricLogger from acts.test_decorators import test_tracker_info from acts.test_utils.wifi import wifi_power_test_utils as wputils from acts.test_utils.wifi import wifi_retail_ap as retail_ap @@ -40,14 +42,39 @@ RSSI_ERROR_VAL = float("nan") class WifiRssiTest(base_test.BaseTestClass): + """Class to test WiFi RSSI reporting. + + This class tests RSSI reporting on android devices. The class tests RSSI + accuracy by checking RSSI over a large attenuation range, checks for RSSI + stability over time when attenuation is fixed, and checks that RSSI quickly + and reacts to changes attenuation by checking RSSI trajectories over + configurable attenuation waveforms.For an example config file to run this + test class see example_connectivity_performance_ap_sta.json. + """ + def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) + test_metrics = [ + "signal_poll_rssi_shift", "signal_poll_avg_rssi_shift", + "scan_rssi_shift", "chain_0_rssi_shift", "chain_1_rssi_shift", + "signal_poll_rssi_error", "signal_poll_avg_rssi_error", + "scan_rssi_error", "chain_0_rssi_error", "chain_1_rssi_error", + "signal_poll_rssi_stdev", "chain_0_rssi_stdev", + "chain_1_rssi_stdev" + ] + for metric in test_metrics: + setattr( + self, + "{}_metric".format(metric), + BlackboxMetricLogger.for_test_case(metric_name=metric)) def setup_class(self): self.dut = self.android_devices[0] - req_params = ["rssi_test_params", "testbed_params", "main_network"] - opt_params = ["RetailAccessPoints"] - self.unpack_userparams(req_params, opt_params) + req_params = [ + "RetailAccessPoints", "rssi_test_params", "testbed_params", + "main_network" + ] + self.unpack_userparams(req_params) self.test_params = self.rssi_test_params self.num_atten = self.attenuators[0].instrument.num_atten self.iperf_server = self.iperf_servers[0] @@ -72,6 +99,14 @@ class WifiRssiTest(base_test.BaseTestClass): Args: postprocessed_results: compiled arrays of RSSI measurements """ + # Set Blackbox metric values + self.signal_poll_rssi_stdev_metric.metric_value = max( + postprocessed_results["signal_poll_rssi"]["stdev"]) + self.chain_0_rssi_stdev_metric.metric_value = max( + postprocessed_results["chain_0_rssi"]["stdev"]) + self.chain_1_rssi_stdev_metric.metric_value = max( + postprocessed_results["chain_1_rssi"]["stdev"]) + # Evaluate test pass/fail test_failed = any([ stdev > self.test_params["stdev_tolerance"] for stdev in postprocessed_results["signal_poll_rssi"]["stdev"] @@ -134,25 +169,32 @@ class WifiRssiTest(base_test.BaseTestClass): else: avg_error = RSSI_ERROR_VAL avg_shift = RSSI_ERROR_VAL + # Set Blackbox metric values + setattr( + getattr(self, "{}_error_metric".format(key)), + "metric_value", avg_error) + setattr( + getattr(self, "{}_shift_metric".format(key)), + "metric_value", avg_shift) + # Evaluate test pass/fail rssi_failure = (avg_error > self.test_params["abs_tolerance"] ) or math.isnan(avg_error) if rssi_failure and key in rssi_under_test: test_message = test_message + ( - "{} failed. Average {} error is {:.2f} dB. " - "Average shift is {:.2f} dB.\n").format( - key, error_type, avg_error, avg_shift) + "{} failed ({} error = {:.2f} dB, " + "shift = {:.2f} dB)\n").format(key, error_type, + avg_error, avg_shift) test_failed = True elif rssi_failure: test_message = test_message + ( - "{} failed (ignored). Average {} error is {:.2f} dB. " - "Average shift is {:.2f} dB.\n").format( - key, error_type, avg_error, avg_shift) + "{} failed (ignored) ({} error = {:.2f} dB, " + "shift = {:.2f} dB)\n").format(key, error_type, + avg_error, avg_shift) else: test_message = test_message + ( - "{} passed. Average {} error is {:.2f} dB. " - "Average shift is {:.2f} dB.\n").format( - key, error_type, avg_error, avg_shift) - + "{} passed ({} error = {:.2f} dB, " + "shift = {:.2f} dB)\n").format(key, error_type, + avg_error, avg_shift) if test_failed: asserts.fail(test_message) asserts.explicit_pass(test_message) @@ -173,15 +215,16 @@ class WifiRssiTest(base_test.BaseTestClass): with open(results_file_path, 'w') as results_file: json.dump(rssi_result, results_file, indent=4) # Compile results into arrays of RSSIs suitable for plotting - postprocessed_results = { - "signal_poll_rssi": {}, - "signal_poll_avg_rssi": {}, - "scan_rssi": {}, - "chain_0_rssi": {}, - "chain_1_rssi": {}, - "total_attenuation": [], - "predicted_rssi": [] - } + # yapf: disable + postprocessed_results = collections.OrderedDict( + [("signal_poll_rssi", {}), + ("signal_poll_avg_rssi", {}), + ("scan_rssi", {}), + ("chain_0_rssi", {}), + ("chain_1_rssi", {}), + ("total_attenuation", []), + ("predicted_rssi", [])]) + # yapf: enable for key, val in postprocessed_results.items(): if "scan_rssi" in key: postprocessed_results[key]["data"] = [ @@ -289,14 +332,15 @@ class WifiRssiTest(base_test.BaseTestClass): x_data = [] y_data = [] legends = [] - rssi_time_series = { - "signal_poll_rssi": [], - "signal_poll_avg_rssi": [], - "scan_rssi": [], - "chain_0_rssi": [], - "chain_1_rssi": [], - "predicted_rssi": [] - } + # yapf: disable + rssi_time_series = collections.OrderedDict( + [("signal_poll_rssi", []), + ("signal_poll_avg_rssi", []), + ("scan_rssi", []), + ("chain_0_rssi", []), + ("chain_1_rssi", []), + ("predicted_rssi", [])]) + # yapf: enable for key, val in rssi_time_series.items(): if "predicted_rssi" in key: rssi_time_series[key] = [ @@ -345,6 +389,11 @@ class WifiRssiTest(base_test.BaseTestClass): shaded_region=None, output_file_path=output_file_path) + @staticmethod + def empty_rssi_result(): + return collections.OrderedDict([("data", []), ("mean", None), ("stdev", + None)]) + def get_scan_rssi(self, tracked_bssids, num_measurements=1): """Gets scan RSSI for specified BSSIDs. @@ -355,9 +404,9 @@ class WifiRssiTest(base_test.BaseTestClass): scan_rssi: dict containing the measurement results as well as the statistics of the scan RSSI for all BSSIDs in tracked_bssids """ - scan_rssi = {} + scan_rssi = collections.OrderedDict() for bssid in tracked_bssids: - scan_rssi[bssid] = {"data": [], "mean": None, "stdev": None} + scan_rssi[bssid] = self.empty_rssi_result() for idx in range(num_measurements): scan_output = self.dut.adb.shell(SCAN) time.sleep(MED_SLEEP) @@ -401,28 +450,13 @@ class WifiRssiTest(base_test.BaseTestClass): all reported RSSI values (signal_poll, per chain, etc.) and their statistics """ - connected_rssi = { - "signal_poll_rssi": { - "data": [], - "mean": None, - "stdev": None - }, - "signal_poll_avg_rssi": { - "data": [], - "mean": None, - "stdev": None - }, - "chain_0_rssi": { - "data": [], - "mean": None, - "stdev": None - }, - "chain_1_rssi": { - "data": [], - "mean": None, - "stdev": None - } - } + # yapf: disable + connected_rssi = collections.OrderedDict( + [("signal_poll_rssi", self.empty_rssi_result()), + ("signal_poll_avg_rssi", self.empty_rssi_result()), + ("chain_0_rssi", self.empty_rssi_result()), + ("chain_1_rssi", self.empty_rssi_result())]) + # yapf: enable for idx in range(num_measurements): measurement_start_time = time.time() # Get signal poll RSSI @@ -515,12 +549,10 @@ class WifiRssiTest(base_test.BaseTestClass): for atten in self.rssi_atten_range: # Set Attenuation self.log.info("Setting attenuation to {} dB".format(atten)) - [ - self.attenuators[i].set_atten(atten) - for i in range(self.num_atten) - ] + for attenuator in self.attenuators: + attenuator.set_atten(atten) time.sleep(first_measurement_delay) - current_rssi = {} + current_rssi = collections.OrderedDict() current_rssi = self.get_connected_rssi(connected_measurements, polling_frequency) current_rssi["scan_rssi"] = self.get_scan_rssi( @@ -532,7 +564,8 @@ class WifiRssiTest(base_test.BaseTestClass): if self.iperf_traffic: self.iperf_server.stop() self.dut.adb.shell("pkill iperf3") - [self.attenuators[i].set_atten(0) for i in range(self.num_atten)] + for attenuator in self.attenuators: + attenuator.set_atten(0) return rssi_result def rssi_test_func(self, iperf_traffic, connected_measurements, @@ -548,7 +581,7 @@ class WifiRssiTest(base_test.BaseTestClass): rssi_result: dict containing rssi_results and meta data """ #Initialize test settings - rssi_result = {} + rssi_result = collections.OrderedDict() # Configure AP band = self.access_point.band_lookup_by_channel(self.channel) if "2G" in band: @@ -564,14 +597,13 @@ class WifiRssiTest(base_test.BaseTestClass): self.log.info("Access Point Configuration: {}".format( self.access_point.ap_settings)) # Set attenuator to starting attenuation - [ - self.attenuators[i].set_atten(self.rssi_atten_range[0]) - for i in range(self.num_atten) - ] + for attenuator in self.attenuators: + attenuator.set_atten(self.rssi_atten_range[0]) # Connect DUT to Network wutils.wifi_toggle_state(self.dut, True) wutils.reset_wifi(self.dut) self.main_network[band]["channel"] = self.channel + self.dut.droid.wifiSetCountryCode(self.test_params["country_code"]) wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5) time.sleep(MED_SLEEP) # Run RvR and log result @@ -906,25 +938,13 @@ class WifiRssiTest(base_test.BaseTestClass): class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ("test_rssi_stability_ch1_VHT20_ActiveTraffic", "test_rssi_vs_atten_ch1_VHT20_ActiveTraffic", "test_rssi_stability_ch2_VHT20_ActiveTraffic", "test_rssi_vs_atten_ch2_VHT20_ActiveTraffic", - "test_rssi_stability_ch3_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch3_VHT20_ActiveTraffic", - "test_rssi_stability_ch4_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch4_VHT20_ActiveTraffic", - "test_rssi_stability_ch5_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch5_VHT20_ActiveTraffic", "test_rssi_stability_ch6_VHT20_ActiveTraffic", "test_rssi_vs_atten_ch6_VHT20_ActiveTraffic", - "test_rssi_stability_ch7_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch7_VHT20_ActiveTraffic", - "test_rssi_stability_ch8_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch8_VHT20_ActiveTraffic", - "test_rssi_stability_ch9_VHT20_ActiveTraffic", - "test_rssi_vs_atten_ch9_VHT20_ActiveTraffic", "test_rssi_stability_ch10_VHT20_ActiveTraffic", "test_rssi_vs_atten_ch10_VHT20_ActiveTraffic", "test_rssi_stability_ch11_VHT20_ActiveTraffic", @@ -933,7 +953,7 @@ class WifiRssi_2GHz_ActiveTraffic_Test(WifiRssiTest): class WifiRssi_5GHz_ActiveTraffic_Test(WifiRssiTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ("test_rssi_stability_ch36_VHT20_ActiveTraffic", "test_rssi_vs_atten_ch36_VHT20_ActiveTraffic", "test_rssi_stability_ch36_VHT40_ActiveTraffic", diff --git a/acts/tests/google/wifi/WifiRvrTest.py b/acts/tests/google/wifi/WifiRvrTest.py index 9c4552565e..87ccf5cb0c 100644 --- a/acts/tests/google/wifi/WifiRvrTest.py +++ b/acts/tests/google/wifi/WifiRvrTest.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import collections import json import logging import math @@ -23,6 +24,7 @@ from acts import asserts from acts import base_test from acts import utils from acts.controllers import iperf_server as ipf +from acts.metrics.loggers.blackbox import BlackboxMetricLogger from acts.test_decorators import test_tracker_info from acts.test_utils.wifi import wifi_power_test_utils as wputils from acts.test_utils.wifi import wifi_retail_ap as retail_ap @@ -30,29 +32,45 @@ from acts.test_utils.wifi import wifi_test_utils as wutils class WifiRvrTest(base_test.BaseTestClass): + """Class to test WiFi rate versus range. + + This class implements WiFi rate versus range tests on single AP single STA + links. The class setups up the AP in the desired configurations, configures + and connects the phone to the AP, and runs iperf throughput test while + sweeping attenuation. For an example config file to run this test class see + example_connectivity_performance_ap_sta.json. + """ + TEST_TIMEOUT = 10 SHORT_SLEEP = 1 MED_SLEEP = 5 + MAX_CONSECUTIVE_ZEROS = 5 def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) + self.failure_count_metric = BlackboxMetricLogger.for_test_case( + metric_name='failure_count') def setup_class(self): + """Initializes common test hardware and parameters. + + This function initializes hardwares and compiles parameters that are + common to all tests in this class. + """ self.client_dut = self.android_devices[-1] - req_params = ["rvr_test_params", "testbed_params"] - opt_params = [ - "main_network", "RetailAccessPoints", "golden_files_list" + req_params = [ + "RetailAccessPoints", "rvr_test_params", "testbed_params" ] + opt_params = ["main_network", "golden_files_list"] self.unpack_userparams(req_params, opt_params) - self.test_params = self.rvr_test_params + self.testclass_params = self.rvr_test_params self.num_atten = self.attenuators[0].instrument.num_atten self.iperf_server = self.iperf_servers[0] - if hasattr(self, "RetailAccessPoints"): - self.access_points = retail_ap.create(self.RetailAccessPoints) - self.access_point = self.access_points[0] - self.log.info("Access Point Configuration: {}".format( - self.access_point.ap_settings)) - self.log_path = os.path.join(logging.log_path, "rvr_results") + self.access_points = retail_ap.create(self.RetailAccessPoints) + self.access_point = self.access_points[0] + self.log.info("Access Point Configuration: {}".format( + self.access_point.ap_settings)) + self.log_path = os.path.join(logging.log_path, "results") utils.create_dir(self.log_path) if not hasattr(self, "golden_files_list"): self.golden_files_list = [ @@ -70,11 +88,13 @@ class WifiRvrTest(base_test.BaseTestClass): self.iperf_server.stop() def teardown_class(self): - """Saves plot with all test results to enable comparison. - """ # Turn WiFi OFF for dev in self.android_devices: wutils.wifi_toggle_state(dev, False) + self.process_testclass_results() + + def process_testclass_results(self): + """Saves plot with all test results to enable comparison.""" # Plot and save all results x_data = [] y_data = [] @@ -97,7 +117,7 @@ class WifiRvrTest(base_test.BaseTestClass): "linewidth": 3, "markersize": 10 } - output_file_path = "{}/{}.html".format(self.log_path, "rvr_results") + output_file_path = os.path.join(self.log_path, 'results.html') wputils.bokeh_plot( data_sets, legends, @@ -116,15 +136,8 @@ class WifiRvrTest(base_test.BaseTestClass): rvr_result: dict containing attenuation, throughput and other meta data """ - test_name = self.current_test_name - golden_path = [ - file_name for file_name in self.golden_files_list - if test_name in file_name - ] try: - golden_path = golden_path[0] - throughput_limits = self.compute_throughput_limits( - golden_path, rvr_result) + throughput_limits = self.compute_throughput_limits(rvr_result) except: asserts.fail("Test failed: Golden file not found") @@ -142,14 +155,15 @@ class WifiRvrTest(base_test.BaseTestClass): format(current_att, current_throughput, throughput_limits["lower_limit"][idx], throughput_limits["upper_limit"][idx])) - if failure_count >= self.test_params["failure_count_tolerance"]: + self.failure_count_metric.metric_value = failure_count + if failure_count >= self.testclass_params["failure_count_tolerance"]: asserts.fail("Test failed. Found {} points outside limits.".format( failure_count)) asserts.explicit_pass( "Test passed. Found {} points outside throughput limits.".format( failure_count)) - def compute_throughput_limits(self, golden_path, rvr_result): + def compute_throughput_limits(self, rvr_result): """Compute throughput limits for current test. Checks the RvR test result and compares to a throughput limites for @@ -157,12 +171,14 @@ class WifiRvrTest(base_test.BaseTestClass): config file. Args: - golden_path: path to golden file used to generate limits rvr_result: dict containing attenuation, throughput and other meta data Returns: throughput_limits: dict containing attenuation and throughput limit data """ + test_name = self.current_test_name + golden_path = next(file_name for file_name in self.golden_files_list + if test_name in file_name) with open(golden_path, 'r') as golden_file: golden_results = json.load(golden_file) golden_attenuation = [ @@ -190,12 +206,13 @@ class WifiRvrTest(base_test.BaseTestClass): attenuation.append(current_att) lower_limit.append( - max(closest_throughputs[0] - max( - self.test_params["abs_tolerance"], closest_throughputs[0] * - self.test_params["pct_tolerance"] / 100), 0)) + max(closest_throughputs[0] - + max(self.testclass_params["abs_tolerance"], + closest_throughputs[0] * + self.testclass_params["pct_tolerance"] / 100), 0)) upper_limit.append(closest_throughputs[-1] + max( - self.test_params["abs_tolerance"], closest_throughputs[-1] * - self.test_params["pct_tolerance"] / 100)) + self.testclass_params["abs_tolerance"], closest_throughputs[-1] + * self.testclass_params["pct_tolerance"] / 100)) throughput_limits = { "attenuation": attenuation, "lower_limit": lower_limit, @@ -203,7 +220,7 @@ class WifiRvrTest(base_test.BaseTestClass): } return throughput_limits - def post_process_results(self, rvr_result): + def process_test_results(self, rvr_result): """Saves plots and JSON formatted results. Args: @@ -233,11 +250,9 @@ class WifiRvrTest(base_test.BaseTestClass): "markersize": 10 } try: - golden_path = [ - file_name for file_name in self.golden_files_list - if test_name in file_name - ] - golden_path = golden_path[0] + golden_path = next(file_name + for file_name in self.golden_files_list + if test_name in file_name) with open(golden_path, 'r') as golden_file: golden_results = json.load(golden_file) legends.insert(0, "Golden Results") @@ -247,8 +262,7 @@ class WifiRvrTest(base_test.BaseTestClass): ] data_sets[0].insert(0, golden_attenuation) data_sets[1].insert(0, golden_results["throughput_receive"]) - throughput_limits = self.compute_throughput_limits( - golden_path, rvr_result) + throughput_limits = self.compute_throughput_limits(rvr_result) shaded_region = { "x_vector": throughput_limits["attenuation"], "lower_limit": throughput_limits["lower_limit"], @@ -261,33 +275,34 @@ class WifiRvrTest(base_test.BaseTestClass): wputils.bokeh_plot(data_sets, legends, fig_property, shaded_region, output_file_path) - def rvr_test(self): + def run_rvr_test(self, testcase_params): """Test function to run RvR. The function runs an RvR test in the current device/AP configuration. Function is called from another wrapper function that sets up the testbed for the RvR test + Args: + testcase_params: dict containing test-specific parameters Returns: rvr_result: dict containing rvr_results and meta data """ self.log.info("Start running RvR") - rvr_result = [] - for atten in self.rvr_atten_range: + zero_counter = 0 + throughput = [] + for atten in self.atten_range: # Set Attenuation self.log.info("Setting attenuation to {} dB".format(atten)) - [ - self.attenuators[i].set_atten(atten) - for i in range(self.num_atten) - ] + for attenuator in self.attenuators: + attenuator.set_atten(atten) # Start iperf session self.iperf_server.start(tag=str(atten)) try: client_output = "" client_status, client_output = self.client_dut.run_iperf_client( self.testbed_params["iperf_server_address"], - self.iperf_args, - timeout=self.test_params["iperf_duration"] + + testcase_params["iperf_args"], + timeout=self.testclass_params["iperf_duration"] + self.TEST_TIMEOUT) except: self.log.warning("TimeoutError: Iperf measurement timed out.") @@ -298,80 +313,125 @@ class WifiRvrTest(base_test.BaseTestClass): out_file.write("\n".join(client_output)) self.iperf_server.stop() # Parse and log result - if self.use_client_output: + if testcase_params["use_client_output"]: iperf_file = client_output_path else: iperf_file = self.iperf_server.log_files[-1] try: iperf_result = ipf.IPerfResult(iperf_file) curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ - self.test_params["iperf_ignored_interval"]:-1]) / len( - iperf_result.instantaneous_rates[self.test_params[ + self.testclass_params["iperf_ignored_interval"]:-1]) / len( + iperf_result.instantaneous_rates[self.testclass_params[ "iperf_ignored_interval"]:-1])) * 8 * (1.024**2) except: self.log.warning( "ValueError: Cannot get iperf result. Setting to 0") curr_throughput = 0 - rvr_result.append(curr_throughput) + throughput.append(curr_throughput) self.log.info("Throughput at {0:.2f} dB is {1:.2f} Mbps".format( atten, curr_throughput)) - [self.attenuators[i].set_atten(0) for i in range(self.num_atten)] + if curr_throughput == 0: + zero_counter = zero_counter + 1 + else: + zero_counter = 0 + if zero_counter == self.MAX_CONSECUTIVE_ZEROS: + self.log.info( + "Throughput stable at 0 Mbps. Stopping test now.") + throughput.extend([0] * + (len(self.atten_range) - len(throughput))) + break + for attenuator in self.attenuators: + attenuator.set_atten(0) + # Compile test result and meta data + rvr_result = collections.OrderedDict() + rvr_result["test_name"] = self.current_test_name + rvr_result["ap_settings"] = self.access_point.ap_settings.copy() + rvr_result["fixed_attenuation"] = self.testbed_params[ + "fixed_attenuation"][str(testcase_params["channel"])] + rvr_result["attenuation"] = list(self.atten_range) + rvr_result["throughput_receive"] = throughput return rvr_result - def rvr_test_func(self, channel, mode): - """Main function to test RvR. - - The function sets up the AP in the correct channel and mode - configuration and called run_rvr to sweep attenuation and measure - throughput + def setup_ap(self, testcase_params): + """Sets up the access point in the configuration required by the test. Args: - channel: Specifies AP's channel - mode: Specifies AP's bandwidth/mode (11g, VHT20, VHT40, VHT80) - Returns: - rvr_result: dict containing rvr_results and meta data + testcase_params: dict containing AP and other test params """ - #Initialize RvR test parameters - num_atten_steps = int((self.test_params["rvr_atten_stop"] - - self.test_params["rvr_atten_start"]) / - self.test_params["rvr_atten_step"]) - self.rvr_atten_range = [ - self.test_params["rvr_atten_start"] + - x * self.test_params["rvr_atten_step"] - for x in range(0, num_atten_steps) - ] - rvr_result = {} - # Configure AP - band = self.access_point.band_lookup_by_channel(channel) + band = self.access_point.band_lookup_by_channel( + testcase_params["channel"]) if "2G" in band: - frequency = wutils.WifiEnums.channel_2G_to_freq[channel] + frequency = wutils.WifiEnums.channel_2G_to_freq[testcase_params[ + "channel"]] else: - frequency = wutils.WifiEnums.channel_5G_to_freq[channel] + frequency = wutils.WifiEnums.channel_5G_to_freq[testcase_params[ + "channel"]] if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: self.access_point.set_region(self.testbed_params["DFS_region"]) else: self.access_point.set_region(self.testbed_params["default_region"]) - self.access_point.set_channel(band, channel) - self.access_point.set_bandwidth(band, mode) + self.access_point.set_channel(band, testcase_params["channel"]) + self.access_point.set_bandwidth(band, testcase_params["mode"]) self.log.info("Access Point Configuration: {}".format( self.access_point.ap_settings)) - # Set attenuator to 0 dB - [self.attenuators[i].set_atten(0) for i in range(self.num_atten)] - # Connect DUT to Network + + def setup_dut(self, testcase_params): + """Sets up the DUT in the configuration required by the test. + + Args: + testcase_params: dict containing AP and other test params + """ + band = self.access_point.band_lookup_by_channel( + testcase_params["channel"]) wutils.reset_wifi(self.client_dut) - self.main_network[band]["channel"] = channel + self.client_dut.droid.wifiSetCountryCode( + self.testclass_params["country_code"]) + self.main_network[band]["channel"] = testcase_params["channel"] wutils.wifi_connect( self.client_dut, self.main_network[band], num_of_tries=5) time.sleep(self.MED_SLEEP) - # Run RvR and log result - rvr_result["test_name"] = self.current_test_name - rvr_result["ap_settings"] = self.access_point.ap_settings.copy() - rvr_result["attenuation"] = list(self.rvr_atten_range) - rvr_result["fixed_attenuation"] = self.testbed_params[ - "fixed_attenuation"][str(channel)] - rvr_result["throughput_receive"] = self.rvr_test() - self.testclass_results.append(rvr_result) - return rvr_result + + def setup_rvr_test(self, testcase_params): + """Function that gets devices ready for the test. + + Args: + testcase_params: dict containing test-specific parameters + """ + #Initialize RvR test parameters + num_atten_steps = int( + (testcase_params["atten_stop"] - testcase_params["atten_start"]) / + testcase_params["atten_step"]) + self.atten_range = [ + testcase_params["atten_start"] + x * testcase_params["atten_step"] + for x in range(0, num_atten_steps) + ] + # Configure AP + self.setup_ap(testcase_params) + # Set attenuator to 0 dB + for attenuator in self.attenuators: + attenuator.set_atten(0) + # Resest, configure, and connect DUT + self.setup_dut(testcase_params) + + def parse_test_params(self, test_name): + """Function that generates test params based on the test name.""" + test_name_params = test_name.split("_") + testcase_params = collections.OrderedDict() + testcase_params["channel"] = int(test_name_params[4][2:]) + testcase_params["mode"] = test_name_params[5] + testcase_params["iperf_args"] = '-i 1 -t {} -J '.format( + self.testclass_params["iperf_duration"]) + if test_name_params[2] == "UDP": + testcase_params[ + "iperf_args"] = testcase_params["iperf_args"] + "-u -b {}".format( + self.testclass_params["UDP_rates"][testcase_params["mode"]]) + if test_name_params[3] == "DL": + testcase_params[ + "iperf_args"] = testcase_params["iperf_args"] + ' -R' + testcase_params["use_client_output"] = True + else: + testcase_params["use_client_output"] = False + return testcase_params def _test_rvr(self): """ Function that gets called for each test case @@ -379,21 +439,17 @@ class WifiRvrTest(base_test.BaseTestClass): The function gets called in each rvr test case. The function customizes the rvr test based on the test name of the test that called it """ - test_params = self.current_test_name.split("_") - channel = int(test_params[4][2:]) - mode = test_params[5] - self.iperf_args = '-i 1 -t {} -J '.format( - self.test_params["iperf_duration"]) - if test_params[2] == "UDP": - self.iperf_args = self.iperf_args + "-u -b {}".format( - self.test_params["UDP_rates"][mode]) - if test_params[3] == "DL": - self.iperf_args = self.iperf_args + ' -R' - self.use_client_output = True - else: - self.use_client_output = False - rvr_result = self.rvr_test_func(channel, mode) - self.post_process_results(rvr_result) + # Compile test parameters from config and test name + testcase_params = self.parse_test_params(self.current_test_name) + testcase_params.update(self.testclass_params) + + # Prepare devices and run test + self.setup_rvr_test(testcase_params) + rvr_result = self.run_rvr_test(testcase_params) + + # Post-process results + self.testclass_results.append(rvr_result) + self.process_test_results(rvr_result) self.pass_fail_check(rvr_result) #Test cases @@ -705,7 +761,7 @@ class WifiRvrTest(base_test.BaseTestClass): # Classes defining test suites class WifiRvr_2GHz_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ("test_rvr_TCP_DL_ch1_VHT20", "test_rvr_TCP_UL_ch1_VHT20", "test_rvr_TCP_DL_ch6_VHT20", "test_rvr_TCP_UL_ch6_VHT20", "test_rvr_TCP_DL_ch11_VHT20", @@ -714,7 +770,7 @@ class WifiRvr_2GHz_Test(WifiRvrTest): class WifiRvr_UNII1_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_DL_ch36_VHT20", "test_rvr_TCP_UL_ch36_VHT20", "test_rvr_TCP_DL_ch36_VHT40", "test_rvr_TCP_UL_ch36_VHT40", @@ -727,7 +783,7 @@ class WifiRvr_UNII1_Test(WifiRvrTest): class WifiRvr_UNII3_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_DL_ch149_VHT20", "test_rvr_TCP_UL_ch149_VHT20", "test_rvr_TCP_DL_ch149_VHT40", "test_rvr_TCP_UL_ch149_VHT40", @@ -740,7 +796,7 @@ class WifiRvr_UNII3_Test(WifiRvrTest): class WifiRvr_SampleDFS_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_DL_ch64_VHT20", "test_rvr_TCP_UL_ch64_VHT20", "test_rvr_TCP_DL_ch100_VHT20", "test_rvr_TCP_UL_ch100_VHT20", @@ -753,7 +809,7 @@ class WifiRvr_SampleDFS_Test(WifiRvrTest): class WifiRvr_SampleUDP_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_UDP_DL_ch6_VHT20", "test_rvr_UDP_UL_ch6_VHT20", "test_rvr_UDP_DL_ch36_VHT20", "test_rvr_UDP_UL_ch36_VHT20", @@ -766,7 +822,7 @@ class WifiRvr_SampleUDP_Test(WifiRvrTest): class WifiRvr_TCP_All_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_DL_ch1_VHT20", "test_rvr_TCP_UL_ch1_VHT20", "test_rvr_TCP_DL_ch6_VHT20", "test_rvr_TCP_UL_ch6_VHT20", @@ -789,7 +845,7 @@ class WifiRvr_TCP_All_Test(WifiRvrTest): class WifiRvr_TCP_Downlink_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_DL_ch1_VHT20", "test_rvr_TCP_DL_ch6_VHT20", "test_rvr_TCP_DL_ch11_VHT20", "test_rvr_TCP_DL_ch36_VHT20", @@ -804,7 +860,7 @@ class WifiRvr_TCP_Downlink_Test(WifiRvrTest): class WifiRvr_TCP_Uplink_Test(WifiRvrTest): def __init__(self, controllers): - base_test.BaseTestClass.__init__(self, controllers) + super().__init__(controllers) self.tests = ( "test_rvr_TCP_UL_ch1_VHT20", "test_rvr_TCP_UL_ch6_VHT20", "test_rvr_TCP_UL_ch11_VHT20", "test_rvr_TCP_UL_ch36_VHT20", diff --git a/acts/tests/google/wifi/WifiSensitivityTest.py b/acts/tests/google/wifi/WifiSensitivityTest.py new file mode 100644 index 0000000000..31ec8dc5f3 --- /dev/null +++ b/acts/tests/google/wifi/WifiSensitivityTest.py @@ -0,0 +1,412 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2017 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections +import json +import logging +import os +import WifiRvrTest +from acts import asserts +from acts import base_test +from acts import utils +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi import wifi_test_utils as wutils +from acts.test_utils.wifi import wifi_retail_ap as retail_ap + + +class WifiSensitivityTest(WifiRvrTest.WifiRvrTest): + """Class to test WiFi sensitivity tests. + + This class implements measures WiFi sensitivity per rate. It heavily + leverages the WifiRvrTest class and introduced minor differences to set + specific rates and the access point, and implements a different pass/fail + check. For an example config file to run this test class see + example_connectivity_performance_ap_sta.json. + """ + + VALID_TEST_CONFIGS = { + 1: ["legacy", "VHT20"], + 2: ["legacy", "VHT20"], + 6: ["legacy", "VHT20"], + 10: ["legacy", "VHT20"], + 11: ["legacy", "VHT20"], + 36: ["legacy", "VHT20"], + 40: ["legacy", "VHT20"], + 44: ["legacy", "VHT20"], + 48: ["legacy", "VHT20"], + 149: ["legacy", "VHT20"], + 153: ["legacy", "VHT20"], + 157: ["legacy", "VHT20"], + 161: ["legacy", "VHT20"] + } + VALID_RATES = { + "legacy_2GHz": [[54, 1], [48, 1], [36, 1], [24, 1], [18, 1], [12, 1], + [11, 1], [9, 1], [6, 1], [5.5, 1], [2, 1], [1, 1]], + "legacy_5GHz": [[54, 1], [48, 1], [36, 1], [24, 1], [18, 1], [12, 1], + [9, 1], [6, 1]], + "HT": [[8, 1], [7, 1], [6, 1], [5, 1], [4, 1], [3, 1], [2, 1], [1, 1], + [0, 1], [15, 2], [14, 2], [13, 2], [12, 2], [11, 2], [10, 2], + [9, 2], [8, 2]], + "VHT": [[9, 1], [8, 1], [7, 1], [6, 1], [5, 1], [4, 1], [3, 1], [2, 1], + [1, 1], [0, 1], [9, 2], [8, 2], [7, 2], [6, 2], [5, 2], [4, 2], + [3, 2], [2, 2], [1, 2], [0, 2]] + } + + def setup_class(self): + """Initializes common test hardware and parameters. + + This function initializes hardwares and compiles parameters that are + common to all tests in this class. + """ + self.client_dut = self.android_devices[-1] + req_params = [ + "RetailAccessPoints", "sensitivity_test_params", "testbed_params" + ] + opt_params = ["main_network", "golden_files_list"] + self.unpack_userparams(req_params, opt_params) + self.testclass_params = self.sensitivity_test_params + self.num_atten = self.attenuators[0].instrument.num_atten + self.iperf_server = self.iperf_servers[0] + self.access_points = retail_ap.create(self.RetailAccessPoints) + self.access_point = self.access_points[0] + self.log.info("Access Point Configuration: {}".format( + self.access_point.ap_settings)) + self.log_path = os.path.join(logging.log_path, "results") + utils.create_dir(self.log_path) + if not hasattr(self, "golden_files_list"): + self.golden_files_list = [ + os.path.join(self.testbed_params["golden_results_path"], + file) for file in os.listdir( + self.testbed_params["golden_results_path"]) + ] + self.testclass_results = [] + + # Turn WiFi ON + for dev in self.android_devices: + wutils.wifi_toggle_state(dev, True) + + def pass_fail_check(self, rvr_result): + """Checks sensitivity against golden results and decides on pass/fail. + + Args: + rvr_result: dict containing attenuation, throughput and other meta + data + """ + try: + golden_path = next(file_name + for file_name in self.golden_files_list + if "sensitivity_targets" in file_name) + with open(golden_path, 'r') as golden_file: + golden_results = json.load(golden_file) + except: + asserts.fail("Test failed: Golden file not found") + + golden_sensitivity = golden_results[self.current_test_name][ + "sensitivity"] + result_string = "Througput = {}, Sensitivity = {}. Target Sensitivity = {}".format( + rvr_result["peak_throughput"], rvr_result["sensitivity"], + golden_sensitivity) + if rvr_result["sensitivity"] - golden_sensitivity > self.testclass_params["sensitivity_tolerance"]: + asserts.fail("Test Failed. {}".format(result_string)) + else: + asserts.explicit_pass("Test Passed. {}".format(result_string)) + + def process_testclass_results(self): + """Saves and plots test results from all executed test cases.""" + testclass_results_dict = collections.OrderedDict() + for result in self.testclass_results: + testclass_results_dict[result["test_name"]] = { + "peak_throughput": result["peak_throughput"], + "range": result["range"], + "sensitivity": result["sensitivity"] + } + results_file_path = os.path.join(self.log_path, 'results.json') + with open(results_file_path, 'w') as results_file: + json.dump(testclass_results_dict, results_file, indent=4) + super().process_testclass_results() + + def process_test_results(self, rvr_result): + """Post processes RvR results to compute sensitivity. + + Takes in the results of the RvR tests and computes the sensitivity of + the current rate by looking at the point at which throughput drops + below the percentage specified in the config file. The function then + calls on its parent class process_test_results to plot the result. + + Args: + rvr_result: dict containing attenuation, throughput and other meta + data + """ + rvr_result["peak_throughput"] = max(rvr_result["throughput_receive"]) + throughput_check = [ + throughput < rvr_result["peak_throughput"] * + (self.testclass_params["throughput_pct_at_sensitivity"] / 100) + for throughput in rvr_result["throughput_receive"] + ] + consistency_check = [ + idx for idx in range(len(throughput_check)) + if all(throughput_check[idx:]) + ] + rvr_result["atten_at_range"] = rvr_result["attenuation"][ + consistency_check[0] - 1] + rvr_result["range"] = rvr_result["fixed_attenuation"] + ( + rvr_result["atten_at_range"]) + rvr_result["sensitivity"] = self.testclass_params["ap_tx_power"] - ( + rvr_result["range"]) + super().process_test_results(rvr_result) + + def setup_ap(self, testcase_params): + """Sets up the access point in the configuration required by the test. + + Args: + testcase_params: dict containing AP and other test params + """ + band = self.access_point.band_lookup_by_channel( + testcase_params["channel"]) + if "2G" in band: + frequency = wutils.WifiEnums.channel_2G_to_freq[testcase_params[ + "channel"]] + else: + frequency = wutils.WifiEnums.channel_5G_to_freq[testcase_params[ + "channel"]] + if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: + self.access_point.set_region(self.testbed_params["DFS_region"]) + else: + self.access_point.set_region(self.testbed_params["default_region"]) + self.access_point.set_channel(band, testcase_params["channel"]) + self.access_point.set_bandwidth(band, testcase_params["mode"]) + self.access_point.set_power(band, testcase_params["ap_tx_power"]) + self.access_point.set_rate( + band, testcase_params["mode"], testcase_params["num_streams"], + testcase_params["rate"], testcase_params["short_gi"]) + self.log.info("Access Point Configuration: {}".format( + self.access_point.ap_settings)) + + def get_start_atten(self): + """Gets the starting attenuation for this sensitivity test. + + The function gets the starting attenuation by checking whether a test + as the next higher MCS has been executed. If so it sets the starting + point a configurable number of dBs below the next MCS's sensitivity. + + Returns: + start_atten: starting attenuation for current test + """ + # Get the current and reference test config. The reference test is the + # one performed at the current MCS+1 + current_test_params = self.parse_test_params(self.current_test_name) + ref_test_params = current_test_params.copy() + if "legacy" in current_test_params["mode"] and current_test_params["rate"] < 54: + if current_test_params["channel"] <= 13: + ref_index = self.VALID_RATES["legacy_2GHz"].index( + [current_test_params["rate"], 1]) - 1 + ref_test_params["rate"] = self.VALID_RATES["legacy_2GHz"][ + ref_index][0] + else: + ref_index = self.VALID_RATES["legacy_5GHz"].index( + [current_test_params["rate"], 1]) - 1 + ref_test_params["rate"] = self.VALID_RATES["legacy_5GHz"][ + ref_index][0] + else: + ref_test_params["rate"] = ref_test_params["rate"] + 1 + + # Check if reference test has been run and set attenuation accordingly + previous_params = [ + self.parse_test_params(result["test_name"]) + for result in self.testclass_results + ] + try: + ref_index = previous_params.index(ref_test_params) + start_atten = self.testclass_results[ref_index]["atten_at_range"] - ( + self.testclass_params["adjacent_mcs_range_gap"]) + except: + print("Reference test not found. Starting from {} dB".format( + self.testclass_params["atten_start"])) + start_atten = self.testclass_params["atten_start"] + return start_atten + + def parse_test_params(self, test_name): + """Function that generates test params based on the test name.""" + test_name_params = test_name.split("_") + testcase_params = collections.OrderedDict() + testcase_params["channel"] = int(test_name_params[2][2:]) + testcase_params["mode"] = test_name_params[3] + if "legacy" in testcase_params["mode"].lower(): + testcase_params["rate"] = float( + str(test_name_params[4]).replace("p", ".")) + else: + testcase_params["rate"] = int(test_name_params[4][3:]) + testcase_params["num_streams"] = int(test_name_params[5][3:]) + testcase_params["short_gi"] = 0 + if self.testclass_params["traffic_type"] == "UDP": + testcase_params["iperf_args"] = '-i 1 -t {} -J -u -b {} -R'.format( + self.testclass_params["iperf_duration"], + self.testclass_params["UDP_rates"][testcase_params["mode"]]) + else: + testcase_params["iperf_args"] = '-i 1 -t {} -J -R'.format( + self.testclass_params["iperf_duration"]) + testcase_params["use_client_output"] = True + return testcase_params + + def _test_sensitivity(self): + """ Function that gets called for each test case + + The function gets called in each rvr test case. The function customizes + the rvr test based on the test name of the test that called it + """ + # Compile test parameters from config and test name + testcase_params = self.parse_test_params(self.current_test_name) + testcase_params.update(self.testclass_params) + testcase_params["atten_start"] = self.get_start_atten() + + # Prepare devices and run test + self.setup_rvr_test(testcase_params) + rvr_result = self.run_rvr_test(testcase_params) + + # Post-process results + self.process_test_results(rvr_result) + self.testclass_results.append(rvr_result) + self.pass_fail_check(rvr_result) + # Add results to testclass_results + + def generate_test_cases(self, channels): + """Function that auto-generates test cases for a test class.""" + testcase_wrapper = self._test_sensitivity + for channel in channels: + for mode in self.VALID_TEST_CONFIGS[channel]: + if "VHT" in mode: + rates = self.VALID_RATES["VHT"] + elif "HT" in mode: + rates = self.VALID_RATES["HT"] + elif "legacy" in mode and channel < 14: + rates = self.VALID_RATES["legacy_2GHz"] + elif "legacy" in mode and channel > 14: + rates = self.VALID_RATES["legacy_5GHz"] + else: + raise ValueError("Invalid test mode.") + for rate in rates: + if "legacy" in mode: + testcase_name = "test_sensitivity_ch{}_{}_{}_nss{}".format( + channel, mode, + str(rate[0]).replace(".", "p"), rate[1]) + else: + testcase_name = "test_sensitivity_ch{}_{}_mcs{}_nss{}".format( + channel, mode, rate[0], rate[1]) + setattr(self, testcase_name, testcase_wrapper) + self.tests.append(testcase_name) + + +class WifiSensitivity_AllChannels_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases( + [1, 2, 6, 10, 11, 36, 40, 44, 48, 149, 153, 157, 161]) + + +class WifiSensitivity_2GHz_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([1, 2, 6, 10, 11]) + + +class WifiSensitivity_UNII1_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([36, 40, 44, 48]) + + +class WifiSensitivity_UNII3_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([149, 153, 157, 161]) + + +class WifiSensitivity_ch1_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([1]) + + +class WifiSensitivity_ch2_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([2]) + + +class WifiSensitivity_ch6_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([6]) + + +class WifiSensitivity_ch10_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([10]) + + +class WifiSensitivity_ch11_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([11]) + + +class WifiSensitivity_ch36_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([36]) + + +class WifiSensitivity_ch40_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([40]) + + +class WifiSensitivity_ch44_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([44]) + + +class WifiSensitivity_ch48_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([48]) + + +class WifiSensitivity_ch149_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([149]) + + +class WifiSensitivity_ch153_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([153]) + + +class WifiSensitivity_ch157_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([157]) + + +class WifiSensitivity_ch161_Test(WifiSensitivityTest): + def __init__(self, controllers): + base_test.BaseTestClass.__init__(self, controllers) + self.generate_test_cases([161]) diff --git a/acts/tests/google/wifi/WifiSoftApTest.py b/acts/tests/google/wifi/WifiSoftApTest.py index 0d722e3030..1ef8e93a33 100644 --- a/acts/tests/google/wifi/WifiSoftApTest.py +++ b/acts/tests/google/wifi/WifiSoftApTest.py @@ -40,7 +40,7 @@ class WifiSoftApTest(WifiBaseTest): """ self.dut = self.android_devices[0] self.dut_client = self.android_devices[1] - req_params = [] + req_params = ["dbs_supported_models"] opt_param = ["open_network"] self.unpack_userparams( req_param_names=req_params, opt_param_names=opt_param) @@ -65,6 +65,16 @@ class WifiSoftApTest(WifiBaseTest): self.dut_client.droid.wifiEnableVerboseLogging(1) asserts.assert_equal(self.dut_client.droid.wifiGetVerboseLoggingLevel(), 1, "Failed to enable WiFi verbose logging on the client dut.") + wutils.wifi_toggle_state(self.dut_client, True) + if len(self.android_devices) > 2: + utils.sync_device_time(self.android_devices[2]) + self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + self.android_devices[2].droid.wifiEnableVerboseLogging(1) + asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1, + "Failed to enable WiFi verbose logging on the client dut.") + # Disable wifi for dbs supported models + if self.dut.model in self.dbs_supported_models: + wutils.wifi_toggle_state(self.dut, False) def teardown_class(self): wutils.stop_wifi_tethering(self.dut) @@ -74,6 +84,10 @@ class WifiSoftApTest(WifiBaseTest): del self.user_params["reference_networks"] del self.user_params["open_network"] + def teardown_test(self): + if self.dut.droid.wifiIsApEnabled(): + wutils.stop_wifi_tethering(self.dut) + def on_fail(self, test_name, begin_time): self.dut.take_bug_report(test_name, begin_time) self.dut_client.take_bug_report(test_name, begin_time) @@ -121,7 +135,8 @@ class WifiSoftApTest(WifiBaseTest): asserts.assert_true(self.dut.droid.telephonyIsDataEnabled(), "Failed to enable cell data for softap dut.") - def validate_full_tether_startup(self, band=None, hidden=None): + def validate_full_tether_startup(self, band=None, hidden=None, + test_ping=False, test_clients=None): """Test full startup of wifi tethering 1. Report current state. @@ -151,6 +166,11 @@ class WifiSoftApTest(WifiBaseTest): asserts.assert_true(ret != -1, "Add network %r failed" % config) self.dut_client.droid.wifiEnableNetwork(ret, 0) self.confirm_softap_in_scan_results(config[wutils.WifiEnums.SSID_KEY]) + if test_ping: + self.validate_ping_between_softap_and_client(config) + if test_clients: + if len(self.android_devices) > 2: + self.validate_ping_between_two_clients(config) wutils.stop_wifi_tethering(self.dut) asserts.assert_false(self.dut.droid.wifiIsApEnabled(), "SoftAp is still reported as running") @@ -159,6 +179,61 @@ class WifiSoftApTest(WifiBaseTest): elif self.dut.droid.wifiCheckState(): asserts.fail("Wifi was disabled before softap and now it is enabled") + def validate_ping_between_softap_and_client(self, config): + """Test ping between softap and its client. + + Connect one android device to the wifi hotspot. + Verify they can ping each other. + + Args: + config: wifi network config with SSID, password + """ + wutils.wifi_connect(self.dut_client, config, check_connectivity=False) + + dut_ip = self.dut.droid.connectivityGetIPv4Addresses("wlan0")[0] + dut_client_ip = self.dut_client.droid.connectivityGetIPv4Addresses("wlan0")[0] + + self.dut.log.info("Try to ping %s" % dut_client_ip) + asserts.assert_true( + utils.adb_shell_ping(self.dut, count=10, dest_ip=dut_client_ip, timeout=20), + "%s ping %s failed" % (self.dut.serial, dut_client_ip)) + + self.dut_client.log.info("Try to ping %s" % dut_ip) + asserts.assert_true( + utils.adb_shell_ping(self.dut_client, count=10, dest_ip=dut_ip, timeout=20), + "%s ping %s failed" % (self.dut_client.serial, dut_ip)) + + wutils.stop_wifi_tethering(self.dut) + + def validate_ping_between_two_clients(self, config): + """Test ping between softap's clients. + + Connect two android device to the wifi hotspot. + Verify the clients can ping each other. + + Args: + config: wifi network config with SSID, password + """ + # Connect DUT to Network + ad1 = self.dut_client + ad2 = self.android_devices[2] + + wutils.wifi_connect(ad1, config, check_connectivity=False) + wutils.wifi_connect(ad2, config, check_connectivity=False) + ad1_ip = ad1.droid.connectivityGetIPv4Addresses('wlan0')[0] + ad2_ip = ad2.droid.connectivityGetIPv4Addresses('wlan0')[0] + + # Ping each other + ad1.log.info("Try to ping %s" % ad2_ip) + asserts.assert_true( + utils.adb_shell_ping(ad1, count=10, dest_ip=ad2_ip, timeout=20), + "%s ping %s failed" % (ad1.serial, ad2_ip)) + + ad2.log.info("Try to ping %s" % ad1_ip) + asserts.assert_true( + utils.adb_shell_ping(ad2, count=10, dest_ip=ad1_ip, timeout=20), + "%s ping %s failed" % (ad2.serial, ad1_ip)) + """ Tests Begin """ @test_tracker_info(uuid="495f1252-e440-461c-87a7-2c45f369e129") @@ -289,6 +364,52 @@ class WifiSoftApTest(WifiBaseTest): wutils.stop_wifi_tethering(self.dut) wutils.wait_for_disconnect(self.dut_client) + @test_tracker_info(uuid="883dd5b1-50c6-4958-a50f-bb4bea77ccaf") + def test_full_tether_startup_2G_one_client_ping_softap(self): + """(AP) 1 Device can connect to 2G hotspot + + Steps: + 1. Turn on DUT's 2G softap + 2. Client connects to the softap + 3. Client and DUT ping each other + """ + self.validate_full_tether_startup(WIFI_CONFIG_APBAND_2G, test_ping=True) + + @test_tracker_info(uuid="6604e848-99d6-422c-9fdc-2882642438b6") + def test_full_tether_startup_5G_one_client_ping_softap(self): + """(AP) 1 Device can connect to 5G hotspot + + Steps: + 1. Turn on DUT's 5G softap + 2. Client connects to the softap + 3. Client and DUT ping each other + """ + self.validate_full_tether_startup(WIFI_CONFIG_APBAND_5G, test_ping=True) + + @test_tracker_info(uuid="17725ecd-f900-4cf7-8b2d-d7515b0a595c") + def test_softap_2G_two_clients_ping_each_other(self): + """Test for 2G hotspot with 2 clients + + 1. Turn on 2G hotspot + 2. Two clients connect to the hotspot + 3. Two clients ping each other + """ + asserts.skip_if(len(self.android_devices) < 3, + "No extra android devices. Skip test") + self.validate_full_tether_startup(WIFI_CONFIG_APBAND_2G, test_clients=True) + + @test_tracker_info(uuid="98c09888-1021-4f79-9065-b3cf9b132146") + def test_softap_5G_two_clients_ping_each_other(self): + """Test for 5G hotspot with 2 clients + + 1. Turn on 5G hotspot + 2. Two clients connect to the hotspot + 3. Two clients ping each other + """ + asserts.skip_if(len(self.android_devices) < 3, + "No extra android devices. Skip test") + self.validate_full_tether_startup(WIFI_CONFIG_APBAND_5G, test_clients=True) + """ Tests End """ diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py new file mode 100755 index 0000000000..bd78d2e4a5 --- /dev/null +++ b/acts/tests/google/wifi/WifiStaApConcurrencyStressTest.py @@ -0,0 +1,343 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import pprint + +from acts import asserts +from acts import signals +from acts import utils +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G +from WifiStaApConcurrencyTest import WifiStaApConcurrencyTest +import acts.test_utils.wifi.wifi_test_utils as wutils + +WifiEnums = wutils.WifiEnums + +# Channels to configure the AP for various test scenarios. +WIFI_NETWORK_AP_CHANNEL_2G = 1 +WIFI_NETWORK_AP_CHANNEL_5G = 36 +WIFI_NETWORK_AP_CHANNEL_5G_DFS = 132 + +class WifiStaApConcurrencyStressTest(WifiStaApConcurrencyTest): + """Stress tests for STA + AP concurrency scenarios. + + Test Bed Requirement: + * At least two Android devices (For AP) + * One Wi-Fi network visible to the device (for STA). + """ + + def __init__(self, controllers): + WifiStaApConcurrencyTest.__init__(self, controllers) + self.tests = ("test_stress_wifi_connection_2G_softap_2G", + "test_stress_wifi_connection_5G_softap_5G", + "test_stress_wifi_connection_5G_DFS_softap_5G", + "test_stress_wifi_connection_5G_softap_2G", + "test_stress_wifi_connection_5G_DFS_softap_2G", + "test_stress_wifi_connection_2G_softap_5G", + "test_stress_wifi_connection_5G_softap_2G_with_location_scan_on", + "test_stress_softap_2G_wifi_connection_2G", + "test_stress_softap_5G_wifi_connection_5G", + "test_stress_softap_5G_wifi_connection_5G_DFS", + "test_stress_softap_5G_wifi_connection_2G", + "test_stress_softap_2G_wifi_connection_5G", + "test_stress_softap_2G_wifi_connection_5G_DFS", + "test_stress_softap_5G_wifi_connection_2G_with_location_scan_on") + + def setup_class(self): + self.dut = self.android_devices[0] + self.dut_client = self.android_devices[1] + wutils.wifi_test_device_init(self.dut) + wutils.wifi_test_device_init(self.dut_client) + # Do a simple version of init - mainly just sync the time and enable + # verbose logging. This test will fail if the DUT has a sim and cell + # data is disabled. We would also like to test with phones in less + # constrained states (or add variations where we specifically + # constrain). + utils.require_sl4a((self.dut, self.dut_client)) + utils.sync_device_time(self.dut) + utils.sync_device_time(self.dut_client) + # Set country code explicitly to "US". + self.dut.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + self.dut_client.droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + # Enable verbose logging on the duts + self.dut.droid.wifiEnableVerboseLogging(1) + asserts.assert_equal(self.dut.droid.wifiGetVerboseLoggingLevel(), 1, + "Failed to enable WiFi verbose logging on the softap dut.") + self.dut_client.droid.wifiEnableVerboseLogging(1) + asserts.assert_equal(self.dut_client.droid.wifiGetVerboseLoggingLevel(), 1, + "Failed to enable WiFi verbose logging on the client dut.") + + req_params = ["AccessPoint", "dbs_supported_models", "stress_count"] + opt_param = ["iperf_server_address"] + self.unpack_userparams( + req_param_names=req_params, opt_param_names=opt_param) + + if self.dut.model not in self.dbs_supported_models: + asserts.skip( + ("Device %s does not support dual interfaces.") + % self.dut.model) + + if "iperf_server_address" in self.user_params: + self.iperf_server = self.iperf_servers[0] + if hasattr(self, 'iperf_server'): + self.iperf_server.start() + + # Set the client wifi state to on before the test begins. + wutils.wifi_toggle_state(self.dut_client, True) + + # Init extra devices + if len(self.android_devices) > 2: + wutils.wifi_test_device_init(self.android_devices[2]) + utils.sync_device_time(self.android_devices[2]) + self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + self.android_devices[2].droid.wifiEnableVerboseLogging(1) + asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1, + "Failed to enable WiFi verbose logging on the client dut.") + + """Helper Functions""" + + def verify_traffic_between_softap_clients(self): + ad1 = self.dut_client + ad2 = self.android_devices[2] + ad1_ip = ad1.droid.connectivityGetIPv4Addresses('wlan0')[0] + ad2_ip = ad2.droid.connectivityGetIPv4Addresses('wlan0')[0] + # Ping each other + asserts.assert_true( + utils.adb_shell_ping(ad1, count=10, dest_ip=ad2_ip, timeout=20), + "%s ping %s failed" % (ad1.serial, ad2_ip)) + asserts.assert_true( + utils.adb_shell_ping(ad2, count=10, dest_ip=ad1_ip, timeout=20), + "%s ping %s failed" % (ad2.serial, ad1_ip)) + + def verify_traffic_between_onhub_clients(self, interface): + ad1 = self.dut + ad2 = self.android_devices[2] + ad1_ip = ad1.droid.connectivityGetIPv4Addresses(interface)[0] + ad2_ip = ad2.droid.connectivityGetIPv4Addresses('wlan0')[0] + # Ping each other + asserts.assert_true( + utils.adb_shell_ping(ad1, count=10, dest_ip=ad2_ip, timeout=20), + "%s ping %s failed" % (ad1.serial, ad2_ip)) + asserts.assert_true( + utils.adb_shell_ping(ad2, count=10, dest_ip=ad1_ip, timeout=20), + "%s ping %s failed" % (ad2.serial, ad1_ip)) + + def verify_wifi_full_on_off(self, network, softap_config, interface): + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((network, self.dut)) + self.run_iperf_client((network, self.dut)) + self.run_iperf_client((softap_config, self.dut_client)) + if len(self.android_devices) > 2: + self.log.info("Testbed has extra android devices, do more validation") + self.verify_traffic_between_onhub_clients(interface) + wutils.wifi_toggle_state(self.dut, False) + + def verify_softap_full_on_off(self, network, softap_band): + softap_config = self.start_softap_and_verify(softap_band) + self.run_iperf_client((network, self.dut)) + self.run_iperf_client((softap_config, self.dut_client)) + if len(self.android_devices) > 2: + self.log.info("Testbed has extra android devices, do more validation") + self.verify_traffic_between_softap_clients() + wutils.stop_wifi_tethering(self.dut) + + """Tests""" + @test_tracker_info(uuid="615997cc-8290-4af3-b3ac-1f5bd5af6ed1") + def test_stress_wifi_connection_2G_softap_2G(self): + """Tests connection to 2G network the enable/disable SoftAp on 2G N times. + """ + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_2g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_2g, WIFI_CONFIG_APBAND_2G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="03362d54-a624-4fb8-ad97-7abb9e6f655c") + def test_stress_wifi_connection_5G_softap_5G(self): + """Tests connection to 5G network followed by bringing up SoftAp on 5G. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_5g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="fdda4ff2-38d5-4398-9a59-c7cee407a2b3") + def test_stress_wifi_connection_5G_DFS_softap_5G(self): + """Tests connection to 5G DFS network followed by bringing up SoftAp on 5G. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_5g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="b3621721-7714-43eb-8438-b578164b9194") + def test_stress_wifi_connection_5G_softap_2G(self): + """Tests connection to 5G network followed by bringing up SoftAp on 2G. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_5g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="bde1443f-f912-408e-b01a-537548dd023c") + def test_stress_wifi_connection_5G_DFS_softap_2G(self): + """Tests connection to 5G DFS network followed by bringing up SoftAp on 2G. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_5g, self.dut)) + for count in range(self.stress_count): + self.verify_softap_full_on_off(self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="2b6a891a-e0d6-4660-abf6-579099ce6924") + def test_stress_wifi_connection_2G_softap_5G(self): + """Tests connection to 2G network followed by bringing up SoftAp on 5G. + """ + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_2g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_2g, WIFI_CONFIG_APBAND_2G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="f28abf22-9df0-4500-b342-6682ca305e60") + def test_stress_wifi_connection_5G_softap_2G_with_location_scan_on(self): + """Tests connection to 5G network followed by bringing up SoftAp on 2G + with location scans turned on. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + self.turn_location_on_and_scan_toggle_on() + wutils.wifi_toggle_state(self.dut, True) + self.connect_to_wifi_network_and_verify((self.wpapsk_5g, self.dut)) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_softap_full_on_off(self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="0edb1500-6c60-442e-9268-a2ad9ee2b55c") + def test_stress_softap_2G_wifi_connection_2G(self): + """Tests enable SoftAp on 2G then connection/disconnection to 2G network for N times. + """ + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_2G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_2g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="162a6679-edd5-4daa-9f25-75d79cf4bb4a") + def test_stress_softap_5G_wifi_connection_5G(self): + """Tests enable SoftAp on 5G then connection/disconnection to 5G network for N times. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_5G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_5g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="ee98f2dd-c4f9-4f48-ab59-f577267760d5") + def test_stress_softap_5G_wifi_connection_5G_DFS(self): + """Tests enable SoftAp on 5G then connection/disconnection to 5G DFS network for N times. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_5G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_5g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="b50750b5-d5b9-4687-b9e7-9fb15f54b428") + def test_stress_softap_5G_wifi_connection_2G(self): + """Tests enable SoftAp on 5G then connection/disconnection to 2G network for N times. + """ + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_5G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_2g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="9a2865db-8e4b-4339-9999-000ce9b6970b") + def test_stress_softap_2G_wifi_connection_5G(self): + """Tests enable SoftAp on 2G then connection/disconnection to 5G network for N times. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_2G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_5g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="add6609d-91d6-4b89-94c5-0ad8b941e3d1") + def test_stress_softap_2G_wifi_connection_5G_DFS(self): + """Tests enable SoftAp on 2G then connection/disconnection to 5G DFS network for N times. + """ + self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_2G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_5g, softap_config, 'wlan1') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="ee42afb6-99d0-4330-933f-d4dd8c3626c6") + def test_stress_softap_5G_wifi_connection_2G_with_location_scan_on(self): + """Tests enable SoftAp on 5G then connection/disconnection to 2G network for N times + with location scans turned on. + """ + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) + self.turn_location_on_and_scan_toggle_on() + softap_config = self.start_softap_and_verify( + WIFI_CONFIG_APBAND_5G, check_connectivity=False) + for count in range(self.stress_count): + self.log.info("Iteration %d", count+1) + self.verify_wifi_full_on_off(self.wpapsk_2g, softap_config, 'wlan0') + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py index ffe0effb96..7f7b0a98b3 100755 --- a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py +++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py @@ -38,7 +38,7 @@ WIFI_NETWORK_AP_CHANNEL_5G = 36 WIFI_NETWORK_AP_CHANNEL_5G_DFS = 132 class WifiStaApConcurrencyTest(WifiBaseTest): - """Tests for STA + AP concurrency scenarions. + """Tests for STA + AP concurrency scenarios. Test Bed Requirement: * Two Android devices (For AP) @@ -90,18 +90,27 @@ class WifiStaApConcurrencyTest(WifiBaseTest): # Set the client wifi state to on before the test begins. wutils.wifi_toggle_state(self.dut_client, True) + if len(self.android_devices) > 2: + wutils.wifi_test_device_init(self.android_devices[2]) + utils.sync_device_time(self.android_devices[2]) + self.android_devices[2].droid.wifiSetCountryCode(wutils.WifiEnums.CountryCode.US) + self.android_devices[2].droid.wifiEnableVerboseLogging(1) + asserts.assert_equal(self.android_devices[2].droid.wifiGetVerboseLoggingLevel(), 1, + "Failed to enable WiFi verbose logging on the client dut.") + def setup_test(self): - self.dut.droid.wakeLockAcquireBright() - self.dut.droid.wakeUpNow() + for ad in self.android_devices: + ad.droid.wakeLockAcquireBright() + ad.droid.wakeUpNow() self.turn_location_off_and_scan_toggle_off() wutils.wifi_toggle_state(self.dut, False) def teardown_test(self): - self.dut.droid.wakeLockRelease() - self.dut.droid.goToSleepNow() wutils.stop_wifi_tethering(self.dut) - wutils.reset_wifi(self.dut) - wutils.reset_wifi(self.dut_client) + for ad in self.android_devices: + ad.droid.wakeLockRelease() + ad.droid.goToSleepNow() + wutils.reset_wifi(ad) self.access_points[0].close() del self.user_params["reference_networks"] del self.user_params["open_network"] @@ -111,8 +120,9 @@ class WifiStaApConcurrencyTest(WifiBaseTest): self.iperf_server.stop() def on_fail(self, test_name, begin_time): - self.dut.take_bug_report(test_name, begin_time) - self.dut.cat_adb_log(test_name, begin_time) + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + ad.cat_adb_log(test_name, begin_time) """Helper Functions""" def configure_ap(self, channel_2g=None, channel_5g=None): @@ -129,7 +139,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): self.legacy_configure_ap_and_start(channel_2g=channel_2g) else: self.legacy_configure_ap_and_start(channel_2g=channel_2g, - channel_5g=chanel_5g) + channel_5g=channel_5g) self.wpapsk_2g = self.reference_networks[0]["2g"] self.wpapsk_5g = self.reference_networks[0]["5g"] @@ -174,21 +184,32 @@ class WifiStaApConcurrencyTest(WifiBaseTest): params: A tuple of network info and AndroidDevice object. """ network, ad = params - droid = ad.droid - ed = ad.ed SSID = network[WifiEnums.SSID_KEY] wutils.start_wifi_connection_scan_and_ensure_network_found( - ad, SSID); + ad, SSID) wutils.wifi_connect(ad, network, num_of_tries=3) + if len(self.android_devices) > 2: + wutils.reset_wifi(self.android_devices[2]) + wutils.start_wifi_connection_scan_and_ensure_network_found( + self.android_devices[2], SSID) + wutils.wifi_connect(self.android_devices[2], network) - def confirm_softap_in_scan_results(self, ap_ssid): + def confirm_softap_can_be_connected(self, network, check_connectivity=True): """Confirm the ap started by wifi tethering is seen in scan results. Args: - ap_ssid: SSID of the ap we are looking for. + network: config of the ap we are looking for. """ + SSID = network[WifiEnums.SSID_KEY] wutils.start_wifi_connection_scan_and_ensure_network_found( - self.dut_client, ap_ssid); + self.dut_client, SSID) + wutils.wifi_connect(self.dut_client, network, check_connectivity=check_connectivity) + if len(self.android_devices) > 2: + wutils.reset_wifi(self.android_devices[2]) + wutils.start_wifi_connection_scan_and_ensure_network_found( + self.android_devices[2], SSID) + wutils.wifi_connect( + self.android_devices[2], network, check_connectivity=check_connectivity) def create_softap_config(self): """Create a softap config with ssid and password.""" @@ -199,21 +220,22 @@ class WifiStaApConcurrencyTest(WifiBaseTest): config[wutils.WifiEnums.PWD_KEY] = ap_password return config - def start_softap_and_verify(self, band): + def start_softap_and_verify(self, band, check_connectivity=True): """Test startup of softap - 1. Brinup AP mode. + 1. Bring up AP mode. 2. Verify SoftAP active using the client device. """ config = self.create_softap_config() wutils.start_wifi_tethering(self.dut, config[wutils.WifiEnums.SSID_KEY], config[wutils.WifiEnums.PWD_KEY], band) - self.confirm_softap_in_scan_results(config[wutils.WifiEnums.SSID_KEY]) + self.confirm_softap_can_be_connected(config, check_connectivity) + return config def connect_to_wifi_network_and_start_softap(self, nw_params, softap_band): - """Test concurrenct wifi connection and softap. - This helper method first makes a wifi conenction and then starts SoftAp. + """Test concurrent wifi connection and softap. + This helper method first makes a wifi connection and then starts SoftAp. Args: nw_params: Params for network STA connection. @@ -221,33 +243,62 @@ class WifiStaApConcurrencyTest(WifiBaseTest): 1. Bring up wifi. 2. Establish connection to a network. - 3. Bring up softap and verify AP is seen on a client device. - 4. Run iperf on the wifi connection to the network. + 3. Bring up softap and verify AP can be connected by a client device. + 4. Run iperf on the wifi/softap connection to the network. """ wutils.wifi_toggle_state(self.dut, True) self.connect_to_wifi_network_and_verify((nw_params, self.dut)) - self.start_softap_and_verify(softap_band) + softap_config = self.start_softap_and_verify(softap_band) self.run_iperf_client((nw_params, self.dut)) + self.run_iperf_client((softap_config, self.dut_client)) + if len(self.android_devices) > 2: + self.log.info("Testbed has extra android devices, do more validation") + ad1 = self.dut_client + ad2 = self.android_devices[2] + ad1_ip = ad1.droid.connectivityGetIPv4Addresses('wlan0')[0] + ad2_ip = ad2.droid.connectivityGetIPv4Addresses('wlan0')[0] + # Ping each other + asserts.assert_true( + utils.adb_shell_ping(ad1, count=10, dest_ip=ad2_ip, timeout=20), + "%s ping %s failed" % (ad1.serial, ad2_ip)) + asserts.assert_true( + utils.adb_shell_ping(ad2, count=10, dest_ip=ad1_ip, timeout=20), + "%s ping %s failed" % (ad2.serial, ad1_ip)) # Verify that both softap & wifi is enabled concurrently. self.verify_wifi_and_softap_enabled() - def start_softap_and_connect_to_wifi_network(self, nw_params, softap_band): - """Test concurrenct wifi connection and softap. - This helper method first starts SoftAp and then makes a wifi conenction. + def start_softap_and_connect_to_wifi_network(self, nw_params, softap_band, interface): + """Test concurrent wifi connection and softap. + This helper method first starts SoftAp and then makes a wifi connection. Args: nw_params: Params for network STA connection. softap_band: Band for the AP. + interface: the wlan interface of DUT - 1. Bring up softap and verify AP is seen on a client device. + 1. Bring up softap and verify AP can be connected by a client device. 2. Bring up wifi. 3. Establish connection to a network. - 4. Run iperf on the wifi connection to the network. + 4. Run iperf on the wifi/softap connection to the network. """ - self.start_softap_and_verify(softap_band) + softap_config = self.start_softap_and_verify(softap_band, check_connectivity=False) wutils.wifi_toggle_state(self.dut, True) self.connect_to_wifi_network_and_verify((nw_params, self.dut)) self.run_iperf_client((nw_params, self.dut)) + self.run_iperf_client((softap_config, self.dut_client)) + if len(self.android_devices) > 2: + self.log.info("Testbed has extra android devices, do more validation") + ad1 = self.dut + ad2 = self.android_devices[2] + ad1_ip = ad1.droid.connectivityGetIPv4Addresses(interface)[0] + ad2_ip = ad2.droid.connectivityGetIPv4Addresses('wlan0')[0] + # Ping each other + asserts.assert_true( + utils.adb_shell_ping(ad1, count=10, dest_ip=ad2_ip, timeout=20), + "%s ping %s failed" % (ad1.serial, ad2_ip)) + asserts.assert_true( + utils.adb_shell_ping(ad2, count=10, dest_ip=ad1_ip, timeout=20), + "%s ping %s failed" % (ad2.serial, ad1_ip)) # Verify that both softap & wifi is enabled concurrently. self.verify_wifi_and_softap_enabled() @@ -255,7 +306,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """Helper to verify both wifi and softap is enabled """ asserts.assert_true(self.dut.droid.wifiCheckState(), - "Wifi is not reported as running"); + "Wifi is not reported as running") asserts.assert_true(self.dut.droid.wifiIsApEnabled(), "SoftAp is not reported as running") @@ -324,7 +375,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_2g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_2G, 'wlan1') @test_tracker_info(uuid="5f954957-ad20-4de1-b20c-6c97d0463bdd") def test_softap_5G_wifi_connection_5G(self): @@ -332,7 +383,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_5G, 'wlan1') @test_tracker_info(uuid="1306aafc-a07e-4654-ba78-674f90cf748e") def test_softap_5G_wifi_connection_5G_DFS(self): @@ -340,7 +391,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_5G, 'wlan1') @test_tracker_info(uuid="5e28e8b5-3faa-4cff-a782-13a796d7f572") def test_softap_5G_wifi_connection_2G(self): @@ -348,7 +399,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G, 'wlan1') @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G(self): @@ -356,7 +407,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G, 'wlan1') @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G_DFS(self): @@ -364,7 +415,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G, 'wlan1') @test_tracker_info(uuid="aa23a3fc-31a1-4d5c-8cf5-2eb9fdf9e7ce") def test_softap_5G_wifi_connection_2G_with_location_scan_on(self): @@ -374,4 +425,4 @@ class WifiStaApConcurrencyTest(WifiBaseTest): self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.turn_location_on_and_scan_toggle_on() self.start_softap_and_connect_to_wifi_network( - self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G, 'wlan0') diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py index e99aed609f..d4d617fb82 100755 --- a/acts/tests/google/wifi/WifiStressTest.py +++ b/acts/tests/google/wifi/WifiStressTest.py @@ -25,6 +25,8 @@ from acts import asserts from acts import signals from acts import utils from acts.test_decorators import test_tracker_info +from acts.test_utils.bt.bt_test_utils import enable_bluetooth +from acts.test_utils.bt.bt_test_utils import disable_bluetooth from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest WifiEnums = wutils.WifiEnums @@ -53,7 +55,7 @@ class WifiStressTest(WifiBaseTest): req_params = [] opt_param = [ "open_network", "reference_networks", "iperf_server_address", - "stress_count", "stress_hours"] + "stress_count", "stress_hours", "attn_vals", "pno_interval"] self.unpack_userparams( req_param_names=req_params, opt_param_names=opt_param) @@ -78,6 +80,8 @@ class WifiStressTest(WifiBaseTest): self.dut.droid.wakeUpNow() def teardown_test(self): + if self.dut.droid.wifiIsApEnabled(): + wutils.stop_wifi_tethering(self.dut) self.dut.droid.wakeLockRelease() self.dut.droid.goToSleepNow() wutils.reset_wifi(self.dut) @@ -159,6 +163,90 @@ class WifiStressTest(WifiBaseTest): raise signals.TestFailure("Youtube video did not start. Current WiFi " "state is %d" % self.dut.droid.wifiCheckState()) + def add_networks(self, ad, networks): + """Add Wi-Fi networks to an Android device and verify the networks were + added correctly. + + Args: + ad: the AndroidDevice object to add networks to. + networks: a list of dicts, each dict represents a Wi-Fi network. + """ + for network in networks: + ret = ad.droid.wifiAddNetwork(network) + asserts.assert_true(ret != -1, "Failed to add network %s" % + network) + ad.droid.wifiEnableNetwork(ret, 0) + configured_networks = ad.droid.wifiGetConfiguredNetworks() + self.log.debug("Configured networks: %s", configured_networks) + + def connect_and_verify_connected_bssid(self, expected_bssid): + """Start a scan to get the DUT connected to an AP and verify the DUT + is connected to the correct BSSID. + + Args: + expected_bssid: Network bssid to which connection. + + Returns: + True if connection to given network happen, else return False. + """ + #force start a single scan so we don't have to wait for the + #WCM scheduled scan. + wutils.start_wifi_connection_scan(self.dut) + #wait for connection + time.sleep(20) + #verify connection + actual_network = self.dut.droid.wifiGetConnectionInfo() + self.log.info("Actual network: %s", actual_network) + try: + asserts.assert_equal(expected_bssid, + actual_network[WifiEnums.BSSID_KEY]) + except: + msg = "Device did not connect to any network." + raise signals.TestFailure(msg) + + def set_attns(self, attn_val_name): + """Sets attenuation values on attenuators used in this test. + + Args: + attn_val_name: Name of the attenuation value pair to use. + """ + self.log.info("Set attenuation values to %s", self.attn_vals[attn_val_name]) + try: + self.attenuators[0].set_atten(self.attn_vals[attn_val_name][0]) + self.attenuators[1].set_atten(self.attn_vals[attn_val_name][1]) + self.attenuators[2].set_atten(95) + self.attenuators[3].set_atten(95) + except: + self.log.error("Failed to set attenuation values %s.", attn_val_name) + raise + + def trigger_pno_and_assert_connect(self, attn_val_name, expected_con): + """Sets attenuators to disconnect current connection to trigger PNO. + Validate that the DUT connected to the new SSID as expected after PNO. + + Args: + attn_val_name: Name of the attenuation value pair to use. + expected_con: The expected info of the network to we expect the DUT + to roam to. + """ + connection_info = self.dut.droid.wifiGetConnectionInfo() + self.log.info("Triggering PNO connect from %s to %s", + connection_info[WifiEnums.SSID_KEY], + expected_con[WifiEnums.SSID_KEY]) + self.set_attns(attn_val_name) + self.log.info("Wait %ss for PNO to trigger.", self.pno_interval) + time.sleep(self.pno_interval) + try: + self.log.info("Connected to %s network after PNO interval" + % self.dut.droid.wifiGetConnectionInfo()) + expected_ssid = expected_con[WifiEnums.SSID_KEY] + verify_con = {WifiEnums.SSID_KEY: expected_ssid} + wutils.verify_wifi_connection_info(self.dut, verify_con) + self.log.info("Connected to %s successfully after PNO", + expected_ssid) + finally: + pass + """Tests""" @test_tracker_info(uuid="cd0016c6-58cf-4361-b551-821c0b8d2554") @@ -181,6 +269,28 @@ class WifiStressTest(WifiBaseTest): raise signals.TestPass(details="", extras={"Iterations":"%d" % self.stress_count, "Pass":"%d" %(count+1)}) + @test_tracker_info(uuid="4e591cec-9251-4d52-bc6e-6621507524dc") + def test_stress_toggle_wifi_state_bluetooth_on(self): + """Toggle WiFi state ON and OFF for N times when bluetooth ON.""" + enable_bluetooth(self.dut.droid, self.dut.ed) + for count in range(self.stress_count): + """Test toggling wifi""" + try: + self.log.debug("Going from on to off.") + wutils.wifi_toggle_state(self.dut, False) + self.log.debug("Going from off to on.") + startTime = time.time() + wutils.wifi_toggle_state(self.dut, True) + startup_time = time.time() - startTime + self.log.debug("WiFi was enabled on the device in %s s." % + startup_time) + except: + signals.TestFailure(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %count}) + disable_bluetooth(self.dut.droid) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + @test_tracker_info(uuid="49e3916a-9580-4bf7-a60d-a0f2545dcdde") def test_stress_connect_traffic_disconnect_5g(self): """Test to connect and disconnect from a network for N times. @@ -221,7 +331,8 @@ class WifiStressTest(WifiBaseTest): Steps: 1. Scan and connect to a network. 2. Run IPerf to download data for few hours. - 3. Verify no WiFi disconnects/data interruption. + 3. Run IPerf to upload data for few hours. + 4. Verify no WiFi disconnects/data interruption. """ try: @@ -238,6 +349,15 @@ class WifiStressTest(WifiBaseTest): self.log.debug("Error occurred in iPerf traffic.") start_time = time.time() self.run_ping(sec) + # Start IPerf traffic from phone to server. + # Upload data for 5 hours. + args = "-p {} -t {}".format(self.iperf_server.port, sec) + self.log.info("Running iperf client {}".format(args)) + result, data = self.dut.run_iperf_client(self.iperf_server_address, + args, timeout=sec+1) + if not result: + self.log.debug("Error occurred in iPerf traffic.") + self.run_ping(sec) except: total_time = time.time() - start_time raise signals.TestFailure("Network long-connect failed." @@ -301,6 +421,7 @@ class WifiStressTest(WifiBaseTest): ret = self.dut.droid.wifiAddNetwork(network) asserts.assert_true(ret != -1, "Add network %r failed" % network) self.dut.droid.wifiEnableNetwork(ret, 0) + self.dut.droid.wifiStartScan() time.sleep(WAIT_FOR_AUTO_CONNECT) cur_network = self.dut.droid.wifiGetConnectionInfo() cur_ssid = cur_network[WifiEnums.SSID_KEY] @@ -392,3 +513,48 @@ class WifiStressTest(WifiBaseTest): raise signals.TestPass(details="", extras={"Iterations":"%d" % self.stress_count, "Pass":"%d" %((count+1)*2)}) + @test_tracker_info(uuid="e8ae8cd2-c315-4c08-9eb3-83db65b78a58") + def test_stress_network_selector_2G_connection(self): + """ + 1. Add one saved 2G network to DUT. + 2. Move the DUT in range. + 3. Verify the DUT is connected to the network. + 4. Move the DUT out of range + 5. Repeat step 2-4 + """ + for attenuator in self.attenuators: + attenuator.set_atten(95) + + # add a saved network to DUT + networks = [self.reference_networks[0]['2g']] + self.add_networks(self.dut, networks) + + for count in range(self.stress_count): + # move the DUT in range + self.attenuators[0].set_atten(0) + # verify + self.connect_and_verify_connected_bssid(self.reference_networks[0]['2g']['bssid']) + # move the DUT out of range + self.attenuators[0].set_atten(95) + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) + + @test_tracker_info(uuid="5d5d14cb-3cd1-4b3d-8c04-0d6f4b764b6b") + def test_stress_pno_connection_to_2g(self): + """Test PNO triggered autoconnect to a network for N times + + Steps: + 1. Save 2Ghz valid network configuration in the device. + 2. Attenuate 5Ghz network and wait for a few seconds to trigger PNO. + 3. Check the device connected to 2Ghz network automatically. + 4. Repeat step 2-3 + """ + networks = [self.reference_networks[0]['2g']] + self.add_networks(self.dut, networks) + for count in range(self.stress_count): + self.trigger_pno_and_assert_connect("a_on_b_off", self.reference_networks[0]['2g']) + self.set_attns("b_on_a_off") + time.sleep(10) + wutils.set_attns(self.attenuators, "default") + raise signals.TestPass(details="", extras={"Iterations":"%d" % + self.stress_count, "Pass":"%d" %(count+1)}) diff --git a/acts/tests/google/wifi/WifiThroughputStabilityTest.py b/acts/tests/google/wifi/WifiThroughputStabilityTest.py index 326e8e807b..6d98fe9d94 100644 --- a/acts/tests/google/wifi/WifiThroughputStabilityTest.py +++ b/acts/tests/google/wifi/WifiThroughputStabilityTest.py @@ -23,7 +23,7 @@ from acts import asserts from acts import base_test from acts import utils from acts.controllers import iperf_server as ipf -from acts.test_decorators import test_tracker_info +from acts.metrics.loggers.blackbox import BlackboxMetricLogger from acts.test_utils.wifi import wifi_power_test_utils as wputils from acts.test_utils.wifi import wifi_retail_ap as retail_ap from acts.test_utils.wifi import wifi_test_utils as wutils @@ -34,72 +34,40 @@ MED_SLEEP = 6 class WifiThroughputStabilityTest(base_test.BaseTestClass): + """Class to test WiFi throughput stability. + + This class tests throughput stability and identifies cases where throughput + fluctuates over time. The class setups up the AP, configures and connects + the phone, and runs iperf throughput test at several attenuations For an + example config file to run this test class see + example_connectivity_performance_ap_sta.json. + """ + def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) - self.tests = ("test_tput_stability_high_TCP_DL_ch6_VHT20", - "test_tput_stability_high_TCP_UL_ch6_VHT20", - "test_tput_stability_low_TCP_DL_ch6_VHT20", - "test_tput_stability_low_TCP_UL_ch6_VHT20", - "test_tput_stability_high_UDP_DL_ch6_VHT20", - "test_tput_stability_high_UDP_UL_ch6_VHT20", - "test_tput_stability_low_UDP_DL_ch6_VHT20", - "test_tput_stability_low_UDP_UL_ch6_VHT20", - "test_tput_stability_high_TCP_DL_ch36_VHT20", - "test_tput_stability_high_TCP_UL_ch36_VHT20", - "test_tput_stability_low_TCP_DL_ch36_VHT20", - "test_tput_stability_low_TCP_UL_ch36_VHT20", - "test_tput_stability_high_UDP_DL_ch36_VHT20", - "test_tput_stability_high_UDP_UL_ch36_VHT20", - "test_tput_stability_low_UDP_DL_ch36_VHT20", - "test_tput_stability_low_UDP_UL_ch36_VHT20", - "test_tput_stability_high_TCP_DL_ch36_VHT40", - "test_tput_stability_high_TCP_UL_ch36_VHT40", - "test_tput_stability_low_TCP_DL_ch36_VHT40", - "test_tput_stability_low_TCP_UL_ch36_VHT40", - "test_tput_stability_high_UDP_DL_ch36_VHT40", - "test_tput_stability_high_UDP_UL_ch36_VHT40", - "test_tput_stability_low_UDP_DL_ch36_VHT40", - "test_tput_stability_low_UDP_UL_ch36_VHT40", - "test_tput_stability_high_TCP_DL_ch36_VHT80", - "test_tput_stability_high_TCP_UL_ch36_VHT80", - "test_tput_stability_low_TCP_DL_ch36_VHT80", - "test_tput_stability_low_TCP_UL_ch36_VHT80", - "test_tput_stability_high_UDP_DL_ch36_VHT80", - "test_tput_stability_high_UDP_UL_ch36_VHT80", - "test_tput_stability_low_UDP_DL_ch36_VHT80", - "test_tput_stability_low_UDP_UL_ch36_VHT80", - "test_tput_stability_high_TCP_DL_ch149_VHT20", - "test_tput_stability_high_TCP_UL_ch149_VHT20", - "test_tput_stability_low_TCP_DL_ch149_VHT20", - "test_tput_stability_low_TCP_UL_ch149_VHT20", - "test_tput_stability_high_UDP_DL_ch149_VHT20", - "test_tput_stability_high_UDP_UL_ch149_VHT20", - "test_tput_stability_low_UDP_DL_ch149_VHT20", - "test_tput_stability_low_UDP_UL_ch149_VHT20", - "test_tput_stability_high_TCP_DL_ch149_VHT40", - "test_tput_stability_high_TCP_UL_ch149_VHT40", - "test_tput_stability_low_TCP_DL_ch149_VHT40", - "test_tput_stability_low_TCP_UL_ch149_VHT40", - "test_tput_stability_high_UDP_DL_ch149_VHT40", - "test_tput_stability_high_UDP_UL_ch149_VHT40", - "test_tput_stability_low_UDP_DL_ch149_VHT40", - "test_tput_stability_low_UDP_UL_ch149_VHT40", - "test_tput_stability_high_TCP_DL_ch149_VHT80", - "test_tput_stability_high_TCP_UL_ch149_VHT80", - "test_tput_stability_low_TCP_DL_ch149_VHT80", - "test_tput_stability_low_TCP_UL_ch149_VHT80", - "test_tput_stability_high_UDP_DL_ch149_VHT80", - "test_tput_stability_high_UDP_UL_ch149_VHT80", - "test_tput_stability_low_UDP_DL_ch149_VHT80", - "test_tput_stability_low_UDP_UL_ch149_VHT80") + # Define metrics to be uploaded to BlackBox + self.min_throughput_metric = BlackboxMetricLogger.for_test_case( + metric_name='min_throughput') + self.avg_throughput_metric = BlackboxMetricLogger.for_test_case( + metric_name='avg_throughput') + self.std_dev_percent_metric = BlackboxMetricLogger.for_test_case( + metric_name='std_dev_percent') + + # Generate test cases + modes = [(6, "VHT20"), (36, "VHT20"), (36, "VHT40"), (36, "VHT80"), + (149, "VHT20"), (149, "VHT40"), (149, "VHT80")] + traffic_types = [("TCP", "DL"), ("TCP", "UL"), ("UDP", "DL"), ("UDP", + "UL")] + signal_levels = ["high", "low"] + self.generate_test_cases(modes, traffic_types, signal_levels) def setup_class(self): self.dut = self.android_devices[0] req_params = [ "throughput_stability_test_params", "testbed_params", - "main_network" + "main_network", "RetailAccessPoints" ] - opt_params = ["RetailAccessPoints", "golden_files_list"] + opt_params = ["golden_files_list"] self.unpack_userparams(req_params, opt_params) self.test_params = self.throughput_stability_test_params self.num_atten = self.attenuators[0].instrument.num_atten @@ -131,32 +99,34 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): meta data """ #TODO(@oelayach): Check throughput vs RvR golden file + avg_throughput = test_result_dict["iperf_results"]["avg_throughput"] + min_throughput = test_result_dict["iperf_results"]["min_throughput"] + std_dev_percent = ( + test_result_dict["iperf_results"]["std_deviation"] / + test_result_dict["iperf_results"]["avg_throughput"]) * 100 + # Set blackbox metrics + self.avg_throughput_metric.metric_value = avg_throughput + self.min_throughput_metric.metric_value = min_throughput + self.std_dev_percent_metric.metric_value = std_dev_percent + # Evaluate pass/fail min_throughput_check = ( - (test_result_dict["iperf_results"]["min_throughput"] / - test_result_dict["iperf_results"]["avg_throughput"]) * + (min_throughput / avg_throughput) * 100) > self.test_params["min_throughput_threshold"] - std_deviation_check = ( - (test_result_dict["iperf_results"]["std_deviation"] / - test_result_dict["iperf_results"]["avg_throughput"]) * - 100) < self.test_params["std_deviation_threshold"] + std_deviation_check = std_dev_percent < self.test_params["std_deviation_threshold"] if min_throughput_check and std_deviation_check: asserts.explicit_pass( - "Test Passed. Throughput at {}dB attenuation is unstable. " - "Average throughput is {} Mbps with a standard deviation of " - "{} Mbps and dips down to {} Mbps.".format( - self.atten_level, - test_result_dict["iperf_results"]["avg_throughput"], - test_result_dict["iperf_results"]["std_deviation"], - test_result_dict["iperf_results"]["min_throughput"])) + "Test Passed. Throughput at {0:.2f}dB attenuation is stable. " + "Mean throughput is {1:.2f} Mbps with a standard deviation of " + "{2:.2f}% and dips down to {3:.2f} Mbps.".format( + self.atten_level, avg_throughput, std_dev_percent, + min_throughput)) asserts.fail( - "Test Failed. Throughput at {}dB attenuation is unstable. " - "Average throughput is {} Mbps with a standard deviation of " - "{} Mbps and dips down to {} Mbps.".format( - self.atten_level, - test_result_dict["iperf_results"]["avg_throughput"], - test_result_dict["iperf_results"]["std_deviation"], - test_result_dict["iperf_results"]["min_throughput"])) + "Test Failed. Throughput at {0:.2f}dB attenuation is unstable. " + "Mean throughput is {1:.2f} Mbps with a standard deviation of " + "{2:.2f}% and dips down to {3:.2f} Mbps.".format( + self.atten_level, avg_throughput, std_dev_percent, + min_throughput)) def post_process_results(self, test_result): """Extracts results and saves plots and JSON formatted results. @@ -245,14 +215,13 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): self.access_point.ap_settings)) # Set attenuator to test level self.log.info("Setting attenuation to {} dB".format(self.atten_level)) - [ - self.attenuators[i].set_atten(self.atten_level) - for i in range(self.num_atten) - ] + for attenuator in self.attenuators: + attenuator.set_atten(self.atten_level) # Connect DUT to Network wutils.wifi_toggle_state(self.dut, True) wutils.reset_wifi(self.dut) self.main_network[band]["channel"] = channel + self.dut.droid.wifiSetCountryCode(self.test_params["country_code"]) wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5) time.sleep(MED_SLEEP) # Run test and log result @@ -274,7 +243,8 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): out_file.write("\n".join(client_output)) self.iperf_server.stop() # Set attenuator to 0 dB - [self.attenuators[i].set_atten(0) for i in range(self.num_atten)] + for attenuator in self.attenuators: + attenuator.set_atten(0) # Parse and log result if self.use_client_output: iperf_file = client_output_path @@ -354,227 +324,14 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): test_result_postprocessed = self.post_process_results(test_result) self.pass_fail_check(test_result_postprocessed) - #Test cases - @test_tracker_info(uuid='cf6e1c5a-a54e-4efc-a5db-90e02029fe10') - def test_tput_stability_high_TCP_DL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='86f8d2ed-a35e-49fd-983a-7af528e94426') - def test_tput_stability_high_TCP_UL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='71f615c0-9de8-4070-ba40-43b59278722e') - def test_tput_stability_low_TCP_DL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='ad4774d7-4b2c-49f5-8407-3aa72eb43537') - def test_tput_stability_low_TCP_UL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='586e1745-a97e-4b6c-af3a-00566aded442') - def test_tput_stability_high_UDP_DL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='66af3329-f40f-46fb-a156-69b9047711ec') - def test_tput_stability_high_UDP_UL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='bfebc69d-6636-4f95-b709-9d31040ab8cc') - def test_tput_stability_low_UDP_DL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='f7cc0211-a18e-4502-a713-e4f88b446776') - def test_tput_stability_low_UDP_UL_ch6_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='594f0359-600c-4b5f-9b96-90e5aa0f2ffb') - def test_tput_stability_high_TCP_DL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='fd0a2604-2b69-40c5-906f-c3da45c062e8') - def test_tput_stability_high_TCP_UL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='4ead8dc9-6beb-4ce0-8490-66cdd343d355') - def test_tput_stability_low_TCP_DL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='2657454c-5a12-4d50-859a-2adb56910920') - def test_tput_stability_low_TCP_UL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='4a446a64-a1ab-441a-bfec-5e3fd509c43b') - def test_tput_stability_high_UDP_DL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='e4f07efe-71cb-4891-898d-127bd74ca8a7') - def test_tput_stability_high_UDP_UL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='aa0270cb-d18b-4048-a3d8-e09eb01ac417') - def test_tput_stability_low_UDP_DL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='40b18d99-40b9-4592-a624-a63bee9d55f4') - def test_tput_stability_low_UDP_UL_ch36_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='ce9a9d88-6d39-4a19-a5d6-5296ed480afa') - def test_tput_stability_high_TCP_DL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='23a5f845-0871-481a-898d-e0d6aceed4d4') - def test_tput_stability_high_TCP_UL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='6e12ee9e-8f27-46e4-8645-9e7a2cdc354f') - def test_tput_stability_low_TCP_DL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='2325dd69-17b2-49ce-ac02-dfaa839e638e') - def test_tput_stability_low_TCP_UL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='28ae4bcb-2e50-4d74-9b77-4a5e199bc7a4') - def test_tput_stability_high_UDP_DL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='a8c3b5bf-ccb8-435b-8861-3b21ed5072fa') - def test_tput_stability_high_UDP_UL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='8a15662a-ed78-4a0d-8271-15927963a2c0') - def test_tput_stability_low_UDP_DL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='005a84de-689d-4b40-9912-66c137872312') - def test_tput_stability_low_UDP_UL_ch36_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='b1506bec-f3dd-4d93-87b1-3e9a7172a904') - def test_tput_stability_high_TCP_DL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='bdebd1df-3e7c-40fc-ad28-1b817b9cb228') - def test_tput_stability_high_TCP_UL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='08c7bb18-681f-4a93-90c8-bc0df7169211') - def test_tput_stability_low_TCP_DL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='2ddbb7b-496b-4fce-8697-f90409b6e441') - def test_tput_stability_low_TCP_UL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='db41ce61-1d8f-4fe8-a904-77f8dcc50ba3') - def test_tput_stability_high_UDP_DL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='cd537cb0-8936-41f4-a0fb-1c8b4f38fb62') - def test_tput_stability_high_UDP_UL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='4a0209df-42b2-4143-8321-4023355bd663') - def test_tput_stability_low_UDP_DL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='64304510-bd5f-4497-9b18-0d4b1a8eb026') - def test_tput_stability_low_UDP_UL_ch36_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='d121fc54-2f1f-4722-8adb-17cb89fba3e6') - def test_tput_stability_high_TCP_DL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='cdfca524-7387-4a08-a256-4696b76aa90f') - def test_tput_stability_high_TCP_UL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='99280d2d-0096-4862-b27c-85aa46dcfb79') - def test_tput_stability_low_TCP_DL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='c09a0f67-0383-4d6a-ac52-f4afd830971c') - def test_tput_stability_low_TCP_UL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='7d6dbd39-1dea-4770-a2e0-d9fb9f32904d') - def test_tput_stability_high_UDP_DL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='f7e0574e-29b7-429e-9f47-e5cecc0e2bec') - def test_tput_stability_high_UDP_UL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='31bd4b12-1be5-46b3-8f86-914e739b0082') - def test_tput_stability_low_UDP_DL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='dc88a5bb-646d-407c-aabe-f8d533f4fbc1') - def test_tput_stability_low_UDP_UL_ch149_VHT20(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='9a101716-7774-451c-b0a3-4ac993143841') - def test_tput_stability_high_TCP_DL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='3533c6ba-1510-4559-9f76-5aef8d996c71') - def test_tput_stability_high_TCP_UL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='e83840a0-805e-408a-87b6-4bfce5306b1d') - def test_tput_stability_low_TCP_DL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='c55edf2b-b0ce-469e-bd98-e407a9f14126') - def test_tput_stability_low_TCP_UL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='0704b56e-e986-4bb8-84dc-e7a1ef94ed91') - def test_tput_stability_high_UDP_DL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='71a979c0-a7d9-45bc-a57a-d71f5629b191') - def test_tput_stability_high_UDP_UL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='b4d0f554-f49b-46a8-897b-fc0d3af2e4b5') - def test_tput_stability_low_UDP_DL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='5bcb5950-db7f-435f-8fcb-55620c757f4f') - def test_tput_stability_low_UDP_UL_ch149_VHT40(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='d05bcae5-48ac-410b-8083-a05951839121') - def test_tput_stability_high_TCP_DL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='038f0c6d-527b-4094-af41-d8732b7594ed') - def test_tput_stability_high_TCP_UL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='7ebd1735-b495-4385-a1d4-e2a2991ecbcd') - def test_tput_stability_low_TCP_DL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='f9e2679a-0c2e-42ac-a7b2-81aae8680a8f') - def test_tput_stability_low_TCP_UL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='39e90233-404e-48ed-a71d-aa55b8047641') - def test_tput_stability_high_UDP_DL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='e68a8b64-9278-4c75-8f98-37b415287d9c') - def test_tput_stability_high_UDP_UL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='f710c4f8-5ae3-4e70-8d1f-dfa3fb7222a8') - def test_tput_stability_low_UDP_DL_ch149_VHT80(self): - self._test_throughput_stability() - - @test_tracker_info(uuid='ce8c125c-73d1-4aab-8d09-0b67d8598d20') - def test_tput_stability_low_UDP_UL_ch149_VHT80(self): - self._test_throughput_stability() + def generate_test_cases(self, modes, traffic_types, signal_levels): + """Function that auto-generates test cases for a test class.""" + testcase_wrapper = self._test_throughput_stability + for mode in modes: + for traffic_type in traffic_types: + for signal_level in signal_levels: + testcase_name = "test_tput_stability_{}_{}_{}_ch{}_{}".format( + signal_level, traffic_type[0], traffic_type[1], + mode[0], mode[1]) + setattr(self, testcase_name, testcase_wrapper) + self.tests.append(testcase_name) diff --git a/acts/tests/google/wifi/example_connectivity_performance_ap_sta.json b/acts/tests/google/wifi/example_connectivity_performance_ap_sta.json new file mode 100644 index 0000000000..234df4a52b --- /dev/null +++ b/acts/tests/google/wifi/example_connectivity_performance_ap_sta.json @@ -0,0 +1,89 @@ +{ + "testbed": [{ + "name": "<your testbed name>", + "AndroidDevice": ["<your device serial number>"], + "bug_report": 1, + "RetailAccessPoints": ["<your ap configuration. see class definition in wifi_retail_ap.py>"], + "Attenuator": ["<your attenuator configuration. see attenuator class definition>"], + "main_network": { + "<your network name>": { + "SSID": "<your SSID>", + "password": "<your key>", + "BSSID": "<your BSSID>" + }, + "<your other network names>": { + "SSID": "<your SSID>", + "password": "<your key>", + "BSSID": "<your BSSID>" + } + }, + "IPerfServer": ["<your iperf server configuation. see class definition in iperf_server>"], + "testbed_params": { + "default_region": "<default access point region to run tests in. This will be used for all non DFS channels>", + "DFS_region": "<access point region to run DFS tests in>", + "iperf_server_address": "<ip address of iperf server generating or accepting test traffic>", + "fixed_attenuation": {"<your channel number 1>": "<your testbed attenuation on this channel>", "<your channel number 2>": "<your testbed attenuation on this channel>"}, + "dut_front_end_loss": {"<your channel number 1>": "<your DUT front end loss on this channel>", "<your channel number 2>": "<your DUT front end loss on this channel>"}, + "ap_tx_power": {"<your channel number 1>": "<your access point transmit power on this channel>", "<your channel number 2>": "<your access point transmit power on this channel>"}, + "golden_results_path": "<your full path to golden results used for pass fail check>" + } + } + ], + "rvr_test_params":{ + "country_code": "<device country code to set during rvr tests>", + "iperf_duration": 30, + "iperf_ignored_interval": 2, + "UDP_rates": {"VHT20": "<throughput to transmit in this mode>", "VHT40": "<throughput to transmit in this mode>", "VHT80": "<throughput to transmit in this mode>"}, + "rvr_atten_start": 20, + "rvr_atten_stop": 30, + "rvr_atten_step": 5, + "pct_tolerance": 5, + "abs_tolerance": 5, + "failure_count_tolerance": 1 + }, + "rssi_test_params":{ + "country_code": "<device country code to set during rvr tests>", + "rssi_vs_atten_start": 20, + "rssi_vs_atten_stop": 80, + "rssi_vs_atten_step": 1, + "rssi_vs_atten_connected_measurements": 10, + "rssi_vs_atten_scan_measurements": 0, + "rssi_vs_atten_metrics": ["signal_poll_rssi", "scan_rssi", "chain_0_rssi", "chain_1_rssi"], + "rssi_stability_atten": [20, 55], + "rssi_stability_duration": 10, + "rssi_tracking_waveforms": [{"atten_levels": [40, 61, 40], "step_size": 1, "step_duration": 1, "repetitions":1}], + "polling_frequency": 0.25, + "abs_tolerance": 2.5, + "stdev_tolerance": 1 + }, + "throughput_stability_test_params":{ + "country_code": "<device country code to set during rvr tests>", + "iperf_duration": 30, + "iperf_ignored_interval": 5, + "UDP_rates": {"VHT20": "200M", "VHT40": "400M", "VHT80": "700M"}, + "low_rssi_backoff_from_range": 10, + "min_throughput_threshold": 75, + "std_deviation_threshold": 5 + + }, + "ping_test_params":{ + "country_code": "<device country code to set during rvr tests>", + "ping_size": 64, + "range_ping_duration": 1, + "range_ping_interval": 0.002, + "range_atten_start": 60, + "range_atten_step": 1, + "range_atten_stop": 70, + "range_ping_loss_threshold": 25, + "range_gap_threshold": 2, + "rtt_ping_duration": 30, + "rtt_ping_interval": {"fast": 0.002, "slow": 0.5}, + "rtt_ignored_interval": 0.15, + "rtt_test_attenuation": [20, 50], + "rtt_test_percentile": 5, + "rtt_threshold": 0.2, + "rtt_std_deviation_threshold": 5 + }, + "logpath": "<path to logs>", + "testpaths": ["<path to ACTS root folder>/tools/test/connectivity/acts/tests/google/wifi"] +} |