diff options
author | Bill Yi <byi@google.com> | 2018-11-30 11:50:51 -0800 |
---|---|---|
committer | android-build-merger <android-build-merger@google.com> | 2018-11-30 11:50:51 -0800 |
commit | e570fab62d8605e06dc26f7f1ea7fa685c096bfc (patch) | |
tree | 1ab6f1c060d261b55dd1b25d77bd9ea37d54df92 | |
parent | 241b580aec1347a287cf939fe63aa4f32af2ee8e (diff) | |
parent | d88d2b0d374d6d9777a3d335ade5311b1016944b (diff) | |
download | platform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.tar.gz platform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.tar.bz2 platform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.zip |
Merge pi-qpr1-release PQ1A.181105.017.A1 to pi-platform-release
am: d88d2b0d37
Change-Id: Icf7ddcbdb02a48749f3010a6e4d76792c24886c3
40 files changed, 3838 insertions, 450 deletions
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py index b861cdc865..313fcf999d 100755 --- a/acts/framework/acts/controllers/access_point.py +++ b/acts/framework/acts/controllers/access_point.py @@ -26,6 +26,7 @@ from acts.controllers.ap_lib import dhcp_config from acts.controllers.ap_lib import dhcp_server from acts.controllers.ap_lib import hostapd from acts.controllers.ap_lib import hostapd_config +from acts.controllers.ap_lib import hostapd_constants from acts.controllers.utils_lib.commands import ip from acts.controllers.utils_lib.commands import route from acts.controllers.utils_lib.commands import shell @@ -308,15 +309,19 @@ class AccessPoint(object): return interface - def get_bssid_from_ssid(self, ssid): + def get_bssid_from_ssid(self, ssid, band): """Gets the BSSID from a provided SSID Args: - ssid: An SSID string + ssid: An SSID string. + band: 2G or 5G Wifi band. Returns: The BSSID if on the AP or None if SSID could not be found. """ + if band == hostapd_constants.BAND_2G: + interfaces = [self.wlan_2g, ssid] + else: + interfaces = [self.wlan_5g, ssid] - interfaces = [self.wlan_2g, self.wlan_5g, ssid] # Get the interface name associated with the given ssid. for interface in interfaces: cmd = "iw dev %s info|grep ssid|awk -F' ' '{print $2}'" % ( diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py index 3b9cb0ffa7..03097f049e 100755 --- a/acts/framework/acts/controllers/android_device.py +++ b/acts/framework/acts/controllers/android_device.py @@ -388,6 +388,9 @@ class AndroidDevice: self.adb = adb.AdbProxy(serial, ssh_connection=ssh_connection) self.fastboot = fastboot.FastbootProxy( serial, ssh_connection=ssh_connection) + self.adb_logcat_file_path = os.path.join( + log_path_base, 'AndroidDevice%s' % serial, + "adblog,{},{}.txt".format(self.model, serial)) if not self.is_bootloader: self.root_adb() self._ssh_connection = ssh_connection @@ -642,8 +645,9 @@ class AndroidDevice: except (IndexError, ValueError) as e: # Possible ValueError from string to int cast. # Possible IndexError from split. - self.log.warn('Command \"%s\" returned output line: ' - '\"%s\".\nError: %s', cmd, out, e) + self.log.warn( + 'Command \"%s\" returned output line: ' + '\"%s\".\nError: %s', cmd, out, e) except Exception as e: self.log.warn( 'Device fails to check if %s running with \"%s\"\n' @@ -754,7 +758,7 @@ class AndroidDevice: begin_at = '-T "%s"' % self.last_logcat_timestamp else: begin_at = '-T 1' - # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object. + # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object. cmd = "adb -s {} logcat {} -v year {} >> {}".format( self.serial, begin_at, extra_params, logcat_file_path) self.adb_logcat_process = utils.start_standing_subprocess(cmd) @@ -810,8 +814,9 @@ class AndroidDevice: 'pm list packages | grep -w "package:%s"' % package_name)) except Exception as err: - self.log.error('Could not determine if %s is installed. ' - 'Received error:\n%s', package_name, err) + self.log.error( + 'Could not determine if %s is installed. ' + 'Received error:\n%s', package_name, err) return False def is_sl4a_installed(self): @@ -835,8 +840,9 @@ class AndroidDevice: self.log.info("apk %s is running", package_name) return True except Exception as e: - self.log.warn("Device fails to check is %s running by %s " - "Exception %s", package_name, cmd, e) + self.log.warn( + "Device fails to check is %s running by %s " + "Exception %s", package_name, cmd, e) continue self.log.debug("apk %s is not running", package_name) return False @@ -896,9 +902,9 @@ class AndroidDevice: if new_br: out = self.adb.shell("bugreportz", timeout=BUG_REPORT_TIMEOUT) if not out.startswith("OK"): - raise AndroidDeviceError("Failed to take bugreport on %s: %s" % - (self.serial, out)) - br_out_path = out.split(':')[1].strip() + raise AndroidDeviceError( + "Failed to take bugreport on %s: %s" % (self.serial, out)) + br_out_path = out.split(':')[1].strip().split()[0] self.adb.pull("%s %s" % (br_out_path, full_out_path)) else: self.adb.bugreport( @@ -1142,6 +1148,9 @@ class AndroidDevice: self.stop_services() self.log.info("Restarting android runtime") self.adb.shell("stop") + # Reset the boot completed flag before we restart the framework + # to correctly detect when the framework has fully come up. + self.adb.shell("setprop sys.boot_completed 0") self.adb.shell("start") self.wait_for_boot_completion() self.root_adb() @@ -1301,7 +1310,8 @@ class AndroidDevice: def is_screen_lock_enabled(self): """Check if screen lock is enabled""" - cmd = ("sqlite3 /data/system/locksettings.db .dump"" | grep lockscreen.password_type | grep -v alternate") + cmd = ("sqlite3 /data/system/locksettings.db .dump" + " | grep lockscreen.password_type | grep -v alternate") out = self.adb.shell(cmd, ignore_status=True) if "unable to open" in out: self.root_adb() @@ -1319,7 +1329,7 @@ class AndroidDevice: self.log.info("Device is in CrpytKeeper window") return True if "StatusBar" in current_window and ( - (not current_app) or "FallbackHome" in current_app): + (not current_app) or "FallbackHome" in current_app): self.log.info("Device is locked") return True return False @@ -1370,12 +1380,13 @@ class AndroidDevice: def exit_setup_wizard(self): if not self.is_user_setup_complete() or self.is_setupwizard_on(): - self.adb.shell("pm disable %s" % self.get_setupwizard_package_name()) + self.adb.shell( + "pm disable %s" % self.get_setupwizard_package_name()) # Wait up to 5 seconds for user_setup_complete to be updated - for _ in range(5): + end_time = time.time() + 5 + while time.time() < end_time: if self.is_user_setup_complete() or not self.is_setupwizard_on(): return - time.sleep(1) # If fail to exit setup wizard, set local.prop and reboot if not self.is_user_setup_complete() and self.is_setupwizard_on(): @@ -1386,12 +1397,18 @@ class AndroidDevice: def get_setupwizard_package_name(self): """Finds setupwizard package/.activity + Bypass setupwizard or setupwraith depending on device. + Returns: packageName/.ActivityName - """ - package = self.adb.shell("pm list packages -f | grep setupwizard | grep com.google.android") + """ + packages_to_skip = "'setupwizard|setupwraith'" + android_package_name = "com.google.android" + package = self.adb.shell( + "pm list packages -f | grep -E {} | grep {}".format( + packages_to_skip, android_package_name)) wizard_package = re.split("=", package)[1] - activity = re.search("wizard/(.*?).apk", package, re.IGNORECASE).groups()[0] + activity = re.search("(.*?).apk", package, re.IGNORECASE).groups()[0] self.log.info("%s/.%sActivity" % (wizard_package, activity)) return "%s/.%sActivity" % (wizard_package, activity) diff --git a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py index 01cb76af58..21d63c7810 100644 --- a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py @@ -24,8 +24,14 @@ import os from acts import utils from acts.base_test import BaseTestClass from acts.signals import TestSignal +from acts.utils import create_dir +from acts.utils import dump_string_to_file from acts.controllers import android_device +from acts.libs.proto.proto_utils import compile_import_proto +from acts.libs.proto.proto_utils import parse_proto_to_ascii +from acts.test_utils.bt.bt_metrics_utils import get_bluetooth_metrics +from acts.test_utils.bt.bt_test_utils import get_device_selector_dictionary from acts.test_utils.bt.bt_test_utils import reset_bluetooth from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs @@ -47,6 +53,36 @@ class BluetoothBaseTest(BaseTestClass): BaseTestClass.__init__(self, controllers) for ad in self.android_devices: self._setup_bt_libs(ad) + if 'preferred_device_order' in self.user_params: + prefered_device_order = self.user_params['preferred_device_order'] + for i, ad in enumerate(self.android_devices): + if ad.serial in prefered_device_order: + index = prefered_device_order.index(ad.serial) + self.android_devices[i], self.android_devices[index] = \ + self.android_devices[index], self.android_devices[i] + + def collect_bluetooth_manager_metrics_logs(self, ads, test_name): + """ + Collect Bluetooth metrics logs, save an ascii log to disk and return + both binary and ascii logs to caller + :param ads: list of active Android devices + :return: List of binary metrics logs, + List of ascii metrics logs + """ + bluetooth_logs = [] + bluetooth_logs_ascii = [] + for ad in ads: + serial = ad.serial + out_name = "{}_{}_{}".format(serial, test_name, + "bluetooth_metrics.txt") + bluetooth_log = get_bluetooth_metrics(ad, + ad.bluetooth_proto_module) + bluetooth_log_ascii = parse_proto_to_ascii(bluetooth_log) + dump_string_to_file(bluetooth_log_ascii, + os.path.join(ad.metrics_path, out_name)) + bluetooth_logs.append(bluetooth_log) + bluetooth_logs_ascii.append(bluetooth_log_ascii) + return bluetooth_logs, bluetooth_logs_ascii # Use for logging in the test cases to facilitate # faster log lookup and reduce ambiguity in logging. @@ -95,6 +131,8 @@ class BluetoothBaseTest(BaseTestClass): return _safe_wrap_test_case def setup_class(self): + self.device_selector = get_device_selector_dictionary( + self.android_devices) if "reboot_between_test_class" in self.user_params: threads = [] for a in self.android_devices: @@ -104,7 +142,42 @@ class BluetoothBaseTest(BaseTestClass): thread.start() for t in threads: t.join() - return setup_multiple_devices_for_bt_test(self.android_devices) + if not setup_multiple_devices_for_bt_test(self.android_devices): + return False + if "bluetooth_proto_path" in self.user_params: + from google import protobuf + + self.bluetooth_proto_path = self.user_params[ + "bluetooth_proto_path"][0] + if not os.path.isfile(self.bluetooth_proto_path): + try: + self.bluetooth_proto_path = "{}/bluetooth.proto".format( + os.path.dirname(os.path.realpath(__file__))) + except Exception: + self.log.error("File not found.") + if not os.path.isfile(self.bluetooth_proto_path): + self.log.error("Unable to find Bluetooth proto {}.".format( + self.bluetooth_proto_path)) + return False + for ad in self.android_devices: + ad.metrics_path = os.path.join(ad.log_path, "BluetoothMetrics") + create_dir(ad.metrics_path) + ad.bluetooth_proto_module = \ + compile_import_proto(ad.metrics_path, self.bluetooth_proto_path) + if not ad.bluetooth_proto_module: + self.log.error("Unable to compile bluetooth proto at " + + self.bluetooth_proto_path) + return False + # Clear metrics. + get_bluetooth_metrics(ad, ad.bluetooth_proto_module) + return True + + def teardown_class(self): + if "bluetooth_proto_path" in self.user_params: + # Collect metrics here bassed off class name + bluetooth_logs, bluetooth_logs_ascii = \ + self.collect_bluetooth_manager_metrics_logs( + self.android_devices, self.__class__.__name__) def setup_test(self): self.timer_list = [] @@ -114,9 +187,6 @@ class BluetoothBaseTest(BaseTestClass): a.droid.wakeUpNow() return True - def teardown_test(self): - return True - def on_fail(self, test_name, begin_time): self.log.debug( "Test {} failed. Gathering bugreport and btsnoop logs".format( @@ -175,3 +245,6 @@ class BluetoothBaseTest(BaseTestClass): # Shell command library setattr(android_device, "shell", ShellCommands(log=self.log, dut=android_device)) + # Setup Android Device feature list + setattr(android_device, "features", + android_device.adb.shell("pm list features").split("\n")) diff --git a/acts/framework/acts/test_utils/bt/bluetooth.proto b/acts/framework/acts/test_utils/bt/bluetooth.proto index a43ff47f55..4a07e59d76 100644 --- a/acts/framework/acts/test_utils/bt/bluetooth.proto +++ b/acts/framework/acts/test_utils/bt/bluetooth.proto @@ -114,6 +114,15 @@ message RFCommSession { optional int32 tx_bytes = 2; } +enum A2dpSourceCodec { + A2DP_SOURCE_CODEC_UNKNOWN = 0; + A2DP_SOURCE_CODEC_SBC = 1; + A2DP_SOURCE_CODEC_AAC = 2; + A2DP_SOURCE_CODEC_APTX = 3; + A2DP_SOURCE_CODEC_APTX_HD = 4; + A2DP_SOURCE_CODEC_LDAC = 5; +} + // Session information that gets logged for A2DP session. message A2DPSession { // Media timer in milliseconds. @@ -139,6 +148,12 @@ message A2DPSession { // Total audio time in this A2DP session optional int64 audio_duration_millis = 8; + + // Audio codec used in this A2DP session in A2DP source role + optional A2dpSourceCodec source_codec = 9; + + // Whether A2DP offload is enabled in this A2DP session + optional bool is_a2dp_offload = 10; } message PairEvent { diff --git a/acts/framework/acts/test_utils/bt/bt_constants.py b/acts/framework/acts/test_utils/bt/bt_constants.py index f093a1f68c..5be4c79a20 100644 --- a/acts/framework/acts/test_utils/bt/bt_constants.py +++ b/acts/framework/acts/test_utils/bt/bt_constants.py @@ -596,3 +596,101 @@ logcat_strings = { } ### End logcat strings dict""" + +### Begin Service Discovery UUIDS ### +### Values match the Bluetooth SIG defined values: """ +""" https://www.bluetooth.com/specifications/assigned-numbers/service-discovery """ +sig_uuid_constants = { + "BASE_UUID": "0000{}-0000-1000-8000-00805F9B34FB", + "SDP": "0001", + "UDP": "0002", + "RFCOMM": "0003", + "TCP": "0004", + "TCS-BIN": "0005", + "TCS-AT": "0006", + "ATT": "0007", + "OBEX": "0008", + "IP": "0009", + "FTP": "000A", + "HTTP": "000C", + "WSP": "000E", + "BNEP": "000F", + "UPNP": "0010", + "HIDP": "0011", + "HardcopyControlChannel": "0012", + "HardcopyDataChannel": "0014", + "HardcopyNotification": "0016", + "AVCTP": "0017", + "AVDTP": "0019", + "CMTP": "001B", + "MCAPControlChannel": "001E", + "MCAPDataChannel": "001F", + "L2CAP": "0100", + "ServiceDiscoveryServerServiceClassID": "1000", + "BrowseGroupDescriptorServiceClassID": "1001", + "SerialPort": "1101", + "LANAccessUsingPPP": "1102", + "DialupNetworking": "1103", + "IrMCSync": "1104", + "OBEXObjectPush": "1105", + "OBEXFileTransfer": "1106", + "IrMCSyncCommand": "1107", + "Headset": "1108", + "CordlessTelephony": "1109", + "AudioSource": "110A", + "AudioSink": "110B", + "A/V_RemoteControlTarget": "110C", + "AdvancedAudioDistribution": "110D", + "A/V_RemoteControl": "110E", + "A/V_RemoteControlController": "110F", + "Intercom": "1110", + "Fax": "1111", + "Headset - Audio Gateway (AG)": "1112", + "WAP": "1113", + "WAP_CLIENT": "1114", + "PANU": "1115", + "NAP": "1116", + "GN": "1117", + "DirectPrinting": "1118", + "ReferencePrinting": "1119", + "ImagingResponder": "111B", + "ImagingAutomaticArchive": "111C", + "ImagingReferencedObjects": "111D", + "Handsfree": "111E", + "HandsfreeAudioGateway": "111F", + "DirectPrintingReferenceObjectsService": "1120", + "ReflectedUI": "1121", + "BasicPrinting": "1122", + "PrintingStatus": "1123", + "HumanInterfaceDeviceService": "1124", + "HardcopyCableReplacement": "1125", + "HCR_Print": "1126", + "HCR_Scan": "1127", + "Common_ISDN_Access": "1128", + "SIM_Access": "112D", + "Phonebook Access - PCE": "112E", + "Phonebook Access - PSE": "112F", + "Phonebook Access": "1130", + "Headset - HS": "1131", + "Message Access Server": "1132", + "Message Notification Server": "1133", + "Message Access Profile": "1134", + "GNSS": "1135", + "GNSS_Server": "1136", + "PnPInformation": "1200", + "GenericNetworking": "1201", + "GenericFileTransfer": "1202", + "GenericAudio": "1203", + "GenericTelephony": "1204", + "UPNP_Service": "1205", + "UPNP_IP_Service": "1206", + "ESDP_UPNP_IP_PAN": "1300", + "ESDP_UPNP_IP_LAP": "1301", + "ESDP_UPNP_L2CAP": "1302", + "VideoSource": "1303", + "VideoSink": "1304", + "VideoDistribution": "1305", + "HDP": "1400" +} + +### End Service Discovery UUIDS ### diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py index 9a930ab898..58f1a3c393 100644 --- a/acts/framework/acts/test_utils/bt/bt_test_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py @@ -65,6 +65,7 @@ from acts.test_utils.bt.bt_constants import pan_connect_timeout from acts.test_utils.bt.bt_constants import small_timeout from acts.test_utils.bt.bt_constants import scan_result from acts.test_utils.bt.bt_constants import scan_failed +from acts.test_utils.bt.bt_constants import sig_uuid_constants from acts.test_utils.bt.bt_constants import hid_id_keyboard from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb @@ -223,7 +224,8 @@ def setup_multiple_devices_for_bt_test(android_devices): threads = [] try: for a in android_devices: - thread = threading.Thread(target=factory_reset_bluetooth, args=([[a]])) + thread = threading.Thread( + target=factory_reset_bluetooth, args=([[a]])) threads.append(thread) thread.start() for t in threads: @@ -282,7 +284,11 @@ def bluetooth_enabled_check(ad): return False return True -def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5): + +def wait_for_bluetooth_manager_state(droid, + state=None, + timeout=10, + threshold=5): """ Waits for BlueTooth normalized state or normalized explicit state args: droid: droid device object @@ -301,7 +307,8 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5) # for any normalized state if state is None: if len(set(all_states[-threshold:])) == 1: - log.info("State normalized {}".format(set(all_states[-threshold:]))) + log.info("State normalized {}".format( + set(all_states[-threshold:]))) return True else: # explicit check against normalized state @@ -310,9 +317,11 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5) time.sleep(0.5) log.error( "Bluetooth state fails to normalize" if state is None else - "Failed to match bluetooth state, current state {} expected state {}".format(get_state(), state)) + "Failed to match bluetooth state, current state {} expected state {}". + format(get_state(), state)) return False + def factory_reset_bluetooth(android_devices): """Clears Bluetooth stack of input Android device list. @@ -340,6 +349,7 @@ def factory_reset_bluetooth(android_devices): return False return True + def reset_bluetooth(android_devices): """Resets Bluetooth state of input Android device list. @@ -576,6 +586,7 @@ def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): mac_address = event['data']['Result']['deviceInfo']['address'] return mac_address, advertise_callback, scan_callback + def enable_bluetooth(droid, ed): if droid.bluetoothCheckState() is True: return True @@ -594,6 +605,7 @@ def enable_bluetooth(droid, ed): return True + def disable_bluetooth(droid): """Disable Bluetooth on input Droid object. @@ -1000,7 +1012,8 @@ def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): bluetooth_profile_connection_state_changed, bt_default_timeout) pri_ad.log.info("Got event {}".format(profile_event)) except Exception as e: - pri_ad.log.error("Did not disconnect from Profiles. Reason {}".format(e)) + pri_ad.log.error( + "Did not disconnect from Profiles. Reason {}".format(e)) return False profile = profile_event['data']['profile'] @@ -1387,3 +1400,78 @@ def is_a2dp_connected(sink, source): if (device["address"] == source.droid.bluetoothGetLocalAddress()): return True return False + + +def get_device_selector_dictionary(android_device_list): + """Create a dictionary of Bluetooth features vs Android devices. + + Args: + android_device_list: The list of Android devices. + Returns: + A dictionary of profiles/features to Android devices. + """ + selector_dict = {} + for ad in android_device_list: + uuids = ad.droid.bluetoothGetLocalUuids() + + for profile, uuid_const in sig_uuid_constants.items(): + uuid_check = sig_uuid_constants['BASE_UUID'].format( + uuid_const).lower() + if uuid_check in uuids: + if profile in selector_dict: + selector_dict[profile].append(ad) + else: + selector_dict[profile] = [ad] + + # Various services may not be active during BT startup. + # If the device can be identified through adb shell pm list features + # then try to add them to the appropriate profiles / features. + + # Android TV. + if "feature:com.google.android.tv.installed" in ad.features: + ad.log.info("Android TV device found.") + supported_profiles = ['AudioSink'] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + + # Android Auto + elif "feature:android.hardware.type.automotive" in ad.features: + ad.log.info("Android Auto device found.") + # Add: AudioSink , A/V_RemoteControl, + supported_profiles = [ + 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' + ] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + # Android Wear + elif "feature:android.hardware.type.watch" in ad.features: + ad.log.info("Android Wear device found.") + supported_profiles = [] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + # Android Phone + elif "feature:android.hardware.telephony" in ad.features: + ad.log.info("Android Phone device found.") + # Add: AudioSink + supported_profiles = [ + 'AudioSource', 'A/V_RemoteControlTarget', + 'Message Access Server' + ] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + return selector_dict + + +def _add_android_device_to_dictionary(android_device, profile_list, + selector_dict): + """Adds the AndroidDevice and supported features to the selector dictionary + + Args: + android_device: The Android device. + profile_list: The list of profiles the Android device supports. + """ + for profile in profile_list: + if profile in selector_dict and android_device not in selector_dict[profile]: + selector_dict[profile].append(android_device) + else: + selector_dict[profile] = [android_device] diff --git a/acts/framework/acts/test_utils/net/arduino_test_utils.py b/acts/framework/acts/test_utils/net/arduino_test_utils.py index af0b3da15f..058d1433a4 100644 --- a/acts/framework/acts/test_utils/net/arduino_test_utils.py +++ b/acts/framework/acts/test_utils/net/arduino_test_utils.py @@ -44,7 +44,7 @@ def connect_wifi(wd, network=None): """ wd.log.info("Flashing connect_wifi.ino onto dongle") cmd = "locate %s" % CONNECT_WIFI - file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").rstrip() + file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").split()[-1] write_status = wd.write(ARDUINO, file_path, network) asserts.assert_true(write_status, "Failed to flash connect wifi") wd.log.info("Flashing complete") diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py new file mode 100644 index 0000000000..02167c966c --- /dev/null +++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py @@ -0,0 +1,63 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts + +def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10): + """ Start NAT-T keep alive on dut """ + + ad.log.info("Starting NATT Keepalive") + status = None + + key = ad.droid.connectivityStartNattKeepalive( + interval, src_ip, src_port, dst_ip) + + ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Started") + try: + event = ad.ed.pop_event("PacketKeepaliveCallback") + status = event["data"]["packetKeepaliveEvent"] + except Empty: + msg = "Failed to receive confirmation of starting natt keepalive" + asserts.fail(msg) + finally: + ad.droid.connectivityNattKeepaliveStopListeningForEvent( + key, "Started") + + if status != "Started": + ad.log.error("Received keepalive status: %s" % status) + ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key) + return None + return key + +def stop_natt_keepalive(ad, key): + """ Stop NAT-T keep alive on dut """ + + ad.log.info("Stopping NATT keepalive") + status = False + ad.droid.connectivityStopNattKeepalive(key) + + ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Stopped") + try: + event = ad.ed.pop_event("PacketKeepaliveCallback") + status = event["data"]["packetKeepaliveEvent"] == "Stopped" + except Empty: + msg = "Failed to receive confirmation of stopping natt keepalive" + asserts.fail(msg) + finally: + ad.droid.connectivityNattKeepaliveStopListeningForEvent( + key, "Stopped") + + ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key) + return status diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py new file mode 100644 index 0000000000..867473caef --- /dev/null +++ b/acts/framework/acts/test_utils/net/net_test_utils.py @@ -0,0 +1,274 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts import utils +from acts.logger import epoch_to_log_line_timestamp +from acts.utils import get_current_epoch_time +from acts.logger import normalize_log_line_timestamp +from acts.utils import start_standing_subprocess +from acts.utils import stop_standing_subprocess +from acts.test_utils.net import connectivity_const as cconst +from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection +from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.wifi import wifi_test_utils as wutils + +import os +import re +import time +import urllib.request + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams +VPN_PING_ADDR = "10.10.10.1" +TCPDUMP_PATH = "/data/local/tmp/tcpdump" + +def verify_lte_data_and_tethering_supported(ad): + """Verify if LTE data is enabled and tethering supported""" + wutils.wifi_toggle_state(ad, False) + ad.droid.telephonyToggleDataConnection(True) + wait_for_cell_data_connection(ad.log, ad, True) + asserts.assert_true( + verify_http_connection(ad.log, ad), + "HTTP verification failed on cell data connection") + asserts.assert_true( + ad.droid.connectivityIsTetheringSupported(), + "Tethering is not supported for the provider") + wutils.wifi_toggle_state(ad, True) + +def set_chrome_browser_permissions(ad): + """Set chrome browser start with no-first-run verification. + Give permission to read from and write to storage + """ + commands = ["pm grant com.android.chrome " + "android.permission.READ_EXTERNAL_STORAGE", + "pm grant com.android.chrome " + "android.permission.WRITE_EXTERNAL_STORAGE", + "rm /data/local/chrome-command-line", + "am set-debug-app --persistent com.android.chrome", + 'echo "chrome --no-default-browser-check --no-first-run ' + '--disable-fre" > /data/local/tmp/chrome-command-line'] + for cmd in commands: + try: + ad.adb.shell(cmd) + except adb.AdbError: + self.log.warn("adb command %s failed on %s" % (cmd, ad.serial)) + +def verify_ping_to_vpn_ip(ad): + """ Verify if IP behind VPN server is pingable. + Ping should pass, if VPN is connected. + Ping should fail, if VPN is disconnected. + + Args: + ad: android device object + """ + ping_result = None + pkt_loss = "100% packet loss" + try: + ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % VPN_PING_ADDR) + except adb.AdbError: + pass + return ping_result and pkt_loss not in ping_result + +def legacy_vpn_connection_test_logic(ad, vpn_profile): + """ Test logic for each legacy VPN connection + + Steps: + 1. Generate profile for the VPN type + 2. Establish connection to the server + 3. Verify that connection is established using LegacyVpnInfo + 4. Verify the connection by pinging the IP behind VPN + 5. Stop the VPN connection + 6. Check the connection status + 7. Verify that ping to IP behind VPN fails + + Args: + 1. ad: Android device object + 2. VpnProfileType (1 of the 6 types supported by Android) + """ + # Wait for sometime so that VPN server flushes all interfaces and + # connections after graceful termination + time.sleep(10) + + ad.adb.shell("ip xfrm state flush") + ad.log.info("Connecting to: %s", vpn_profile) + ad.droid.vpnStartLegacyVpn(vpn_profile) + time.sleep(cconst.VPN_TIMEOUT) + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_equal(connected_vpn_info["state"], + cconst.VPN_STATE_CONNECTED, + "Unable to establish VPN connection for %s" + % vpn_profile) + + ping_result = verify_ping_to_vpn_ip(ad) + ip_xfrm_state = ad.adb.shell("ip xfrm state") + match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) + if match_obj: + ip_xfrm_state = format(match_obj.group(0)).split() + ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) + + ad.droid.vpnStopLegacyVpn() + asserts.assert_true(ping_result, + "Ping to the internal IP failed. " + "Expected to pass as VPN is connected") + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_true(not connected_vpn_info, + "Unable to terminate VPN connection for %s" + % vpn_profile) + +def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, + ipsec_server_type, log_path): + """ Download the certificates from VPN server and push to sdcard of DUT + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Client cert file name on DUT's sdcard + """ + url = "http://%s%s%s" % (vpn_server_addr, + vpn_params['cert_path_vpnserver'], + vpn_params['client_pkcs_file_name']) + local_cert_name = "%s_%s_%s" % (vpn_type.name, + ipsec_server_type, + vpn_params['client_pkcs_file_name']) + + local_file_path = os.path.join(log_path, local_cert_name) + try: + ret = urllib.request.urlopen(url) + with open(local_file_path, "wb") as f: + f.write(ret.read()) + except: + asserts.fail("Unable to download certificate from the server") + + f.close() + ad.adb.push("%s sdcard/" % local_file_path) + return local_cert_name + +def generate_legacy_vpn_profile(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path): + """ Generate legacy VPN profile for a VPN + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Vpn profile + """ + vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], + VPN_CONST.PWD: vpn_params['vpn_password'], + VPN_CONST.TYPE: vpn_type.value, + VPN_CONST.SERVER: vpn_server_addr,} + vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, + ipsec_server_type) + if vpn_type.name == "PPTP": + vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name + + psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) + rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) + + if vpn_type.name in psk_set: + vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] + elif vpn_type.name in rsa_set: + cert_name = download_load_certs(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path) + vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] + vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] + ad.droid.installCertificate(vpn_profile, cert_name, + vpn_params['cert_password']) + else: + vpn_profile[VPN_CONST.MPPE] = "mppe" + + return vpn_profile + +def start_tcpdump(ad, test_name): + """Start tcpdump on all interfaces + + Args: + ad: android device object. + test_name: tcpdump file name will have this + """ + ad.log.info("Starting tcpdump on all interfaces") + try: + ad.adb.shell("killall -9 tcpdump") + except AdbError: + ad.log.warn("Killing existing tcpdump processes failed") + out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH) + if "No such file" in out or not out: + ad.adb.shell("mkdir %s" % TCPDUMP_PATH) + else: + ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) + + begin_time = epoch_to_log_line_timestamp(get_current_epoch_time()) + begin_time = normalize_log_line_timestamp(begin_time) + + file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) + ad.log.info("tcpdump file is %s", file_name) + try: + cmd = "adb -s {} shell tcpdump -i any -s0 -w {}" . \ + format(ad.serial, file_name) + return start_standing_subprocess(cmd, 5) + except e: + ad.log.error(e) + + return None + +def stop_tcpdump(ad, proc, test_name): + """Stops tcpdump on any iface + Pulls the tcpdump file in the tcpdump dir + + Args: + ad: android device object. + proc: need to know which pid to stop + test_name: test name to save the tcpdump file + + Returns: + log_path of the tcpdump file + """ + ad.log.info("Stopping and pulling tcpdump if any") + if proc == None: + return None + try: + stop_standing_subprocess(proc) + except Exception as e: + ad.log.warning(e) + log_path = os.path.join(ad.log_path, test_name) + utils.create_dir(log_path) + ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path)) + ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) + file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) + return "%s/%s" % (log_path, file_name) diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py index 0a45dd1954..78c5090599 100644 --- a/acts/framework/acts/test_utils/power/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py @@ -125,7 +125,7 @@ class PowerBaseTest(base_test.BaseTestClass): if hasattr(self, 'attenuators'): self.num_atten = self.attenuators[0].instrument.num_atten self.atten_level = self.unpack_custom_file(self.attenuation_file) - self.set_attenuation(INITIAL_ATTEN) + self.set_attenuation(INITIAL_ATTEN) self.threshold = self.unpack_custom_file(self.threshold_file) self.mon_info = self.create_monsoon_info() @@ -451,11 +451,11 @@ class PowerBaseTest(base_test.BaseTestClass): self.brconfigs: dict for bridge interface configs """ wutils.wifi_toggle_state(self.dut, True) - self.brconfigs = wputils.ap_setup( - self.access_point, network, bandwidth=bandwidth) + if hasattr(self, 'access_points'): + self.brconfigs = wputils.ap_setup( + self.access_point, network, bandwidth=bandwidth) if connect: wutils.wifi_connect(self.dut, network) - return self.brconfigs def process_iperf_results(self): """Get the iperf results and process. diff --git a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py index 635c8b54c8..19702f4540 100644 --- a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py @@ -34,6 +34,7 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest): self.access_point_main = self.access_points[0] if len(self.access_points) > 1: self.access_point_aux = self.access_points[1] + if hasattr(self, 'network_file'): self.networks = self.unpack_custom_file(self.network_file, False) self.main_network = self.networks['main_network'] self.aux_network = self.networks['aux_network'] diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py index 3c5c92c3cc..4b6dc8c50e 100755 --- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py +++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py @@ -33,6 +33,9 @@ from acts.controllers.ap_lib import hostapd_bss_settings from acts.controllers.ap_lib import hostapd_constants from acts.controllers.ap_lib import hostapd_security +AP_1 = 0 +AP_2 = 1 +MAX_AP_COUNT = 2 class WifiBaseTest(BaseTestClass): def __init__(self, controllers): @@ -43,8 +46,10 @@ class WifiBaseTest(BaseTestClass): def get_wpa2_network( self, + mirror_ap, + reference_networks, hidden=False, - ap_count=1, + same_ssid=False, ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, @@ -53,13 +58,16 @@ class WifiBaseTest(BaseTestClass): generator. Args: - ap_count: Determines if we want to use one or both the APs - for configuration. If set to '2', then both APs will - be configured with the same configuration. - ssid_length_2g: Int, number of characters to use for 2G SSID. + mirror_ap: Boolean, determines if both APs use the same hostapd + config or different configs. + reference_networks: List of PSK networks. + same_ssid: Boolean, determines if both bands on AP use the same + SSID. + ssid_length_2gecond AP Int, number of characters to use for 2G SSID. ssid_length_5g: Int, number of characters to use for 5G SSID. passphrase_length_2g: Int, length of password for 2G network. passphrase_length_5g: Int, length of password for 5G network. + Returns: A dict of 2G and 5G network lists for hostapd configuration. """ @@ -67,13 +75,20 @@ class WifiBaseTest(BaseTestClass): network_dict_5g = {} ref_5g_security = hostapd_constants.WPA2_STRING ref_2g_security = hostapd_constants.WPA2_STRING - self.user_params["reference_networks"] = [] - ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) - ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + if same_ssid: + ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_5g_ssid = ref_2g_ssid + + ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + ref_5g_passphrase = ref_2g_passphrase - ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) - ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) + else: + ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + + ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) if hidden: network_dict_2g = { @@ -103,37 +118,47 @@ class WifiBaseTest(BaseTestClass): } ap = 0 - for ap in range(ap_count): - self.user_params["reference_networks"].append({ - "2g": - copy.copy(network_dict_2g), - "5g": - copy.copy(network_dict_5g) + for ap in range(MAX_AP_COUNT): + reference_networks.append({ + "2g": copy.copy(network_dict_2g), + "5g": copy.copy(network_dict_5g) }) - self.reference_networks = self.user_params["reference_networks"] + if not mirror_ap: + break return {"2g": network_dict_2g, "5g": network_dict_5g} def get_open_network(self, + mirror_ap, + open_network, hidden=False, - ap_count=1, + same_ssid=False, ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G): """Generates SSIDs for a open network using a random generator. Args: - ap_count: Determines if we want to use one or both the APs for - for configuration. If set to '2', then both APs will - be configured with the same configuration. + mirror_ap: Boolean, determines if both APs use the same hostapd + config or different configs. + open_network: List of open networks. + same_ssid: Boolean, determines if both bands on AP use the same + SSID. ssid_length_2g: Int, number of characters to use for 2G SSID. ssid_length_5g: Int, number of characters to use for 5G SSID. + Returns: A dict of 2G and 5G network lists for hostapd configuration. """ network_dict_2g = {} network_dict_5g = {} - self.user_params["open_network"] = [] - open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) - open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + + if same_ssid: + open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) + open_5g_ssid = open_2g_ssid + + else: + open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) + open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + if hidden: network_dict_2g = { "SSID": open_2g_ssid, @@ -158,14 +183,35 @@ class WifiBaseTest(BaseTestClass): } ap = 0 - for ap in range(ap_count): - self.user_params["open_network"].append({ - "2g": network_dict_2g, - "5g": network_dict_5g + for ap in range(MAX_AP_COUNT): + open_network.append({ + "2g": copy.copy(network_dict_2g), + "5g": copy.copy(network_dict_5g) }) - self.open_network = self.user_params["open_network"] + if not mirror_ap: + break return {"2g": network_dict_2g, "5g": network_dict_5g} + def update_bssid(self, ap_instance, ap, network, band): + """Get bssid and update network dictionary. + + Args: + ap_instance: Accesspoint index that was configured. + ap: Accesspoint object corresponding to ap_instance. + network: Network dictionary. + band: Wifi networks' band. + + """ + bssid = ap.get_bssid_from_ssid(network["SSID"], band) + + if network["security"] == hostapd_constants.WPA2_STRING: + # TODO:(bamahadev) Change all occurances of reference_networks + # to wpa_networks. + self.reference_networks[ap_instance][band]["bssid"] = bssid + + if network["security"] == 'none': + self.open_network[ap_instance][band]["bssid"] = bssid + def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g): """Get bssid for a given SSID and add it to the network dictionary. @@ -180,20 +226,17 @@ class WifiBaseTest(BaseTestClass): if not (networks_5g or networks_2g): return - for network in itertools.chain(networks_5g, networks_2g): + for network in networks_5g: + if 'channel' in network: + continue + self.update_bssid(ap_instance, ap, network, + hostapd_constants.BAND_5G) + + for network in networks_2g: if 'channel' in network: continue - bssid = ap.get_bssid_from_ssid(network["SSID"]) - if '2g' in network["SSID"]: - band = hostapd_constants.BAND_2G - else: - band = hostapd_constants.BAND_5G - if network["security"] == hostapd_constants.WPA2_STRING: - # TODO:(bamahadev) Change all occurances of reference_networks - # to wpa_networks. - self.reference_networks[ap_instance][band]["bssid"] = bssid - else: - self.open_network[ap_instance][band]["bssid"] = bssid + self.update_bssid(ap_instance, ap, network, + hostapd_constants.BAND_2G) def legacy_configure_ap_and_start( self, @@ -206,47 +249,86 @@ class WifiBaseTest(BaseTestClass): ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G, hidden=False, + same_ssid=False, + mirror_ap=True, ap_count=1): asserts.assert_true( len(self.user_params["AccessPoint"]) == 2, "Exactly two access points must be specified. \ Each access point has 2 radios, one each for 2.4GHZ \ and 5GHz. A test can choose to use one or both APs.") - network_list_2g = [] - network_list_5g = [] - network_list_2g.append({"channel": channel_2g}) - network_list_5g.append({"channel": channel_5g}) - if "reference_networks" in self.user_params: - pass - else: - networks_dict = self.get_wpa2_network(hidden=hidden, - ap_count=ap_count) - network_list_2g.append(networks_dict["2g"]) - network_list_5g.append(networks_dict["5g"]) + config_count = 1 + count = 0 + + # For example, the NetworkSelector tests use 2 APs and require that + # both APs are not mirrored. + if not mirror_ap and ap_count == 1: + raise ValueError("ap_count cannot be 1 if mirror_ap is False.") + + if not mirror_ap: + config_count = ap_count + + self.user_params["reference_networks"] = [] + self.user_params["open_network"] = [] + + for count in range(config_count): + + network_list_2g = [] + network_list_5g = [] + + orig_network_list_2g = [] + orig_network_list_5g = [] + + network_list_2g.append({"channel": channel_2g}) + network_list_5g.append({"channel": channel_5g}) + + networks_dict = self.get_wpa2_network( + mirror_ap, + self.user_params["reference_networks"], + hidden=hidden, + same_ssid=same_ssid) + self.reference_networks = self.user_params["reference_networks"] - if "open_network" in self.user_params: - pass - else: - networks_dict = self.get_open_network(hidden=hidden, - ap_count=ap_count) network_list_2g.append(networks_dict["2g"]) network_list_5g.append(networks_dict["5g"]) - orig_network_list_5g = copy.copy(network_list_5g) - orig_network_list_2g = copy.copy(network_list_2g) + # When same_ssid is set, only configure one set of WPA networks. + # We cannot have more than one set because duplicate interface names + # are not allowed. + # TODO(bmahadev): Provide option to select the type of network, + # instead of defaulting to WPA. + if not same_ssid: + networks_dict = self.get_open_network( + mirror_ap, + self.user_params["open_network"], + hidden=hidden, + same_ssid=same_ssid) + self.open_network = self.user_params["open_network"] - if len(network_list_5g) > 1: - self.config_5g = self._generate_legacy_ap_config(network_list_5g) - if len(network_list_2g) > 1: - self.config_2g = self._generate_legacy_ap_config(network_list_2g) - ap = 0 - for ap in range(ap_count): - self.access_points[ap].start_ap(self.config_2g) - self.access_points[ap].start_ap(self.config_5g) - self.populate_bssid(ap, self.access_points[ap], orig_network_list_5g, + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + + orig_network_list_5g = copy.copy(network_list_5g) + orig_network_list_2g = copy.copy(network_list_2g) + + if len(network_list_5g) > 1: + self.config_5g = self._generate_legacy_ap_config(network_list_5g) + if len(network_list_2g) > 1: + self.config_2g = self._generate_legacy_ap_config(network_list_2g) + + self.access_points[count].start_ap(self.config_2g) + self.access_points[count].start_ap(self.config_5g) + self.populate_bssid(count, self.access_points[count], orig_network_list_5g, orig_network_list_2g) + # Repeat configuration on the second router. + if mirror_ap and ap_count == 2: + self.access_points[AP_2].start_ap(self.config_2g) + self.access_points[AP_2].start_ap(self.config_5g) + self.populate_bssid(AP_2, self.access_points[AP_2], + orig_network_list_5g, orig_network_list_2g) + def _generate_legacy_ap_config(self, network_list): bss_settings = [] ap_settings = network_list.pop(0) diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py index 47aa6f2cb5..0c96fed835 100755 --- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py @@ -907,6 +907,32 @@ def start_wifi_tethering(ad, ssid, password, band=None, hidden=None): finally: ad.droid.wifiStopTrackingTetherStateChange() +def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None): + """ Save a soft ap configuration """ + if band: + wifi_config[WifiEnums.APBAND_KEY] = band + if hidden: + wifi_config[WifiEnums.HIDDEN_KEY] = hidden + asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config), + "Failed to set WifiAp Configuration") + + wifi_ap = ad.droid.wifiGetApConfiguration() + asserts.assert_true( + wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY], + "Hotspot SSID doesn't match with expected SSID") + +def start_wifi_tethering_saved_config(ad): + """ Turn on wifi hotspot with a config that is already saved """ + ad.droid.wifiStartTrackingTetherStateChange() + ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False) + try: + ad.ed.pop_event("ConnectivityManagerOnTetheringStarted") + ad.ed.wait_for_event("TetherStateChanged", + lambda x: x["data"]["ACTIVE_TETHER"], 30) + except: + asserts.fail("Didn't receive wifi tethering starting confirmation") + finally: + ad.droid.wifiStopTrackingTetherStateChange() def stop_wifi_tethering(ad): """Stops wifi tethering on an android_device. diff --git a/acts/framework/tests/acts_android_device_test.py b/acts/framework/tests/acts_android_device_test.py index b2cb0b5fae..6c664f5e10 100755 --- a/acts/framework/tests/acts_android_device_test.py +++ b/acts/framework/tests/acts_android_device_test.py @@ -28,9 +28,10 @@ from acts.controllers import android_device MOCK_LOG_PATH = "/tmp/logs/MockTest/xx-xx-xx_xx-xx-xx/" # Mock start and end time of the adb cat. -MOCK_ADB_LOGCAT_BEGIN_TIME = "1970-01-02 21:03:20.123" -MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_ADB_EPOCH_BEGIN_TIME = 191000123 +MOCK_ADB_LOGCAT_BEGIN_TIME = logger.normalize_log_line_timestamp( + logger.epoch_to_log_line_timestamp(MOCK_ADB_EPOCH_BEGIN_TIME)) +MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_SERIAL = 1 MOCK_RELEASE_BUILD_ID = "ABC1.123456.007" diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py index 1972806c80..83b5ddc60e 100644 --- a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py +++ b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py @@ -264,13 +264,13 @@ class ConcurrentBleScanningTest(BluetoothBaseTest): try: self.scn_ad.ed.pop_event( scan_failed.format(scan_callback), self.default_timeout) - self.log.info( - "Found scan event successfully. Iteration {} successful." + self.log.error( + "Unexpected scan event found. Iteration {} successful." .format(i)) + test_result = False except Exception: - self.log.info("Failed to find a onScanFailed event for callback {}" + self.log.info("No onScanFailed event for callback {} as expected." .format(scan_callback)) - test_result = False for callback in scan_callback_list: self.scn_ad.droid.bleStopBleScan(callback) return test_result diff --git a/acts/tests/google/ble/filtering/UniqueFilteringTest.py b/acts/tests/google/ble/filtering/UniqueFilteringTest.py index bb7b00f516..e4ad858197 100644 --- a/acts/tests/google/ble/filtering/UniqueFilteringTest.py +++ b/acts/tests/google/ble/filtering/UniqueFilteringTest.py @@ -58,8 +58,8 @@ class UniqueFilteringTest(BluetoothBaseTest): callbacktype = event['data']['CallbackType'] if callbacktype != expected_callbacktype: self.log.debug( - "Expected callback type: {}, Found callback type: {}" - .format(expected_callbacktype, callbacktype)) + "Expected callback type: {}, Found callback type: {}".format( + expected_callbacktype, callbacktype)) test_result = False return test_result @@ -109,6 +109,7 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ test_result = True + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects( @@ -118,6 +119,10 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) + self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) self.scn_ad.droid.bleFlushPendingScanResults(scan_callback) @@ -128,10 +133,10 @@ class UniqueFilteringTest(BluetoothBaseTest): self.log.debug(worker.result(self.default_timeout)) except Empty as error: test_result = False - self.log.debug("Test failed with Empty error: {}".format(error)) + self.log.error("Test failed with Empty error: {}".format(error)) except concurrent.futures._base.TimeoutError as error: test_result = False - self.log.debug("Test failed with TimeoutError: {}".format(error)) + self.log.error("Test failed with TimeoutError: {}".format(error)) self.scn_ad.droid.bleStopBleScan(scan_callback) self.adv_ad.droid.bleStopBleAdvertising(advertise_callback) return test_result @@ -173,6 +178,9 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) system_time_nanos = self.scn_ad.droid.getSystemElapsedRealtimeNanos() @@ -220,10 +228,14 @@ class UniqueFilteringTest(BluetoothBaseTest): filter_list, scan_settings, scan_callback = generate_ble_scan_objects( self.scn_ad.droid) expected_event_name = batch_scan_result.format(scan_callback) + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) advertise_callback, advertise_data, advertise_settings = ( generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) worker = self.scn_ad.ed.handle_event( @@ -232,8 +244,8 @@ class UniqueFilteringTest(BluetoothBaseTest): self.scn_ad.droid.bleFlushPendingScanResults(scan_callback) try: event_info = self.scn_ad.ed.pop_event(expected_event_name, 10) - self.log.debug("Unexpectedly found an advertiser: {}".format( - event_info)) + self.log.debug( + "Unexpectedly found an advertiser: {}".format(event_info)) test_result = False except Empty: self.log.debug("No {} events were found as expected.".format( @@ -285,8 +297,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event_info = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - self.log.error("Unexpectedly found an advertiser: {}".format( - event_info)) + self.log.error( + "Unexpectedly found an advertiser: {}".format(event_info)) test_result = False except Empty: self.log.debug("No events were found as expected.") @@ -337,6 +349,9 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) worker = self.scn_ad.ed.handle_event( @@ -345,8 +360,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event_info = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - self.log.error("Unexpectedly found an advertiser:".format( - event_info)) + self.log.error( + "Unexpectedly found an advertiser:".format(event_info)) test_result = False except Empty as error: self.log.debug("No events were found as expected.") @@ -399,12 +414,12 @@ class UniqueFilteringTest(BluetoothBaseTest): advertise_callback1, advertise_data1, advertise_settings1) filter_list = self.scn_ad.droid.bleGenFilterList() - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_callback = self.scn_ad.droid.bleGenScanCallback() - self.scn_ad.droid.bleSetScanFilterManufacturerData(117, [1, 2, 3], - [127, 127, 127]) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + 117, [1, 2, 3], [127, 127, 127]) self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) @@ -439,27 +454,35 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ test_result = True - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects( self.scn_ad.droid) expected_event_name = scan_result.format(scan_callback) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) advertise_callback, advertise_data, advertise_settings = ( generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) - event_info = self.scn_ad.ed.pop_event(expected_event_name, - self.default_timeout) + try: + event_info = self.scn_ad.ed.pop_event(expected_event_name, + self.default_timeout) + except Empty as error: + self.log.error("Could not find initial advertisement.") + return False mac_address = event_info['data']['Result']['deviceInfo']['address'] - self.log.info("Filter advertisement with address {}".format( - mac_address)) + self.log.info( + "Filter advertisement with address {}".format(mac_address)) self.scn_ad.droid.bleStopBleScan(scan_callback) - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) self.scn_ad.droid.bleSetScanFilterDeviceAddress(mac_address) filter_list2, scan_settings2, scan_callback2 = ( generate_ble_scan_objects(self.scn_ad.droid)) @@ -506,8 +529,8 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ manufacturer_id = 0x4c - self.adv_ad.droid.bleAddAdvertiseDataManufacturerId(manufacturer_id, - [0x01]) + self.adv_ad.droid.bleAddAdvertiseDataManufacturerId( + manufacturer_id, [0x01]) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) advertise_callback, advertise_data, advertise_settings = ( @@ -521,10 +544,10 @@ class UniqueFilteringTest(BluetoothBaseTest): self.log.info("Failed to start advertisement.") return False - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) - self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id, - [0x01]) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + manufacturer_id, [0x01]) filter_list = self.scn_ad.droid.bleGenFilterList() scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list) @@ -540,8 +563,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - found_manufacturer_id = json.loads(event['data']['Result'][ - 'manufacturerIdList']) + found_manufacturer_id = json.loads( + event['data']['Result']['manufacturerIdList']) if found_manufacturer_id[0] != manufacturer_id: self.log.error( "Manufacturer id mismatch. Found {}, Expected {}". @@ -601,8 +624,8 @@ class UniqueFilteringTest(BluetoothBaseTest): self.scn_ad.droid.bleSetScanSettingsScanMode( ble_scan_settings_modes['low_latency']) - self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id, - [0x01]) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + manufacturer_id, [0x01]) filter_list = self.scn_ad.droid.bleGenFilterList() scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list) @@ -617,8 +640,8 @@ class UniqueFilteringTest(BluetoothBaseTest): except Empty: self.log.error("Unable to find beacon advertisement.") return False - found_manufacturer_id = json.loads(event['data']['Result'][ - 'manufacturerIdList']) + found_manufacturer_id = json.loads( + event['data']['Result']['manufacturerIdList']) if found_manufacturer_id[0] != manufacturer_id: self.log.error( "Manufacturer id mismatch. Found {}, Expected {}".format( diff --git a/acts/tests/google/ble/gatt/GattConnectTest.py b/acts/tests/google/ble/gatt/GattConnectTest.py index 0b160a23cf..dd6857c941 100644 --- a/acts/tests/google/ble/gatt/GattConnectTest.py +++ b/acts/tests/google/ble/gatt/GattConnectTest.py @@ -308,7 +308,7 @@ class GattConnectTest(BluetoothBaseTest): self.default_timeout) except Empty: self.log.error( - gatt_cb_err['gatt_conn_change_err'].format(expected_event)) + gatt_cb_err['gatt_conn_changed_err'].format(expected_event)) test_result = False return self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) diff --git a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py index e907cbb342..9803648000 100644 --- a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py +++ b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py @@ -524,9 +524,9 @@ class BleOpportunisticScanTest(BluetoothBaseTest): self): """Test opportunistic scan result from secondary scan filter. - Tests opportunistic scan where the secondary scan instance does not find - an advertisement but the scan instance with scan mode set to - opportunistic scan will find an advertisement. + Tests opportunistic scan where the filtered scan instance does not find + an advertisement and the scan instance with scan mode set to + opportunistic scan will also not find an advertisement. Steps: 1. Initialize advertiser and start advertisement on dut1 (make sure the @@ -539,6 +539,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest): 6. Pop onScanResults from the second scanner 7. Expect no events 8. Pop onScanResults from the first scanner + 9. Expect no events Expected Result: Opportunistic scan instance finds an advertisement. @@ -573,11 +574,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest): if not self._verify_no_events_found( scan_result.format(scan_callback2)): return False - try: - self.scn_ad.ed.pop_event( - scan_result.format(scan_callback), self.default_timeout) - except Empty: - self.log.error("Opportunistic scan found no scan results.") + if not self._verify_no_events_found(scan_result.format(scan_callback)): return False return True diff --git a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py index ba8760aa88..1a32b25178 100644 --- a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py +++ b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py @@ -54,6 +54,13 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): time.sleep(4) return True + def setup_test(self): + if not bt_test_utils.connect_pri_to_sec( + self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])): + return False + # Grace time for connection to complete. + time.sleep(3) + def message_delivered(self, device): try: self.MCE.ed.pop_event(EventSmsDeliverSuccess, 15) @@ -97,15 +104,11 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='0858347a-e649-4f18-85b6-6990cc311dee') @BluetoothBaseTest.bt_test_wrap def test_send_message(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) return self.send_message([self.REMOTE]) @test_tracker_info(uuid='b25caa53-3c7f-4cfa-a0ec-df9a8f925fe5') @BluetoothBaseTest.bt_test_wrap def test_receive_message(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) self.MSE.log.info("Start Tracking SMS.") self.MSE.droid.smsStartTrackingIncomingSmsMessage() self.REMOTE.log.info("Ready to send") @@ -130,6 +133,9 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='19444142-1d07-47dc-860b-f435cba46fca') @BluetoothBaseTest.bt_test_wrap def test_send_message_failure_no_map_connection(self): + if not bt_test_utils.disconnect_pri_from_sec( + self.MCE, self.MSE, [BtEnum.BluetoothProfile.MAP_MCE.value]): + return False return not self.send_message([self.REMOTE]) @test_tracker_info(uuid='c7e569c0-9f6c-49a4-8132-14bc544ccb53') @@ -148,8 +154,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='8cdb4a54-3f18-482f-be3d-acda9c4cbeed') @BluetoothBaseTest.bt_test_wrap def test_disconnect_failure_send_message(self): - connected = bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) addr = self.MSE.droid.bluetoothGetLocalAddress() if bt_test_utils.is_map_mce_device_connected(self.MCE, addr): connected = True @@ -167,8 +171,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='2d79a896-b1c1-4fb7-9924-db8b5c698be5') @BluetoothBaseTest.bt_test_wrap def manual_test_send_message_to_contact(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) contacts = self.MCE.droid.contactsGetContactIds() self.log.info(contacts) selected_contact = self.MCE.droid.contactsDisplayContactPickList() @@ -181,6 +183,4 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='8ce9a7dd-3b5e-4aee-a897-30740e2439c3') @BluetoothBaseTest.bt_test_wrap def test_send_message_to_multiple_phones(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) return self.send_message([self.REMOTE, self.REMOTE]) diff --git a/acts/tests/google/net/CoreNetworkingOTATest.py b/acts/tests/google/net/CoreNetworkingOTATest.py new file mode 100755 index 0000000000..2444971a87 --- /dev/null +++ b/acts/tests/google/net/CoreNetworkingOTATest.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import time +import urllib.request + +import acts.base_test +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.net import connectivity_const as cconst + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams + + +class CoreNetworkingOTATest(BaseTestClass): + """Core networking auto update tests""" + + def setup_class(self): + super(CoreNetworkingOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.dut = self.android_devices[0] + req_params = dir(VPN_PARAMS) + req_params = [x for x in req_params if not x.startswith('__')] + req_params.extend(["download_file", "file_size"]) + self.unpack_userparams(req_params) + + vpn_params = {'vpn_username': self.vpn_username, + 'vpn_password': self.vpn_password, + 'psk_secret': self.psk_secret, + 'client_pkcs_file_name': self.client_pkcs_file_name, + 'cert_path_vpnserver': self.cert_path_vpnserver, + 'cert_password': self.cert_password} + + # generate legacy vpn profiles + wutils.wifi_connect(self.dut, self.wifi_network) + self.xauth_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_XAUTH_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_XAUTH_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.l2tp_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.L2TP_IPSEC_RSA, + self.vpn_server_addresses[VPN_TYPE.L2TP_IPSEC_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.hybrid_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_HYBRID_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_HYBRID_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.vpn_profiles = [self.l2tp_rsa, self.hybrid_rsa, self.xauth_rsa] + + # verify legacy vpn + for prof in self.vpn_profiles: + nutils.legacy_vpn_connection_test_logic(self.dut, prof) + + # Run OTA below, if ota fails then abort all tests. + try: + for ad in self.android_devices: + ota_updater.update(ad) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + """Tests""" + + def test_legacy_vpn_ota_xauth_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.xauth_rsa) + + def test_legacy_vpn_ota_l2tp_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.l2tp_rsa) + + def test_legacy_vpn_ota_hybrid_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.hybrid_rsa) diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py index f06be4a54f..308beda8e1 100644 --- a/acts/tests/google/net/CoreNetworkingTest.py +++ b/acts/tests/google/net/CoreNetworkingTest.py @@ -17,8 +17,7 @@ from acts import asserts from acts import base_test from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.wifi import wifi_test_utils as wutils dum_class = "com.android.tests.connectivity.uid.ConnectivityTestActivity" @@ -30,17 +29,15 @@ class CoreNetworkingTest(base_test.BaseTestClass): def setup_class(self): """ Setup devices for tests and unpack params """ self.dut = self.android_devices[0] - wutils.wifi_toggle_state(self.dut, False) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) def teardown_class(self): """ Reset devices """ wutils.wifi_toggle_state(self.dut, True) + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + """ Test Cases """ @test_tracker_info(uuid="0c89d632-aafe-4bbd-a812-7b0eca6aafc7") diff --git a/acts/tests/google/net/DataCostTest.py b/acts/tests/google/net/DataCostTest.py index 4a829489e6..fb63c7a36b 100644 --- a/acts/tests/google/net/DataCostTest.py +++ b/acts/tests/google/net/DataCostTest.py @@ -25,8 +25,7 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.tel.tel_test_utils import _check_file_existance from acts.test_utils.tel.tel_test_utils import _generate_file_directory_and_file_name from acts.test_utils.wifi import wifi_test_utils as wutils @@ -48,17 +47,19 @@ class DataCostTest(base_test.BaseTestClass): self.unpack_userparams(req_params) for ad in self.android_devices: - wutils.reset_wifi(ad) - ad.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, ad, True) - asserts.assert_true( - verify_http_connection(self.log, ad), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(ad) def teardown_class(self): + """ Reset settings to default """ for ad in self.android_devices: + sub_id = str(ad.droid.telephonyGetSubscriberId()) + ad.droid.connectivityFactoryResetNetworkPolicies(sub_id) + ad.droid.connectivitySetDataWarningLimit(sub_id, -1) wutils.reset_wifi(ad) - ad.droid.telephonyToggleDataConnection(True) + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -155,10 +156,6 @@ class DataCostTest(base_test.BaseTestClass): ad.droid.connectivitySetDataWarningLimit( sub_id, int((total_pre + 5) * 1000.0 * 1000.0)) - # reset data limit - ad.droid.connectivityFactoryResetNetworkPolicies(sub_id) - ad.droid.connectivitySetDataWarningLimit(sub_id, -1) - # verify multipath preference values self._verify_multipath_preferences( ad, RELIABLE, NONE, wifi_network, cell_network) diff --git a/acts/tests/google/net/DataUsageTest.py b/acts/tests/google/net/DataUsageTest.py index 0e636ac35b..32ebd7f632 100644 --- a/acts/tests/google/net/DataUsageTest.py +++ b/acts/tests/google/net/DataUsageTest.py @@ -23,6 +23,7 @@ from acts.test_utils.net import connectivity_const as cconst from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection from acts.test_utils.tel.tel_test_utils import http_file_download_by_chrome from acts.test_utils.tel.tel_test_utils import verify_http_connection +import acts.test_utils.net.net_test_utils as nutils from acts.test_utils.tel import tel_test_utils as ttutils from acts.test_utils.wifi import wifi_test_utils as wutils @@ -56,12 +57,7 @@ class DataUsageTest(base_test.BaseTestClass): """ Setup devices for tests and unpack params """ self.dut = self.android_devices[0] self.tethered_devices = self.android_devices[1:] - wutils.reset_wifi(self.dut) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) # unpack user params req_params = ("wifi_network", "download_file", "file_size", "network") @@ -79,24 +75,16 @@ class DataUsageTest(base_test.BaseTestClass): # Set chrome browser start with no-first-run verification # Give permission to read from and write to storage - commands = ["pm grant com.android.chrome " - "android.permission.READ_EXTERNAL_STORAGE", - "pm grant com.android.chrome " - "android.permission.WRITE_EXTERNAL_STORAGE", - "rm /data/local/chrome-command-line", - "am set-debug-app --persistent com.android.chrome", - 'echo "chrome --no-default-browser-check --no-first-run ' - '--disable-fre" > /data/local/tmp/chrome-command-line'] - for cmd in commands: - for dut in self.android_devices: - try: - dut.adb.shell(cmd) - except adb.AdbError: - self.log.warn("adb command %s failed on %s" % (cmd, dut.serial)) + nutils.set_chrome_browser_permissions(self.dut) def teardown_class(self): """ Reset devices """ - wutils.reset_wifi(self.dut) + for ad in self.android_devices: + wutils.reset_wifi(ad) + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -206,9 +194,6 @@ class DataUsageTest(base_test.BaseTestClass): 5. Verify that data usage of ConnUIDTest app increased by ~xMB 6. Verify that data usage of device also increased by ~xMB """ - # disable wifi - wutils.wifi_toggle_state(self.dut, False) - # get pre mobile data usage (aos_pre, app_pre, total_pre) = self._get_data_usage(self.dut, cconst.TYPE_MOBILE) @@ -264,6 +249,10 @@ class DataUsageTest(base_test.BaseTestClass): self.log.info("Data usage of Android os increased by %s" % aos_diff) self.log.info("Data usage of ConnUID app increased by %s" % app_diff) self.log.info("Data usage on the device increased by %s" % total_diff) + + # forget network + wutils.wifi_forget_network(self.dut, self.wifi_network['SSID']) + return (aos_diff < DATA_ERR) and \ (self.file_size < app_diff < self.file_size + DATA_USG_ERR) and \ (self.file_size < total_diff < self.file_size + DATA_USG_ERR) diff --git a/acts/tests/google/net/DhcpServerTest.py b/acts/tests/google/net/DhcpServerTest.py new file mode 100644 index 0000000000..1f33d63422 --- /dev/null +++ b/acts/tests/google/net/DhcpServerTest.py @@ -0,0 +1,1055 @@ +from acts import asserts +from acts import base_test +from acts.controllers import android_device +from acts.test_decorators import test_tracker_info + +from scapy.all import * +from threading import Event +from threading import Thread +import random +import time +import warnings + + +CLIENT_PORT = 68 +SERVER_PORT = 67 +BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff' +INET4_ANY = '0.0.0.0' +NETADDR_PREFIX = '192.168.42.' +OTHER_NETADDR_PREFIX = '192.168.43.' +NETADDR_BROADCAST = '255.255.255.255' +SUBNET_BROADCAST = NETADDR_PREFIX + '255' + + +OFFER = 2 +REQUEST = 3 +ACK = 5 +NAK = 6 + + +pmc_base_cmd = ( + "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ") +start_pmc_cmd = ( + "am start -S -n com.android.pmc/com.android.pmc.PMCMainActivity") +pmc_start_usb_tethering_cmd = "%sStartUSBTethering" % pmc_base_cmd +pmc_stop_usb_tethering_cmd = "%sStopUSBTethering" % pmc_base_cmd + + +class DhcpServerTest(base_test.BaseTestClass): + def setup_class(self): + self.dut = self.android_devices[0] + self.USB_TETHERED = False + self.next_hwaddr_index = 0 + self.stop_arp = Event() + + conf.checkIPaddr = 0 + conf.checkIPsrc = 0 + # Allow using non-67 server ports as long as client uses 68 + bind_layers(UDP, BOOTP, dport=CLIENT_PORT) + + self.dut.adb.shell(start_pmc_cmd) + self.dut.adb.shell("setprop log.tag.PMC VERBOSE") + iflist_before = get_if_list() + self._start_usb_tethering(self.dut) + self.iface = self._wait_for_new_iface(iflist_before) + self.real_hwaddr = get_if_raw_hwaddr(self.iface) + + # Start a thread to answer to all ARP "who-has" + thread = Thread(target=self._sniff_arp, args=(self.stop_arp,)) + thread.start() + + # Discover server IP and device hwaddr + hwaddr = self._next_hwaddr() + resp = self._get_response(self._make_discover(hwaddr)) + asserts.assert_false(None == resp, + "Device did not reply to first DHCP discover") + self.server_addr = getopt(resp, 'server_id') + self.dut_hwaddr = resp.getlayer(Ether).src + asserts.assert_false(None == self.server_addr, + "DHCP server did not specify server identifier") + # Ensure that we don't depend on assigned route/gateway on the host + conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0") + + def setup_test(self): + # Some versions of scapy do not close the receive file properly + warnings.filterwarnings("ignore", category=ResourceWarning) + + bind_layers(UDP, BOOTP, dport=68) + self.hwaddr = self._next_hwaddr() + self.other_hwaddr = self._next_hwaddr() + self.cleanup_releases = [] + + def teardown_test(self): + for packet in self.cleanup_releases: + self._send(packet) + + def teardown_class(self): + self.stop_arp.set() + self._stop_usb_tethering(self.dut) + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + def _start_usb_tethering(self, dut): + """ Start USB tethering + + Args: + 1. dut - ad object + """ + self.log.info("Starting USB Tethering") + dut.stop_services() + dut.adb.shell(pmc_start_usb_tethering_cmd) + self._wait_for_device(self.dut) + self.USB_TETHERED = True + + def _stop_usb_tethering(self, dut): + """ Stop USB tethering + + Args: + 1. dut - ad object + """ + self.log.info("Stopping USB Tethering") + dut.adb.shell(pmc_stop_usb_tethering_cmd) + self._wait_for_device(self.dut) + dut.start_services(skip_sl4a=getattr(dut, "skip_sl4a", False)) + self.USB_TETHERED = False + + def _wait_for_device(self, dut): + """ Wait for device to come back online + + Args: + 1. dut - ad object + """ + while dut.serial not in android_device.list_adb_devices(): + pass + dut.adb.wait_for_device() + + def _wait_for_new_iface(self, old_ifaces): + old_set = set(old_ifaces) + # Try 10 times to find a new interface with a 1s sleep every time + # (equivalent to a 9s timeout) + for i in range(0, 10): + new_ifaces = set(get_if_list()) - old_set + asserts.assert_true(len(new_ifaces) < 2, + "Too many new interfaces after turning on tethering") + if len(new_ifaces) == 1: + return new_ifaces.pop() + time.sleep(1) + asserts.fail("Timeout waiting for tethering interface on host") + + def _sniff_arp(self, stop_arp): + try: + sniff(iface=self.iface, filter='arp', prn=self._handle_arp, store=0, + stop_filter=lambda p: stop_arp.is_set()) + except: + # sniff may raise when tethering is disconnected. Ignore + # exceptions if stop was requested. + if not stop_arp.is_set(): + raise + + def _handle_arp(self, packet): + # Reply to all arp "who-has": say we have everything + if packet[ARP].op == ARP.who_has: + reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr, psrc=packet.pdst, + hwdst=BROADCAST_MAC, pdst=SUBNET_BROADCAST) + sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply, + iface=self.iface, verbose=False) + + @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c") + def test_config_assumptions(self): + resp = self._get_response(self._make_discover(self.hwaddr)) + asserts.assert_false(None == resp, "Device did not reply to discover") + asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX), + "Server does not use expected prefix") + + @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568") + def test_discover_broadcastbit(self): + resp = self._get_response( + self._make_discover(self.hwaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191") + def test_discover_bootpfields(self): + discover = self._make_discover(self.hwaddr) + resp = self._get_response(discover) + self._assert_offer(resp) + self._assert_unicast(resp) + bootp = assert_bootp_response(resp, discover) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) + + @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182") + def test_discover_relayed_broadcastbit(self): + giaddr = NETADDR_PREFIX + '123' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp) + + def _run_discover_paramrequestlist(self, params, unwanted_params): + params_opt = make_paramrequestlist_opt(params) + resp = self._get_response( + self._make_discover(self.hwaddr, options=[params_opt])) + + self._assert_offer(resp) + # List of requested params in response order + resp_opts = get_opt_labels(resp) + resp_requested_opts = [opt for opt in resp_opts if opt in params] + # All above params should be supported, order should be conserved + asserts.assert_equal(params, resp_requested_opts) + asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params))) + return resp + + @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8") + def test_discover_paramrequestlist(self): + resp = self._run_discover_paramrequestlist( + ['subnet_mask', 'broadcast_address', 'router', 'name_server'], + unwanted_params=[]) + for opt in ['broadcast_address', 'router', 'name_server']: + asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX), + opt + ' does not start with ' + NETADDR_PREFIX) + + subnet_mask = getopt(resp, 'subnet_mask') + asserts.assert_true(subnet_mask.startswith('255.255.'), + 'Unexpected subnet mask for /16+: ' + subnet_mask) + + @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46") + def test_discover_paramrequestlist_rev(self): + # RFC2132 #9.8: "The DHCP server is not required to return the options + # in the requested order, but MUST try to insert the requested options + # in the order requested" + asserts.skip('legacy behavior not compliant: fixed order used') + self._run_discover_paramrequestlist( + ['name_server', 'router', 'broadcast_address', 'subnet_mask'], + unwanted_params=[]) + + @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b") + def test_discover_paramrequestlist_unwanted(self): + asserts.skip('legacy behavior always sends all parameters') + self._run_discover_paramrequestlist(['router', 'name_server'], + unwanted_params=['broadcast_address', 'subnet_mask']) + + def _assert_renews(self, request, addr, exp_time, resp_type=ACK): + # Sleep to test lease time renewal + time.sleep(3) + resp = self._get_response(request) + self._assert_type(resp, resp_type) + asserts.assert_equal(addr, get_yiaddr(resp)) + remaining_lease = getopt(resp, 'lease_time') + # Lease renewed: waited for 3s, lease time not decreased by more than 2 + asserts.assert_true(remaining_lease >= exp_time - 2, + 'Lease not renewed') + # Lease times should be consistent across offers/renewals + asserts.assert_true(remaining_lease <= exp_time + 2, + 'Lease time inconsistent') + return resp + + @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade") + def test_discover_assigned_ownaddress(self): + addr, siaddr, resp = self._request_address(self.hwaddr) + + lease_time = getopt(resp, 'lease_time') + server_id = getopt(resp, 'server_id') + asserts.assert_true(lease_time >= 60, "Lease time is too short") + asserts.assert_false(addr == INET4_ANY, "Assigned address is empty") + # Wait to test lease expiration time change + time.sleep(3) + + # New discover, same address + resp = self._assert_renews(self._make_discover(self.hwaddr), + addr, lease_time, resp_type=OFFER) + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587") + def test_discover_assigned_otherhost(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same address, different client + resp = self._get_response(self._make_discover(self.other_hwaddr, + [('requested_addr', addr)])) + + self._assert_offer(resp) + asserts.assert_false(get_yiaddr(resp) == addr, + "Already assigned address offered") + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14") + def test_discover_requestaddress(self): + addr = NETADDR_PREFIX + '200' + resp = self._get_response(self._make_discover(self.hwaddr, + [('requested_addr', addr)])) + self._assert_offer(resp) + asserts.assert_equal(get_yiaddr(resp), addr) + + # Lease not committed: can request again + resp = self._get_response(self._make_discover(self.other_hwaddr, + [('requested_addr', addr)])) + self._assert_offer(resp) + asserts.assert_equal(get_yiaddr(resp), addr) + + @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd") + def test_discover_requestaddress_wrongsubnet(self): + addr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, [('requested_addr', addr)])) + self._assert_offer(resp) + self._assert_unicast(resp) + asserts.assert_false(get_yiaddr(resp) == addr, + 'Server offered invalid address') + + @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf") + def test_discover_giaddr_outside_subnet(self): + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1") + def test_discover_srcaddr_outside_subnet(self): + srcaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, ip_src=srcaddr)) + self._assert_offer(resp) + asserts.assert_false(srcaddr == get_yiaddr(resp), + 'Server offered invalid address') + + @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e") + def test_discover_requestaddress_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + req = self._make_discover(self.hwaddr, [('requested_addr', addr)], + ip_src=giaddr, giaddr=giaddr) + resp = self._get_response(req) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db") + def test_discover_knownaddress_giaddr_outside_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in invalid subnet + giaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557") + def test_discover_knownaddress_giaddr_valid_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in valid subnet + giaddr = NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7") + def test_request_unicast(self): + addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False) + self._assert_unicast(resp, addr) + + @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3") + def test_request_bootpfields(self): + req_addr = NETADDR_PREFIX + '200' + req = self._make_request(self.hwaddr, req_addr, self.server_addr) + resp = self._get_response(req) + self._assert_ack(resp) + bootp = assert_bootp_response(resp, req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) + + @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18") + def test_request_selecting_inuse(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + new_req = self._make_request(self.other_hwaddr, addr, siaddr) + resp = self._get_response(new_req) + self._assert_nak(resp) + self._assert_broadcast(resp) + bootp = assert_bootp_response(resp, new_req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(INET4_ANY, bootp.yiaddr) + asserts.assert_equal(INET4_ANY, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp)) + asserts.assert_equal( + ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt + get_opt_labels(bootp)) + asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id')) + + @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c") + def test_request_selecting_wrongsiaddr(self): + addr = NETADDR_PREFIX + '200' + wrong_siaddr = NETADDR_PREFIX + '201' + asserts.assert_false(wrong_siaddr == self.server_addr, + 'Test assumption not met: server addr is ' + wrong_siaddr) + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr)) + asserts.assert_true(resp == None, + 'Received response for request with incorrect siaddr') + + @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f") + def test_request_selecting_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=self.server_addr, + giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f") + def test_request_selecting_hostnameupdate(self): + addr = NETADDR_PREFIX + '123' + hostname1 = b'testhostname1' + hostname2 = b'testhostname2' + req = self._make_request(self.hwaddr, None, None, + options=[ + ('requested_addr', addr), + ('server_id', self.server_addr), + ('hostname', hostname1)]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname1, getopt(req, 'hostname')) + + # Re-request with different hostname + setopt(req, 'hostname', hostname2) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname2, getopt(req, 'hostname')) + + def _run_initreboot(self, bcastbit): + addr, siaddr, resp = self._request_address(self.hwaddr) + exp = getopt(resp, 'lease_time') + + # init-reboot: siaddr is None + return self._assert_renews(self._make_request( + self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp) + + @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") + def test_request_initreboot(self): + resp = self._run_initreboot(bcastbit=False) + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51") + def test_request_initreboot_broadcastbit(self): + resp = self._run_initreboot(bcastbit=True) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c") + def test_request_initreboot_nolease(self): + # RFC2131 #4.3.2 + asserts.skip("legacy behavior not compliant") + addr = NETADDR_PREFIX + '123' + resp = self._get_response(self._make_request(self.hwaddr, addr, None)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5") + def test_request_initreboot_incorrectlease(self): + otheraddr = NETADDR_PREFIX + '123' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + + resp = self._get_response( + self._make_request(self.hwaddr, otheraddr, siaddr=None)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") + def test_request_initreboot_wrongnet(self): + resp = self._get_response(self._make_request(self.hwaddr, + OTHER_NETADDR_PREFIX + '1', siaddr=None)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + def _run_rebinding(self, bcastbit, giaddr=INET4_ANY): + addr, siaddr, resp = self._request_address(self.hwaddr) + exp = getopt(resp, 'lease_time') + + # Rebinding: no siaddr or reqaddr + resp = self._assert_renews( + self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=addr, giaddr=giaddr, ip_src=addr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit), + addr, exp) + return resp, addr + + @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") + def test_request_rebinding(self): + resp, addr = self._run_rebinding(bcastbit=False) + self._assert_unicast(resp, addr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2") + def test_request_rebinding_relayed(self): + giaddr = NETADDR_PREFIX + '123' + resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1") + def test_request_rebinding_inuse(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + resp = self._get_response(self._make_request( + self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc") + def test_request_rebinding_wrongaddr(self): + otheraddr = NETADDR_PREFIX + '123' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + + resp = self._get_response(self._make_request( + self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb") + def test_request_rebinding_wrongaddr_relayed(self): + otheraddr = NETADDR_PREFIX + '123' + relayaddr = NETADDR_PREFIX + '124' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + asserts.assert_false(addr == relayaddr, + "Test assumption not met: server assigned " + relayaddr) + + req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=otheraddr, giaddr=relayaddr) + + resp = self._get_response(req) + self._assert_nak(resp) + self._assert_unicast(resp, relayaddr) + self._assert_broadcastbit(resp) + + @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da") + def test_release(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + # Re-requesting fails + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + # Succeeds after release + self._send(self._make_release(self.hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) + self._assert_ack(resp) + + @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593") + def test_release_noserverid(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # Release without server_id opt is ignored + release = self._make_release(self.hwaddr, addr, siaddr) + removeopt(release, 'server_id') + self._send(release) + + # Not released: request fails + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1") + def test_release_wrongserverid(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # Release with wrong server id + release = self._make_release(self.hwaddr, addr, siaddr) + setopt(release, 'server_id', addr) + self._send(release) + + # Not released: request fails + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce") + def test_unicast_l2l3(self): + reqAddr = NETADDR_PREFIX + '124' + resp = self._get_response(self._make_request( + self.hwaddr, reqAddr, siaddr=None)) + self._assert_unicast(resp) + str_hwaddr = format_hwaddr(self.hwaddr) + asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst) + asserts.assert_equal(reqAddr, resp.getlayer(IP).dst) + asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + params_opt = make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 'NetBIOS_server', + 46, # NetBIOS over TCP/IP Node Type + ]) + req = self._make_discover(self.hwaddr, + options=[ + params_opt, + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_standard_offer(resp) + + def _make_macos_10_13_3_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + ]) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + req = self._make_discover(self.hwaddr, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + + @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86") + def test_macos_10_13_3_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test12-macbookpro'), + ]) + req.getlayer(BOOTP).secs = 5 + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp) + + # Note: macOS does not seem to do any rebinding (straight to discover) + @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b") + def test_macos_10_13_3_request_renewing(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True) + + def _make_win10_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'router', + 'name_server', + 'domain', + 31, # Perform Router Discover + 33, # Static Route + 'vendor_specific', + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + 47, # NetBIOS over TCP/IP Scope + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 252, # Private/Proxy autodiscovery + ]) + + @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76") + def test_win10_discover(self): + req = self._make_discover(self.hwaddr, bcastbit=True, + options=[ + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ], opts_padding=bytes(11)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410") + def test_win10_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, bcastbit=True, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + def _run_win10_request_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, bcastbit=bcast, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast) + + @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9") + def test_win10_request_renewing(self): + self._run_win10_request_renewing(bcast=False) + + @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751") + def test_win10_request_rebinding(self): + self._run_win10_request_renewing(bcast=True) + + def _make_debian_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'broadcast_address', + 'router', + 'name_server', + 119, # Domain Search + 'hostname', + 101, # TCode + 'domain', # NetBIOS over TCP/IP Name Server + 'vendor_specific', # NetBIOS over TCP/IP Node Type + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 33, # Static Route + 252, # Private/Proxy autodiscovery + 'NTP_server', + ]) + + @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5") + def test_debian_dhclient_4_3_5_discover(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_discover(self.hwaddr, + options=[ + ('requested_addr', req_ip), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(26)) + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + asserts.assert_equal(req_ip, get_yiaddr(resp)) + + @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac") + def test_debian_dhclient_4_3_5_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + ('server_id', self.server_addr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(20)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, with_hostname=True) + + def _run_debian_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], + opts_padding=bytes(32)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, + with_hostname=True) + + @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc") + def test_debian_dhclient_4_3_5_request_renewing(self): + self._run_debian_renewing(bcast=False) + + @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db") + def test_debian_dhclient_4_3_5_request_rebinding(self): + self._run_debian_renewing(bcast=True) + + def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False, + with_hostname=False): + # Responses to renew/rebind are always unicast to ciaddr even with + # broadcast flag set (RFC does not define this behavior, but this is + # more efficient and matches previous behavior) + if bcast and not renewing: + self._assert_broadcast(resp) + else: + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=bcast) + + bootp_resp = resp.getlayer(BOOTP) + asserts.assert_equal(0, bootp_resp.hops) + if renewing: + asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX), + 'ciaddr does not start with expected prefix') + else: + asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr) + asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX), + 'yiaddr does not start with expected prefix') + asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX), + 'siaddr does not start with expected prefix') + asserts.assert_equal(INET4_ANY, bootp_resp.giaddr) + + opt_labels = get_opt_labels(bootp_resp) + # FQDN option 81 is not supported in new behavior + opt_labels = [opt for opt in opt_labels if opt != 81] + + # Expect exactly these options in this order + expected_opts = [ + 'message-type', 'server_id', 'lease_time', 'renewal_time', + 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router', + 'name_server'] + if with_hostname: + expected_opts.append('hostname') + expected_opts.extend(['vendor_specific', 'end']) + asserts.assert_equal(expected_opts, opt_labels) + + def _request_address(self, hwaddr, bcast=True): + resp = self._get_response(self._make_discover(hwaddr)) + self._assert_offer(resp) + addr = get_yiaddr(resp) + siaddr = getopt(resp, 'server_id') + resp = self._get_response(self._make_request(hwaddr, addr, siaddr, + ip_dst=(INET4_ANY if bcast else siaddr))) + self._assert_ack(resp) + return addr, siaddr, resp + + def _get_response(self, packet): + resp = srp1(packet, iface=self.iface, timeout=10, verbose=False) + bootp_resp = (resp or None) and resp.getlayer(BOOTP) + if bootp_resp != None and get_mess_type(bootp_resp) == ACK: + # Note down corresponding release for this request + release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr, + getopt(bootp_resp, 'server_id')) + self.cleanup_releases.append(release) + return resp + + def _send(self, packet): + sendp(packet, iface=self.iface, verbose=False) + + def _assert_type(self, packet, tp): + asserts.assert_false(None == packet, "No packet") + asserts.assert_equal(tp, get_mess_type(packet)) + + def _assert_ack(self, packet): + self._assert_type(packet, ACK) + + def _assert_nak(self, packet): + self._assert_type(packet, NAK) + + def _assert_offer(self, packet): + self._assert_type(packet, OFFER) + + def _assert_broadcast(self, packet): + asserts.assert_false(None == packet, "No packet") + asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) + asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) + self._assert_broadcastbit(packet) + + def _assert_broadcastbit(self, packet, isset=True): + mask = 0x8000 + flag = packet.getlayer(BOOTP).flags + asserts.assert_equal(flag & mask, mask if isset else 0) + + def _assert_unicast(self, packet, ipAddr=None): + asserts.assert_false(None == packet, "No packet") + asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC, + "Layer 2 packet destination address was broadcast") + if ipAddr: + asserts.assert_equal(packet.getlayer(IP).dst, ipAddr) + + def _next_hwaddr(self): + addr = make_hwaddr(self.next_hwaddr_index) + self.next_hwaddr_index = self.next_hwaddr_index + 1 + return addr + + def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY, + ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY, + bcastbit=False): + broadcast = (ip_dst == NETADDR_BROADCAST) + ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr)) + ip = IP(src=ip_src, dst=ip_dst) + udp = UDP(sport=68, dport=SERVER_PORT) + bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr, + flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32)) + dhcp = DHCP(options=options) + return ethernet / ip / udp / bootp / dhcp + + def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY, + bcastbit=False, opts_padding=None, ip_src=INET4_ANY): + opts = [('message-type','discover')] + opts.extend(options) + opts.append('end') + if (opts_padding): + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src) + + def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY, + ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False, + options=[], opts_padding=None): + if not ip_dst: + ip_dst = siaddr or INET4_ANY + + if not ip_src and ip_dst == INET4_ANY: + ip_src = INET4_ANY + elif not ip_src: + ip_src = (giaddr if not isempty(giaddr) + else ciaddr if not isempty(ciaddr) + else reqaddr) + # Kernel will not receive unicast UDP packets with empty ip_src + asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src), + "Unicast ip_src cannot be zero") + opts = [('message-type', 'request')] + if options: + opts.extend(options) + else: + if siaddr: + opts.append(('server_id', siaddr)) + if reqaddr: + opts.append(('requested_addr', reqaddr)) + opts.append('end') + if opts_padding: + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, + ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit) + + def _make_release(self, src_hwaddr, addr, server_id): + opts = [('message-type', 'release'), ('server_id', server_id), 'end'] + return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr, + ip_dst=server_id) + +def assert_bootp_response(resp, req): + bootp = resp.getlayer(BOOTP) + asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op') + asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype') + asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen') + asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops') + asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID') + return bootp + + +def make_paramrequestlist_opt(params): + param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt + for opt in params] + return tuple(['param_req_list'] + [ + opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt + for opt in param_indexes]) + + +def isempty(addr): + return not addr or addr == INET4_ANY + + +def setopt(packet, optname, val): + dhcp = packet.getlayer(DHCP) + if optname in get_opt_labels(dhcp): + dhcp.options = [(optname, val) if opt[0] == optname else opt + for opt in dhcp.options] + else: + # Add before the last option (last option is "end") + dhcp.options.insert(len(dhcp.options) - 1, (optname, val)) + + +def getopt(packet, key): + opts = [opt[1] for opt in packet.getlayer(DHCP).options if opt[0] == key] + return opts[0] if opts else None + + +def removeopt(packet, key): + dhcp = packet.getlayer(DHCP) + dhcp.options = [opt for opt in dhcp.options if opt[0] != key] + + +def get_opt_labels(packet): + dhcp_resp = packet.getlayer(DHCP) + # end option is a single string, not a tuple. + return [opt if isinstance(opt, str) else opt[0] + for opt in dhcp_resp.options if opt != 'pad'] + + +def get_yiaddr(packet): + return packet.getlayer(BOOTP).yiaddr + + +def get_chaddr(packet): + # We use Ethernet addresses. Ignore address padding + return packet.getlayer(BOOTP).chaddr[:6] + + +def get_mess_type(packet): + return getopt(packet, 'message-type') + + +def make_hwaddr(index): + if index > 0xffff: + raise ValueError("Address index out of range") + return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff]) + + +def format_hwaddr(addr): + return ':'.join(['%02x' % c for c in addr]) diff --git a/acts/tests/google/net/DnsOverTlsTest.py b/acts/tests/google/net/DnsOverTlsTest.py index 46a6c01c6f..8d9fbed7ce 100644 --- a/acts/tests/google/net/DnsOverTlsTest.py +++ b/acts/tests/google/net/DnsOverTlsTest.py @@ -25,21 +25,22 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils from scapy.all import TCP from scapy.all import UDP from scapy.all import rdpcap +from scapy.all import Scapy_Exception DNS_QUAD9 = "dns.quad9.net" PRIVATE_DNS_MODE_OFF = "off" PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic" PRIVATE_DNS_MODE_STRICT = "hostname" RST = 0x04 +WLAN = "wlan0" class DnsOverTlsTest(base_test.BaseTestClass): """ Tests for Wifi Tethering """ @@ -48,17 +49,14 @@ class DnsOverTlsTest(base_test.BaseTestClass): """ Setup devices for tethering and unpack params """ self.dut = self.android_devices[0] - wutils.reset_wifi(self.dut) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) req_params = ("wifi_network_with_dns_tls", "wifi_network_no_dns_tls", "ping_hosts") self.unpack_userparams(req_params) self.tcpdump_pid = None - self.tcpdump_file = None + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -68,9 +66,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): Args: 1. ad: dut to run tcpdump on """ - if self.tcpdump_pid: - stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=False) - self.tcpdump_pid = start_adb_tcpdump(ad, self.test_name, mask='all') + self.tcpdump_pid = start_tcpdump(ad, self.test_name) def _stop_tcp_dump(self, ad): """ Stop tcpdump and pull it to the test run logs @@ -78,13 +74,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): Args: 1. ad: dut to pull tcpdump from """ - file_name = ad.adb.shell("ls /sdcard/tcpdump") - file_name = os.path.join(ad.log_path, "TCPDUMP_%s" % ad.serial, - file_name.split('/')[-1]) - if self.tcpdump_pid: - stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=True) - self.tcpdump_pid = None - return os.path.join(ad.log_path, file_name) + return stop_tcpdump(ad, self.tcpdump_pid, self.test_name) def _verify_dns_queries_over_tls(self, pcap_file, tls=True): """ Verify if DNS queries were over TLS or not @@ -136,6 +126,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): mode = self.dut.droid.getPrivateDnsMode() asserts.assert_true(mode == dns_mode, "Failed to set private DNS mode to %s" % dns_mode) + time.sleep(2) # ping hosts should pass for host in self.ping_hosts: @@ -296,8 +287,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): # DNS server in link properties for wifi network link_prop = self.dut.droid.connectivityGetActiveLinkProperties() - dns_servers = link_prop['DnsServers'] - wifi_dns_servers = [each for lst in dns_servers for each in lst] + wifi_dns_servers = link_prop['PrivateDnsServerName'] self.log.info("Link prop: %s" % wifi_dns_servers) # DUT is on LTE data @@ -308,8 +298,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): # DNS server in link properties for cell network link_prop = self.dut.droid.connectivityGetActiveLinkProperties() - dns_servers = link_prop['DnsServers'] - lte_dns_servers = [each for lst in dns_servers for each in lst] + lte_dns_servers = link_prop['PrivateDnsServerName'] self.log.info("Link prop: %s" % lte_dns_servers) # stop tcpdump on device @@ -323,7 +312,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): "Hostname not in link properites - cell network") @test_tracker_info(uuid="525a6f2d-9751-474e-a004-52441091e427") - def test_dns_over_tls_no_reset_packets(self): + def dns_over_tls_no_reset_packets(self): """ Verify there are no TCP packets with RST flags Steps: diff --git a/acts/tests/google/net/IpSecTest.py b/acts/tests/google/net/IpSecTest.py index dc7fd42c89..41ae0bc978 100644 --- a/acts/tests/google/net/IpSecTest.py +++ b/acts/tests/google/net/IpSecTest.py @@ -20,8 +20,8 @@ from acts.test_decorators import test_tracker_info from acts.test_utils.net import connectivity_const as cconst from acts.test_utils.net import ipsec_test_utils as iutils from acts.test_utils.net import socket_test_utils as sutils -from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils import random @@ -47,32 +47,22 @@ class IpSecTest(base_test.BaseTestClass): self.ipv6_dut_a = self.dut_a.droid.connectivityGetIPv6Addresses(WLAN)[0] self.ipv6_dut_b = self.dut_b.droid.connectivityGetIPv6Addresses(WLAN)[0] + self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo() + self.tcpdump_pid_a = None - self.tcpdump_file_a = None self.tcpdump_pid_b = None - self.tcpdump_file_b = None - - self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo() def teardown_class(self): for ad in self.android_devices: wutils.reset_wifi(ad) def setup_test(self): - self.tcpdump_pid_a = start_adb_tcpdump( - self.dut_a, self.test_name, mask='all') - self.tcpdump_pid_b = start_adb_tcpdump( - self.dut_b, self.test_name, mask='all') + self.tcpdump_pid_a = start_tcpdump(self.dut_a, self.test_name) + self.tcpdump_pid_b = start_tcpdump(self.dut_b, self.test_name) def teardown_test(self): - if self.tcpdump_pid_a: - stop_adb_tcpdump( - self.dut_a, self.tcpdump_pid_a, pull_tcpdump=True) - if self.tcpdump_pid_b: - stop_adb_tcpdump( - self.dut_b, self.tcpdump_pid_b, pull_tcpdump=True) - self.tcpdump_pid_a = None - self.tcpdump_pid_b = None + stop_tcpdump(self.dut_a, self.tcpdump_pid_a, self.test_name) + stop_tcpdump(self.dut_b, self.tcpdump_pid_b, self.test_name) def on_fail(self, test_name, begin_time): self.dut_a.take_bug_report(test_name, begin_time) diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py index 786a40a822..d1dd385cce 100644 --- a/acts/tests/google/net/LegacyVpnTest.py +++ b/acts/tests/google/net/LegacyVpnTest.py @@ -25,7 +25,8 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.wifi import wifi_test_utils +from acts.test_utils.net import net_test_utils as nutils +from acts.test_utils.wifi import wifi_test_utils as wutils from acts.test_utils.net import connectivity_const VPN_CONST = connectivity_const.VpnProfile @@ -48,140 +49,36 @@ class LegacyVpnTest(base_test.BaseTestClass): required_params = dir(VPN_PARAMS) required_params = [x for x in required_params if not x.startswith('__')] self.unpack_userparams(required_params) - wifi_test_utils.wifi_test_device_init(self.dut) - wifi_test_utils.wifi_connect(self.dut, self.wifi_network) + wutils.wifi_test_device_init(self.dut) + wutils.wifi_connect(self.dut, self.wifi_network) time.sleep(3) + self.vpn_params = {'vpn_username': self.vpn_username, + 'vpn_password': self.vpn_password, + 'psk_secret': self.psk_secret, + 'client_pkcs_file_name': self.client_pkcs_file_name, + 'cert_path_vpnserver': self.cert_path_vpnserver, + 'cert_password': self.cert_password} + def teardown_class(self): """ Reset wifi to make sure VPN tears down cleanly """ - wifi_test_utils.reset_wifi(self.dut) + wutils.reset_wifi(self.dut) def on_fail(self, test_name, begin_time): self.dut.take_bug_report(test_name, begin_time) - def download_load_certs(self, vpn_type, vpn_server_addr, ipsec_server_type): - """ Download the certificates from VPN server and push to sdcard of DUT - - Args: - VPN type name - - Returns: - Client cert file name on DUT's sdcard - """ - url = "http://%s%s%s" % (vpn_server_addr, - self.cert_path_vpnserver, - self.client_pkcs_file_name) - local_cert_name = "%s_%s_%s" % (vpn_type.name, - ipsec_server_type, - self.client_pkcs_file_name) - local_file_path = os.path.join(self.log_path, local_cert_name) - try: - ret = urllib.request.urlopen(url) - with open(local_file_path, "wb") as f: - f.write(ret.read()) - except: - asserts.fail("Unable to download certificate from the server") - f.close() - self.dut.adb.push("%s sdcard/" % local_file_path) - return local_cert_name - - def generate_legacy_vpn_profile(self, vpn_type, vpn_server_addr, ipsec_server_type): - """ Generate legacy VPN profile for a VPN - - Args: - VpnProfileType - """ - vpn_profile = {VPN_CONST.USER: self.vpn_username, - VPN_CONST.PWD: self.vpn_password, - VPN_CONST.TYPE: vpn_type.value, - VPN_CONST.SERVER: vpn_server_addr,} - vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,ipsec_server_type) - if vpn_type.name == "PPTP": - vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name - psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) - rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) - if vpn_type.name in psk_set: - vpn_profile[VPN_CONST.IPSEC_SECRET] = self.psk_secret - elif vpn_type.name in rsa_set: - cert_name = self.download_load_certs(vpn_type, - vpn_server_addr, - ipsec_server_type) - vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] - vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] - self.dut.droid.installCertificate(vpn_profile,cert_name,self.cert_password) - else: - vpn_profile[VPN_CONST.MPPE] = self.pptp_mppe - return vpn_profile - - def verify_ping_to_vpn_ip(self, connected_vpn_info): - """ Verify if IP behind VPN server is pingable - Ping should pass, if VPN is connected - Ping should fail, if VPN is disconnected - - Args: - connected_vpn_info which specifies the VPN connection status - """ - ping_result = None - pkt_loss = "100% packet loss" - try: - ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s" - % self.vpn_verify_address) - except adb.AdbError: - pass - return ping_result and pkt_loss not in ping_result - - def legacy_vpn_connection_test_logic(self, vpn_profile): - """ Test logic for each legacy VPN connection - - Steps: - 1. Generate profile for the VPN type - 2. Establish connection to the server - 3. Verify that connection is established using LegacyVpnInfo - 4. Verify the connection by pinging the IP behind VPN - 5. Stop the VPN connection - 6. Check the connection status - 7. Verify that ping to IP behind VPN fails - - Args: - VpnProfileType (1 of the 6 types supported by Android) - """ - # Wait for sometime so that VPN server flushes all interfaces and - # connections after graceful termination - time.sleep(10) - self.dut.adb.shell("ip xfrm state flush") - logging.info("Connecting to: %s", vpn_profile) - self.dut.droid.vpnStartLegacyVpn(vpn_profile) - time.sleep(connectivity_const.VPN_TIMEOUT) - connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo() - asserts.assert_equal(connected_vpn_info["state"], - connectivity_const.VPN_STATE_CONNECTED, - "Unable to establish VPN connection for %s" - % vpn_profile) - ping_result = self.verify_ping_to_vpn_ip(connected_vpn_info) - ip_xfrm_state = self.dut.adb.shell("ip xfrm state") - match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) - if match_obj: - ip_xfrm_state = format(match_obj.group(0)).split() - self.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) - self.dut.droid.vpnStopLegacyVpn() - asserts.assert_true(ping_result, - "Ping to the internal IP failed. " - "Expected to pass as VPN is connected") - connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo() - asserts.assert_true(not connected_vpn_info, - "Unable to terminate VPN connection for %s" - % vpn_profile) - """ Test Cases """ @test_tracker_info(uuid="d2ac5a65-41fb-48de-a0a9-37e589b5456b") def test_legacy_vpn_pptp(self): """ Verify PPTP VPN connection """ vpn = VPN_TYPE.PPTP - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="99af78dd-40b8-483a-8344-cd8f67594971") def legacy_vpn_l2tp_ipsec_psk_libreswan(self): @@ -189,10 +86,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="e67d8c38-92c3-4167-8b6c-a49ef939adce") def legacy_vpn_l2tp_ipsec_rsa_libreswan(self): @@ -200,10 +99,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="8b3517dc-6a3b-44c2-a85d-bd7b969df3cf") def legacy_vpn_ipsec_xauth_psk_libreswan(self): @@ -211,10 +112,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="abac663d-1d91-4b87-8e94-11c6e44fb07b") def legacy_vpn_ipsec_xauth_rsa_libreswan(self): @@ -222,10 +125,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="84140d24-53c0-4f6c-866f-9d66e04442cc") def test_legacy_vpn_l2tp_ipsec_psk_openswan(self): @@ -233,10 +138,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="f7087592-7eed-465d-bfe3-ed7b6d9d5f9a") def test_legacy_vpn_l2tp_ipsec_rsa_openswan(self): @@ -244,10 +151,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="ed78973b-13ee-4dd4-b998-693ab741c6f8") def test_legacy_vpn_ipsec_xauth_psk_openswan(self): @@ -255,10 +164,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="cfd125c4-b64c-4c49-b8e4-fbf05a9be8ec") def test_legacy_vpn_ipsec_xauth_rsa_openswan(self): @@ -266,10 +177,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="419370de-0aa1-4a56-8c22-21567fa1cbb7") def test_legacy_vpn_l2tp_ipsec_psk_strongswan(self): @@ -277,10 +190,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="f7694081-8bd6-4e31-86ec-d538c4ff1f2e") def test_legacy_vpn_l2tp_ipsec_rsa_strongswan(self): @@ -288,10 +203,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="2f86eb98-1e05-42cb-b6a6-fd90789b6cde") def test_legacy_vpn_ipsec_xauth_psk_strongswan(self): @@ -299,10 +216,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="af0cd7b1-e86c-4327-91b4-e9062758f2cf") def test_legacy_vpn_ipsec_xauth_rsa_strongswan(self): @@ -310,10 +229,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongswan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="7b970d0a-1c7d-4a5a-b406-4815e190ef26") def test_legacy_vpn_ipsec_hybrid_rsa_strongswan(self): @@ -321,7 +242,9 @@ class LegacyVpnTest(base_test.BaseTestClass): strongswan server """ vpn = VPN_TYPE.IPSEC_HYBRID_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) diff --git a/acts/tests/google/net/NattKeepAliveTest.py b/acts/tests/google/net/NattKeepAliveTest.py new file mode 100644 index 0000000000..aaf1b1b141 --- /dev/null +++ b/acts/tests/google/net/NattKeepAliveTest.py @@ -0,0 +1,151 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts import base_test +from acts import utils +from acts.controllers import adb +from acts.test_decorators import test_tracker_info +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump +from acts.test_utils.net import connectivity_test_utils as cutils +from acts.test_utils.wifi import wifi_test_utils as wutils + +import random +import time + +WLAN = "wlan0" +PKTS = 5 +SERVER_UDP_KEEPALIVE = "python /root/udp_nat_keepalive.py" +KEEPALIVE_DATA = "ff" + + +class NattKeepAliveTest(base_test.BaseTestClass): + """ Tests for NATT keepalive """ + + def setup_class(self): + """ Setup devices for tests and unpack params """ + + self.dut = self.android_devices[0] + req_params = ("wifi_network", "remote_server", "server_ssh_config") + self.unpack_userparams(req_params) + + wutils.wifi_connect(self.dut, self.wifi_network) + + self.ip_a = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0] + self.ip_b = self.remote_server + self.log.info("DUT IP addr: %s" % self.ip_a) + self.log.info("Remote server IP addr: %s" % self.ip_b) + self.tcpdump_pid = None + + def teardown_class(self): + wutils.reset_wifi(self.dut) + + def setup_test(self): + self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) + + def teardown_test(self): + stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + """ Helper functions """ + + def _verify_time_interval(self, time_interval, cmd_out): + """ Verify time diff between packets is equal to time interval """ + + self.log.info("Packet Info: \n%s\n" % cmd_out) + pkts = cmd_out.rstrip().decode("utf-8", "ignore").split("\n") + + prev = 0 + for i in range(len(pkts)): + interval, data = pkts[i][1:-1].split(',') + + # verify data + if data.lstrip().rstrip()[1:-1] != KEEPALIVE_DATA: + self.log.error("Server received invalid data %s" % data) + return False + + # verify time interval + curr = float(interval) + if i == 0: + prev = curr + continue + diff = int(round(curr-prev)) + if diff != time_interval: + self.log.error("Keepalive time interval is %s, expected %s" + % (diff, time_interval)) + return False + prev = curr + + return True + + """ Tests begin """ + + @test_tracker_info(uuid="c9012da2-656f-44ef-bad6-26892335d4bd") + def test_natt_keepalive_ipv4(self): + """ Test natt keepalive over wifi + + Steps: + 1. Open a UDP port 4500 on linux host + 2. Start NATT keepalive packets on DUT and send 5 packets + 3. Verify that 5 keepalive packets reached host with data '0xff' + """ + + # set a time interval + result = True + time_interval = random.randint(10, 60) + port = random.randint(8000, 9000) + self.log.info("NATT keepalive time interval is %s" % time_interval) + self.log.info("Source port is %s" % port) + time_out = time_interval * PKTS + 6 + + # start NATT keep alive + nka_key = cutils.start_natt_keepalive( + self.dut, self.ip_a, port, self.ip_b, time_interval) + asserts.assert_true(nka_key, "Failed to start NATT keepalive") + + # capture packets on server + self.log.info("Capturing keepalive packets on %s" % self.ip_b) + cmd_out = utils.exe_cmd("timeout %s %s" % + (time_out, SERVER_UDP_KEEPALIVE)) + + # verify packets received + result = self._verify_time_interval(time_interval, cmd_out) + + # stop NATT keep alive + status = cutils.stop_natt_keepalive(self.dut, nka_key) + asserts.assert_true(status, "Failed to stop NATT keepalive") + + return result + + @test_tracker_info(uuid="8ab20733-4a9e-4e4d-a46f-4d32a9f221c5") + def test_natt_keepalive_ipv4_invalid_interval(self): + """ Test invalid natt keepalive time interval + + Steps: + 1. Start NATT keepalive with time interval less than 10 seconds + 2. API should return invalid interval + """ + + # start NATT keep alive + port = random.randint(8000, 9000) + nka_key = cutils.start_natt_keepalive( + self.dut, self.ip_a, port, self.ip_b, 2) + asserts.assert_true(not nka_key, + "Started NATT keepalive with invalid interval") + + """ Tests end """ diff --git a/acts/tests/google/net/ProxyTest.py b/acts/tests/google/net/ProxyTest.py new file mode 100644 index 0000000000..66aa345118 --- /dev/null +++ b/acts/tests/google/net/ProxyTest.py @@ -0,0 +1,178 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts import base_test +from acts.test_decorators import test_tracker_info +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump +from acts.test_utils.wifi import wifi_test_utils as wutils + +from scapy.all import IP +from scapy.all import TCP +from scapy.all import UDP +from scapy.all import Raw +from scapy.all import rdpcap +from scapy.all import Scapy_Exception + + +class ProxyTest(base_test.BaseTestClass): + """ Network proxy tests """ + + def setup_class(self): + """ Setup devices for tests and unpack params """ + self.dut = self.android_devices[0] + req_params = ("wifi_network_no_dns_tls", "proxy_pac", "proxy_server", + "proxy_port", "bypass_host", "non_bypass_host") + self.unpack_userparams(req_params) + wutils.wifi_connect(self.dut, self.wifi_network_no_dns_tls) + self.tcpdump_pid = None + self.proxy_port = int(self.proxy_port) + + def teardown_test(self): + self.dut.droid.connectivityResetGlobalProxy() + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + if global_proxy: + self.log.error("Failed to reset global proxy settings") + + def teardown_class(self): + wutils.reset_wifi(self.dut) + + """ Helper methods """ + + def _verify_http_request(self, ad): + """ Send http requests to hosts + + Steps: + 1. Send http requests to hosts + a. Host that is bypassed by proxy server + b. Host that goes through proxy server + 2. Verify that both return valid responses + + Args: + 1. ad: dut to run http requests + """ + for host in [self.bypass_host, self.non_bypass_host]: + host = "https://%s" % host + result = ad.droid.httpRequestString(host) + asserts.assert_true(result, "Http request failed for %s" % host) + + def _verify_proxy_server(self, pcap_file, bypass_host, hostname): + """ Verify that http requests are going through proxy server + + Args: + 1. tcpdump: pcap file + 2. bypass_host: boolean value if the request goes through proxy + 3. hostname: hostname requested + + Returns: + True/False if the bypass condition met + """ + self.log.info("Checking proxy server for query to: %s" % hostname) + try: + packets = rdpcap(pcap_file) + except Scapy_Exception: + asserts.fail("Not a valid pcap file") + + dns_query = False + http_query = False + for pkt in packets: + summary = "%s" % pkt.summary() + if UDP in pkt and pkt[UDP].dport == 53 and hostname in summary: + dns_query = True + break + if TCP in pkt and pkt[TCP].dport == self.proxy_port and Raw in pkt\ + and hostname in str(pkt[Raw]): + http_query = True + + self.log.info("Bypass hostname set to: %s" % bypass_host) + self.log.info("Found DNS query for host: %s" % dns_query) + self.log.info("Found HTTP query for host: %s" % http_query) + if bypass_host and http_query and not dns_query or \ + not bypass_host and not http_query and dns_query: + return False + return True + + def _test_proxy(self): + """ Test pac piroxy and manual proxy settings + + Steps: + 1. Start tcpdump + 2. Run http requests + 3. Stop tcpdump + 4. Verify the packets from tcpdump have valid queries + """ + + # start tcpdump on the device + self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) + + # verify http requests + self._verify_http_request(self.dut) + + # stop tcpdump on the device + pcap_file = stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) + + # verify proxy server + result = self._verify_proxy_server(pcap_file, True, self.bypass_host) + asserts.assert_true(result, "Proxy failed for %s" % self.bypass_host) + result = self._verify_proxy_server(pcap_file, False, self.non_bypass_host) + asserts.assert_true(result, "Proxy failed for %s" % self.non_bypass_host) + + """ Test Cases """ + + @test_tracker_info(uuid="16881315-1a50-48ce-bd36-7b0d2f21b734") + def test_pac_proxy_over_wifi(self): + """ Test proxy with auto config over wifi + + Steps: + 1. Connect to a wifi network + 2. Set a global proxy with auto config + 3. Do a http request on the hostnames + 4. Verify that no DNS packets seen for non bypassed hostnames + 5. Verify that DNS packets seen for bypassed hostnames + """ + # set global pac proxy + self.log.info("Setting global proxy to: %s" % self.proxy_pac) + self.dut.droid.connectivitySetGlobalPacProxy(self.proxy_pac) + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + asserts.assert_true(global_proxy['PacUrl'] == self.proxy_pac, + "Failed to set pac proxy") + + # test proxy + self._test_proxy() + + @test_tracker_info(uuid="4d3361f6-866d-423c-9ed7-5a6943575fe9") + def test_manual_proxy_over_wifi(self): + """ Test manual proxy over wifi + + Steps: + 1. Connect to a wifi network + 2. Set a global manual proxy with proxy server, port & bypass URLs + 3. Do a http request on the hostnames + 4. Verify that no DNS packets are seen for non bypassed hostnames + 5. Verify that DNS packets seen for bypassed hostnames + """ + # set global manual proxy + self.log.info("Setting global proxy to: %s %s %s" % + (self.proxy_server, self.proxy_port, self.bypass_host)) + self.dut.droid.connectivitySetGlobalProxy(self.proxy_server, + self.proxy_port, + self.bypass_host) + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + asserts.assert_true(global_proxy['Hostname'] == self.proxy_server, + "Failed to set manual proxy") + + # test proxy + self._test_proxy() diff --git a/acts/tests/google/wifi/WifiIOTTest.py b/acts/tests/google/wifi/WifiIOTTest.py index 22e67d9e28..75449517f7 100755 --- a/acts/tests/google/wifi/WifiIOTTest.py +++ b/acts/tests/google/wifi/WifiIOTTest.py @@ -132,12 +132,12 @@ class WifiIOTTest(WifiBaseTest): """Tests""" @test_tracker_info(uuid="a57cc861-b6c2-47e4-9db6-7a3ab32c6e20") - def iot_connection_to_ubiquity_ap1_2g(self): + def test_iot_connection_to_ubiquity_ap1_2g(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="2065c2f7-2b89-4da7-a15d-e5dc17b88d52") - def iot_connection_to_ubiquity_ap1_5g(self): + def test_iot_connection_to_ubiquity_ap1_5g(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -152,12 +152,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="02a8cc75-6781-4153-8d90-bed7568a1e78") - def iot_connection_to_AirportExtreme_2G(self): + def test_iot_connection_to_AirportExtreme_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="83a42c97-1358-4ba7-bdb2-238fdb1c945e") - def iot_connection_to_AirportExtreme_5G(self): + def test_iot_connection_to_AirportExtreme_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -172,12 +172,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="2503d9ed-35df-4be0-b838-590324cecaee") - def test_iot_connection_to_Dlink_AC1200_2G(self): + def iot_connection_to_Dlink_AC1200_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="0a44e148-a4bf-43f4-88eb-e4c1ffa850ce") - def test_iot_connection_to_Dlink_AC1200_5G(self): + def iot_connection_to_Dlink_AC1200_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -242,17 +242,17 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="054d2ffc-97fd-4613-bf47-acedd0fa4701") - def iot_connection_to_NETGEAR_AC3200_2G(self): + def test_iot_connection_to_NETGEAR_AC3200_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="d15a789a-def5-4c6a-b59e-1a75f73cc6a9") - def iot_connection_to_NETGEAR_AC3200_5G_1(self): + def test_iot_connection_to_NETGEAR_AC3200_5G_1(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="1de6369e-97da-479f-b17c-9144bb814f51") - def iot_connection_to_NETGEAR_AC3200_5G_2(self): + def test_iot_connection_to_NETGEAR_AC3200_5G_2(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -282,12 +282,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="e5517b82-c225-449d-83ac-055a561a764f") - def iot_connection_to_TP_LINK_AC1700_2G(self): + def test_iot_connection_to_TP_LINK_AC1700_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="9531d3cc-129d-4501-a5e3-d7502120cd8b") - def iot_connection_to_TP_LINK_AC1700_5G(self): + def test_iot_connection_to_TP_LINK_AC1700_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) diff --git a/acts/tests/google/wifi/WifiIOTTwPkg1Test.py b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py new file mode 100644 index 0000000000..b30c606b79 --- /dev/null +++ b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import pprint +import time + +import acts.signals +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +from acts.controllers import iperf_server as ipf + +import json +import logging +import math +import os +from acts import utils +import csv + +WifiEnums = wutils.WifiEnums + + +class WifiIOTTwPkg1Test(WifiBaseTest): + """ Tests for wifi IOT + + Test Bed Requirement: + * One Android device + * Wi-Fi IOT networks visible to the device + """ + + def __init__(self, controllers): + self.attenuators = None + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + wutils.wifi_test_device_init(self.dut) + + req_params = [ "iot_networks", ] + opt_params = [ "open_network", + "iperf_server_address","iperf_port_arg", + "pdu_address" , "pduon_wait_time","pduon_address" + ] + self.unpack_userparams(req_param_names=req_params, + opt_param_names=opt_params) + + asserts.assert_true( + len(self.iot_networks) > 0, + "Need at least one iot network with psk.") + + if getattr(self, 'open_network', False): + self.iot_networks.append(self.open_network) + + wutils.wifi_toggle_state(self.dut, True) + if "iperf_server_address" in self.user_params: + self.iperf_server = self.iperf_servers[0] + + # create hashmap for testcase name and SSIDs + self.iot_test_prefix = "test_iot_connection_to_" + self.ssid_map = {} + for network in self.iot_networks: + SSID = network['SSID'].replace('-','_') + self.ssid_map[SSID] = network + + # create folder for IOT test result + self.log_path = os.path.join(logging.log_path, "IOT_results") + utils.create_dir(self.log_path) + + Header=("test_name","throughput_TX","throughput_RX") + self.csv_write(Header) + + # check pdu_address + if "pdu_address" and "pduon_wait_time" in self.user_params: + self.pdu_func() + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + + def teardown_test(self): + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + wutils.reset_wifi(self.dut) + + def teardown_class(self): + if "iperf_server_address" in self.user_params: + self.iperf_server.stop() + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + self.dut.cat_adb_log(test_name, begin_time) + + """Helper Functions""" + + def connect_to_wifi_network(self, network): + """Connection logic for open and psk wifi networks. + + Args: + params: Dictionary with network info. + """ + SSID = network[WifiEnums.SSID_KEY] + self.dut.ed.clear_all_events() + wutils.start_wifi_connection_scan(self.dut) + scan_results = self.dut.droid.wifiGetScanResults() + wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results) + wutils.wifi_connect(self.dut, network, num_of_tries=3) + + def run_iperf_client(self, network): + """Run iperf TX throughput after connection. + + Args: + params: Dictionary with network info. + """ + if "iperf_server_address" in self.user_params: + + # Add iot_result + iot_result = [] + self.iperf_server.start(tag="TX_server_{}".format( + self.current_test_name)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic TX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J {}".format(self.iperf_server.port,self.iperf_port_arg) + success, data = self.dut.run_iperf_client(self.iperf_server_address, + port_arg) + # Parse and log result + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},TX_client_{}".format( + self.iperf_server.port,self.current_test_name)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = self.iperf_server.log_files[-1] + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = math.fsum(iperf_result.instantaneous_rates) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + iot_result.append(curr_throughput) + self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return iot_result + + def run_iperf_server(self, network): + """Run iperf RX throughput after connection. + + Args: + params: Dictionary with network info. + + Returns: + iot_result: dict containing iot_results + """ + if "iperf_server_address" in self.user_params: + + # Add iot_result + iot_result = [] + self.iperf_server.start(tag="RX_client_{}".format( + self.current_test_name)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic RX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J -R {}".format(self.iperf_server.port,self.iperf_port_arg) + success, data = self.dut.run_iperf_client(self.iperf_server_address, + port_arg) + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},RX_server_{}".format( + self.iperf_server.port,self.current_test_name)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = client_output_path + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = math.fsum(iperf_result.instantaneous_rates) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + iot_result.append(curr_throughput) + self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return iot_result + + def iperf_test_func(self,network): + """Main function to test iperf TX/RX. + + Args: + params: Dictionary with network info + """ + # Initialize + iot_result = {} + + # Run RvR and log result + iot_result["throughput_TX"] = self.run_iperf_client(network) + iot_result["throughput_RX"] = self.run_iperf_server(network) + iot_result["test_name"] = self.current_test_name + + # Save output as text file + results_file_path = "{}/{}.json".format(self.log_path, + self.current_test_name) + with open(results_file_path, 'w') as results_file: + json.dump(iot_result, results_file, indent=4) + + data=(iot_result["test_name"],iot_result["throughput_TX"][0], + iot_result["throughput_RX"][0]) + self.csv_write(data) + + def csv_write(self,data): + with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: + csv_writer = csv.writer(csv_file,delimiter=',') + csv_writer.writerow(data) + csv_file.close() + + def pdu_func(self): + """control Power Distribution Units on local machine. + + Logic steps are + 1. Turn off PDU for all port. + 2. Turn on PDU for specified port. + """ + out_file_name = "PDU.log" + self.full_out_path = os.path.join(self.log_path, out_file_name) + cmd = "curl http://snmp:1234@{}/offs.cgi?led=11111111> {}".format(self.pdu_address, + self.full_out_path) + self.pdu_process = utils.start_standing_subprocess(cmd) + wait_time = 10 + self.log.info("Starting set PDU to OFF") + time.sleep(wait_time) + self.full_out_path = os.path.join(self.log_path, out_file_name) + cmd = "curl http://snmp:1234@{}/ons.cgi?led={}> {}".format(self.pdu_address, + self.pduon_address, + self.full_out_path) + self.pdu_process = utils.start_standing_subprocess(cmd) + wait_time = int("{}".format(self.pduon_wait_time)) + self.log.info("Starting set PDU to ON for port1," + "wait for {}s".format(self.pduon_wait_time)) + time.sleep(wait_time) + self.log.info("PDU setup complete") + + def connect_to_wifi_network_and_run_iperf(self, network): + """Connection logic for open and psk wifi networks. + + Logic steps are + 1. Connect to the network. + 2. Run iperf throghput. + + Args: + params: A dictionary with network info. + """ + self.connect_to_wifi_network(network) + self.iperf_test_func(network) + + """Tests""" + + @test_tracker_info(uuid="0e4ad6ed-595c-4629-a4c9-c6be9c3c58e0") + def test_iot_connection_to_ASUS_RT_AC68U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="a76d8acc-808e-4a5d-a52b-5ba07d07b810") + def test_iot_connection_to_ASUS_RT_AC68U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="659a3e5e-07eb-4905-9cda-92e959c7b674") + def test_iot_connection_to_D_Link_DIR_868L_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="6bcfd736-30fc-48a8-b4fb-723d1d113f3c") + def test_iot_connection_to_D_Link_DIR_868L_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="c9da945a-2c4a-44e1-881d-adf307b39b21") + def test_iot_connection_to_TP_LINK_WR940N_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="db0d224d-df81-401f-bf35-08ad02e41a71") + def test_iot_connection_to_ASUS_RT_N66U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="845ff1d6-618d-40f3-81c3-6ed3a0751fde") + def test_iot_connection_to_ASUS_RT_N66U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="6908039b-ccc9-4777-a0f1-3494ce642014") + def test_iot_connection_to_ASUS_RT_AC54U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="2647c15f-2aad-47d7-8dee-b2ee1ac4cef6") + def test_iot_connection_to_ASUS_RT_AC54U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="99678f66-ddf1-454d-87e4-e55177ec380d") + def test_iot_connection_to_ASUS_RT_N56U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="4dd75e81-9a8e-44fd-9449-09f5ab8a63c3") + def test_iot_connection_to_ASUS_RT_N56U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="315397ce-50d5-4abf-a11c-1abcaef832d3") + def test_iot_connection_to_BELKIN_F9K1002v1_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="05ba464a-b1ef-4ac1-a32f-c919ec4aa1dd") + def test_iot_connection_to_CISCO_E1200_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="04912868-4a47-40ce-877e-4e4c89849557") + def test_iot_connection_to_TP_LINK_C2_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="53517a21-3802-4185-b8bb-6eaace063a42") + def test_iot_connection_to_TP_LINK_C2_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="71c08c1c-415d-4da4-a151-feef43fb6ad8") + def test_iot_connection_to_ASUS_RT_AC66U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="2322c155-07d1-47c9-bd21-2e358e3df6ee") + def test_iot_connection_to_ASUS_RT_AC66U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) diff --git a/acts/tests/google/wifi/WifiRvrTwTest.py b/acts/tests/google/wifi/WifiRvrTwTest.py new file mode 100644 index 0000000000..24afa5ea2d --- /dev/null +++ b/acts/tests/google/wifi/WifiRvrTwTest.py @@ -0,0 +1,482 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import pprint +import time + +import acts.signals +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +from acts.controllers import iperf_server as ipf + +import json +import logging +import math +import os +from acts import utils +import csv + +import serial +import sys + + +WifiEnums = wutils.WifiEnums + + +class WifiRvrTWTest(WifiBaseTest): + """ Tests for wifi RVR performance + + Test Bed Requirement: + * One Android device + * Wi-Fi networks visible to the device + """ + TEST_TIMEOUT = 10 + + def __init__(self, controllers): + self.attenuators = None + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + wutils.wifi_test_device_init(self.dut) + + req_params = [ "iot_networks","rvr_test_params"] + opt_params = [ "angle_params","usb_port"] + self.unpack_userparams(req_param_names=req_params, + opt_param_names=opt_params) + + asserts.assert_true( + len(self.iot_networks) > 0, + "Need at least one iot network with psk.") + + wutils.wifi_toggle_state(self.dut, True) + if "rvr_test_params" in self.user_params: + self.iperf_server = self.iperf_servers[0] + self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"] + self.MindB= self.rvr_test_params ["rvr_atten_MinDB"] + self.stepdB= self.rvr_test_params ["rvr_atten_step"] + + if "angle_params" in self.user_params: + self.angle = self.angle_params + + if "usb_port" in self.user_params: + self.T1=self.readport(self.usb_port["turntable"]) + self.ATT1=self.readport(self.usb_port["atten1"]) + self.ATT2=self.readport(self.usb_port["atten2"]) + self.ATT3=self.readport(self.usb_port["atten3"]) + + # create hashmap for testcase name and SSIDs + self.iot_test_prefix = "test_iot_connection_to_" + self.ssid_map = {} + for network in self.iot_networks: + SSID = network['SSID'].replace('-','_') + self.ssid_map[SSID] = network + + # create folder for rvr test result + self.log_path = os.path.join(logging.log_path, "rvr_results") + utils.create_dir(self.log_path) + + Header=("test_SSID","Turn table (angle)","Attenuator(dBm)", + "TX throughput (Mbps)","RX throughput (Mbps)", + "RSSI","Link speed","Frequency") + self.csv_write(Header) + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + + def teardown_test(self): + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + + def teardown_class(self): + if "rvr_test_params" in self.user_params: + self.iperf_server.stop() + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + self.dut.cat_adb_log(test_name, begin_time) + + """Helper Functions""" + + def csv_write(self,data): + """Output .CSV file for test result. + + Args: + data: Dict containing attenuation, throughput and other meta data. + """ + with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: + csv_writer = csv.writer(csv_file,delimiter=',') + csv_writer.writerow(data) + csv_file.close() + + def readport(self,com): + """Read com port for current test. + + Args: + com: Attenuator or turn table com port + """ + port=serial.Serial(com,9600,timeout=1) + time.sleep(1) + return port + + def getdB(self,port): + """Get attenuator dB for current test. + + Args: + port: Attenuator com port + """ + port.write('V?;'.encode()) + dB=port.readline().decode() + dB=dB.strip(';') + dB=dB[dB.find('V')+1:] + return int(dB) + + def setdB(self,port,dB): + """Setup attenuator dB for current test. + + Args: + port: Attenuator com port + dB: Attenuator setup dB + """ + if dB<0: + dB=0 + elif dB>101: + dB=101 + self.log.info("Set dB to "+str(dB)) + InputdB=str('V')+str(dB)+str(';') + port.write(InputdB.encode()) + time.sleep(0.1) + + def set_Three_Att_dB(self,port1,port2,port3,dB): + """Setup 3 attenuator dB for current test. + + Args: + port1: Attenuator1 com port + port1: Attenuator2 com port + port1: Attenuator com port + dB: Attenuator setup dB + """ + self.setdB(port1,dB) + self.setdB(port2,dB) + self.setdB(port3,dB) + self.checkdB(port1,dB) + self.checkdB(port2,dB) + self.checkdB(port3,dB) + + def checkdB(self,port,dB): + """Check attenuator dB for current test. + + Args: + port: Attenuator com port + dB: Attenuator setup dB + """ + retry=0 + while self.getdB(port)!=dB and retry<10: + retry=retry+1 + self.log.info("Current dB = "+str(self.getdB(port))) + self.log.info("Fail to set Attenuator to "+str(dB)+", " + +str(retry)+" times try to reset") + self.setdB(port,dB) + if retry ==10: + self.log.info("Retry Attenuator fail for 9 cycles, end test!") + sys.exit() + return 0 + + def getDG(self,port): + """Get turn table angle for current test. + + Args: + port: Turn table com port + """ + DG = "" + port.write('DG?;'.encode()) + time.sleep(0.1) + data = port.readline().decode('utf-8') + for i in range(len(data)): + if (data[i].isdigit()) == True: + DG = DG + data[i] + if DG == "": + return -1 + return int(DG) + + def setDG(self,port,DG): + """Setup turn table angle for current test. + + Args: + port: Turn table com port + DG: Turn table setup angle + """ + if DG>359: + DG=359 + elif DG<0: + DG=0 + self.log.info("Set angle to "+str(DG)) + InputDG=str('DG')+str(DG)+str(';') + port.write(InputDG.encode()) + + def checkDG(self,port,DG): + """Check turn table angle for current test. + + Args: + port: Turn table com port + DG: Turn table setup angle + """ + retrytime = self.TEST_TIMEOUT + retry = 0 + while self.getDG(port)!=DG and retry<retrytime: + retry=retry+1 + self.log.info('Current angle = '+str(self.getDG(port))) + self.log.info('Fail set angle to '+str(DG)+', '+str(retry)+' times try to reset') + self.setDG(port,DG) + time.sleep(10) + if retry == retrytime: + self.log.info('Retry turntable fail for '+str(retry)+' cycles, end test!') + sys.exit() + return 0 + + def getwifiinfo(self): + """Get WiFi RSSI/ link speed/ frequency for current test. + + Returns: + [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency + """ + def is_number(string): + for i in string: + if i.isdigit() == False: + if (i=="-" or i=="."): + continue + return str(-1) + return string + + try: + cmd = "adb shell iw wlan0 link" + wifiinfo = utils.subprocess.check_output(cmd,shell=True, + timeout=self.TEST_TIMEOUT) + # Check RSSI + RSSI = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("signal:") + + 7:wifiinfo.decode("utf-8").find("dBm") - 1] + RSSI = RSSI.strip(' ') + RSSI = is_number(RSSI) + # Check link speed + LS = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("bitrate:") + + 8:wifiinfo.decode("utf-8").find("Bit/s") - 2] + LS = LS.strip(' ') + LS = is_number(LS) + # Check frequency + FR = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("freq:") + + 6:wifiinfo.decode("utf-8").find("freq:") + 10] + FR = FR.strip(' ') + FR = is_number(FR) + except: + return -1, -1, -1 + return [RSSI,LS,FR] + + def post_process_results(self, rvr_result): + """Saves JSON formatted results. + + Args: + rvr_result: Dict containing attenuation, throughput and other meta + data + """ + # Save output as text file + data=(rvr_result["test_name"],rvr_result["test_angle"],rvr_result["test_dB"], + rvr_result["throughput_TX"][0],rvr_result["throughput_RX"][0], + rvr_result["test_RSSI"],rvr_result["test_LS"],rvr_result["test_FR"]) + self.csv_write(data) + + results_file_path = "{}/{}_angle{}_{}dB.json".format(self.log_path, + self.ssid, + self.angle[self.ag],self.DB) + with open(results_file_path, 'w') as results_file: + json.dump(rvr_result, results_file, indent=4) + + def connect_to_wifi_network(self, network): + """Connection logic for psk wifi networks. + + Args: + params: Dictionary with network info. + """ + SSID = network[WifiEnums.SSID_KEY] + self.dut.ed.clear_all_events() + wutils.start_wifi_connection_scan(self.dut) + scan_results = self.dut.droid.wifiGetScanResults() + wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results) + wutils.wifi_connect(self.dut, network, num_of_tries=3) + + def run_iperf_client(self, network): + """Run iperf TX throughput after connection. + + Args: + params: Dictionary with network info. + + Returns: + rvr_result: Dict containing rvr_results + """ + rvr_result = [] + self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format( + self.ssid,self.angle[self.ag],self.DB)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic TX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J {}".format(self.iperf_server.port, + self.rvr_test_params["iperf_port_arg"]) + success, data = self.dut.run_iperf_client( + self.rvr_test_params["iperf_server_address"], + port_arg, + timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) + # Parse and log result + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},TX_client_{}_angle{}_{}dB".format( + self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = self.iperf_server.log_files[-1] + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ + self.rvr_test_params["iperf_ignored_interval"]:-1]) / len( + iperf_result.instantaneous_rates[self.rvr_test_params[ + "iperf_ignored_interval"]:-1])) * 8 * (1.024**2) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + rvr_result.append(curr_throughput) + self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( + self.DB, curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return rvr_result + + def run_iperf_server(self, network): + """Run iperf RX throughput after connection. + + Args: + params: Dictionary with network info. + + Returns: + rvr_result: Dict containing rvr_results + """ + rvr_result = [] + self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format( + self.ssid,self.angle[self.ag],self.DB)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic RX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J -R {}".format(self.iperf_server.port, + self.rvr_test_params["iperf_port_arg"]) + success, data = self.dut.run_iperf_client( + self.rvr_test_params["iperf_server_address"], + port_arg, + timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT) + # Parse and log result + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},RX_server_{}_angle{}_{}dB".format( + self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = client_output_path + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = (math.fsum(iperf_result.instantaneous_rates[ + self.rvr_test_params["iperf_ignored_interval"]:-1]) / len( + iperf_result.instantaneous_rates[self.rvr_test_params[ + "iperf_ignored_interval"]:-1])) * 8 * (1.024**2) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + rvr_result.append(curr_throughput) + self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format( + self.DB, curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return rvr_result + + def iperf_test_func(self,network): + """Main function to test iperf TX/RX. + + Args: + params: Dictionary with network info + """ + if "rvr_test_params" in self.user_params: + # Initialize + rvr_result = {} + # Run RvR and log result + wifiinfo = self.getwifiinfo() + rvr_result["throughput_TX"] = self.run_iperf_client(network) + rvr_result["throughput_RX"] = self.run_iperf_server(network) + rvr_result["test_name"] = self.ssid + rvr_result["test_angle"] = self.angle[self.ag] + rvr_result["test_dB"] = self.DB + rvr_result["test_RSSI"] = wifiinfo[0] + rvr_result["test_LS"] = wifiinfo[1] + rvr_result["test_FR"] = wifiinfo[2] + self.post_process_results(rvr_result) + + def rvr_test(self,network): + """Test function to run RvR. + + The function runs an RvR test in the current device/AP configuration. + Function is called from another wrapper function that sets up the + testbed for the RvR test + + Args: + params: Dictionary with network info + """ + wait_time = 5 + utils.subprocess.check_output('adb root', shell=True, timeout=20) + self.ssid = network[WifiEnums.SSID_KEY] + self.log.info("Start rvr test") + for i in range(len(self.angle)): + self.setDG(self.T1,self.angle[i]) + time.sleep(wait_time) + self.checkDG(self.T1,self.angle[i]) + self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,0) + time.sleep(wait_time) + self.connect_to_wifi_network(network) + self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.MindB) + for j in range(self.MindB,self.MaxdB+self.stepdB,self.stepdB): + self.DB=j + self.ag=i + self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.DB) + self.iperf_test_func(network) + wutils.reset_wifi(self.dut) + + """Tests""" + + @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158") + def test_iot_connection_to_RVR_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.rvr_test(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac") + def test_iot_connection_to_RVR_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.rvr_test(self.ssid_map[ssid_key])
\ No newline at end of file diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py index 6f0beba430..ffe0effb96 100755 --- a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py +++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py @@ -346,9 +346,9 @@ class WifiStaApConcurrencyTest(WifiBaseTest): def test_softap_5G_wifi_connection_2G(self): """Tests bringing up SoftAp on 5G followed by connection to 2G network. """ - self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G(self): @@ -356,7 +356,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G_DFS(self): @@ -364,14 +364,14 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) @test_tracker_info(uuid="aa23a3fc-31a1-4d5c-8cf5-2eb9fdf9e7ce") def test_softap_5G_wifi_connection_2G_with_location_scan_on(self): """Tests bringing up SoftAp on 5G followed by connection to 2G network with location scans turned on. """ - self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.turn_location_on_and_scan_toggle_on() self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py index 78608d7142..e99aed609f 100755 --- a/acts/tests/google/wifi/WifiStressTest.py +++ b/acts/tests/google/wifi/WifiStressTest.py @@ -19,7 +19,7 @@ import time import acts.base_test import acts.test_utils.wifi.wifi_test_utils as wutils -import acts.utils +import acts.test_utils.tel.tel_test_utils as tutils from acts import asserts from acts import signals @@ -134,6 +134,31 @@ class WifiStressTest(WifiBaseTest): if "100% packet loss" in result: raise signals.TestFailure("100% packet loss during ping") + def start_youtube_video(self, url=None, secs=60): + """Start a youtube video and check if it's playing. + + Args: + url: The URL of the youtube video to play. + secs: Time to play video in seconds. + + """ + ad = self.dut + ad.log.info("Start a youtube video") + ad.ensure_screen_on() + video_played = False + for count in range(2): + ad.unlock_screen() + ad.adb.shell('am start -a android.intent.action.VIEW -d "%s"' % url) + if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1): + ad.log.info("Started a video in youtube.") + # Play video for given seconds. + time.sleep(secs) + video_played = True + break + if not video_played: + raise signals.TestFailure("Youtube video did not start. Current WiFi " + "state is %d" % self.dut.droid.wifiCheckState()) + """Tests""" @test_tracker_info(uuid="cd0016c6-58cf-4361-b551-821c0b8d2554") @@ -206,17 +231,54 @@ class WifiStressTest(WifiBaseTest): sec = self.stress_hours * 60 * 60 args = "-p {} -t {} -R".format(self.iperf_server.port, sec) self.log.info("Running iperf client {}".format(args)) + start_time = time.time() result, data = self.dut.run_iperf_client(self.iperf_server_address, args, timeout=sec+1) if not result: self.log.debug("Error occurred in iPerf traffic.") + start_time = time.time() self.run_ping(sec) except: + total_time = time.time() - start_time raise signals.TestFailure("Network long-connect failed." - "Look at logs", extras={"Total Hours":"%d" %self.stress_hours, - "Seconds Run":"UNKNOWN"}) + "WiFi State = %d" %self.dut.droid.wifiCheckState(), + extras={"Total Hours":"%d" %self.stress_hours, + "Seconds Run":"%d" %total_time}) + total_time = time.time() - start_time + self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState()) raise signals.TestPass(details="", extras={"Total Hours":"%d" % - self.stress_hours, "Seconds":"%d" %sec}) + self.stress_hours, "Seconds Run":"%d" %total_time}) + + def test_stress_youtube_5g(self): + """Test to connect to network and play various youtube videos. + + Steps: + 1. Scan and connect to a network. + 2. Loop through and play a list of youtube videos. + 3. Verify no WiFi disconnects/data interruption. + + """ + # List of Youtube 4K videos. + videos = ["https://www.youtube.com/watch?v=TKmGU77INaM", + "https://www.youtube.com/watch?v=WNCl-69POro", + "https://www.youtube.com/watch?v=dVkK36KOcqs", + "https://www.youtube.com/watch?v=0wCC3aLXdOw", + "https://www.youtube.com/watch?v=rN6nlNC9WQA", + "https://www.youtube.com/watch?v=U--7hxRNPvk"] + try: + self.scan_and_connect_by_ssid(self.wpa_5g) + start_time = time.time() + for video in videos: + self.start_youtube_video(url=video, secs=10*60) + except: + total_time = time.time() - start_time + raise signals.TestFailure("The youtube stress test has failed." + "WiFi State = %d" %self.dut.droid.wifiCheckState(), + extras={"Total Hours":"1", "Seconds Run":"%d" %total_time}) + total_time = time.time() - start_time + self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState()) + raise signals.TestPass(details="", extras={"Total Hours":"1", + "Seconds Run":"%d" %total_time}) @test_tracker_info(uuid="d367c83e-5b00-4028-9ed8-f7b875997d13") def test_stress_wifi_failover(self): @@ -232,6 +294,7 @@ class WifiStressTest(WifiBaseTest): """ for count in range(int(self.stress_count/4)): + wutils.reset_wifi(self.dut) ssids = list() for network in self.networks: ssids.append(network[WifiEnums.SSID_KEY]) @@ -294,7 +357,8 @@ class WifiStressTest(WifiBaseTest): wutils.stop_wifi_tethering(self.dut) asserts.assert_false(self.dut.droid.wifiIsApEnabled(), "SoftAp failed to shutdown!") - time.sleep(TIMEOUT) + # Give some time for WiFi to come back to previous state. + time.sleep(2) cur_wifi_state = self.dut.droid.wifiCheckState() if initial_wifi_state != cur_wifi_state: raise signals.TestFailure("Wifi state was %d before softAP and %d now!" % diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py new file mode 100755 index 0000000000..a603e017fa --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GOpenOTATest(BaseTestClass): + """Wifi Tethering 2G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering2GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + @test_tracker_info(uuid="fe502bc3-f9c6-4bed-98ad-acaa7e166521") + def test_wifi_tethering_ota_2g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py new file mode 100755 index 0000000000..e9fedcd24c --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GPskOTATest(BaseTestClass): + """Wifi Tethering 2G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering2GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + @test_tracker_info(uuid="4b1cec63-d1d2-4046-84e9-e806bb08ce41") + def test_wifi_tethering_ota_2g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py new file mode 100755 index 0000000000..6648d0e461 --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GOpenOTATest(BaseTestClass): + """Wifi Tethering 5G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering5GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + @test_tracker_info(uuid="b1a94c8f-f3ed-4755-be4a-764e205b4483") + def test_wifi_tethering_ota_5g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py new file mode 100755 index 0000000000..74505787ff --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GPskOTATest(BaseTestClass): + """Wifi Tethering 5G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering5GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + @test_tracker_info(uuid="111d3a33-3152-4993-b1ba-307daaf2a6ff") + def test_wifi_tethering_ota_5g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py index 080f9e47c4..26be9b740c 100644 --- a/acts/tests/google/wifi/WifiTetheringTest.py +++ b/acts/tests/google/wifi/WifiTetheringTest.py @@ -31,6 +31,7 @@ from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G from acts.test_utils.net import socket_test_utils as sutils from acts.test_utils.net import arduino_test_utils as dutils +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.wifi import wifi_test_utils as wutils WAIT_TIME = 2 @@ -47,29 +48,19 @@ class WifiTetheringTest(base_test.BaseTestClass): self.unpack_userparams(req_params) self.new_ssid = "wifi_tethering_test2" - wutils.wifi_toggle_state(self.hotspot_device, False) - self.hotspot_device.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.hotspot_device, True) - asserts.assert_true( - verify_http_connection(self.log, self.hotspot_device), - "HTTP verification failed on cell data connection") - asserts.assert_true( - self.hotspot_device.droid.connectivityIsTetheringSupported(), - "Tethering is not supported for the provider") + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) for ad in self.tethered_devices: - ad.droid.telephonyToggleDataConnection(False) wutils.wifi_test_device_init(ad) def teardown_class(self): """ Reset devices """ - wutils.wifi_toggle_state(self.hotspot_device, True) for ad in self.tethered_devices: - ad.droid.telephonyToggleDataConnection(True) + wutils.reset_wifi(ad) def on_fail(self, test_name, begin_time): """ Collect bug report on failure """ - self.hotspot_device.take_bug_report(test_name, begin_time) - for ad in self.tethered_devices: + return + for ad in self.android_devices: ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -200,7 +191,7 @@ class WifiTetheringTest(base_test.BaseTestClass): """ Connect disconnect tethered devices to wifi hotspot """ num_android_devices = len(self.tethered_devices) num_wifi_dongles = 0 - if self.arduino_wifi_dongles: + if hasattr(self, 'arduino_wifi_dongles'): num_wifi_dongles = len(self.arduino_wifi_dongles) total_devices = num_android_devices + num_wifi_dongles device_connected = [False] * total_devices @@ -389,6 +380,8 @@ class WifiTetheringTest(base_test.BaseTestClass): 2. Connect 2 tethered devices to the hotspot device 3. Ping interfaces between the tethered devices """ + asserts.skip_if(not hasattr(self, 'arduino_wifi_dongles'), + "No wifi dongles connected. Skipping test") wutils.toggle_wifi_off_and_on(self.hotspot_device) self._start_wifi_tethering(WIFI_CONFIG_APBAND_2G) self._test_traffic_between_two_tethered_devices(self.tethered_devices[0], diff --git a/acts/tests/google/wifi/WifiThroughputStabilityTest.py b/acts/tests/google/wifi/WifiThroughputStabilityTest.py index c28bd63d2d..326e8e807b 100644 --- a/acts/tests/google/wifi/WifiThroughputStabilityTest.py +++ b/acts/tests/google/wifi/WifiThroughputStabilityTest.py @@ -29,6 +29,8 @@ from acts.test_utils.wifi import wifi_retail_ap as retail_ap from acts.test_utils.wifi import wifi_test_utils as wutils TEST_TIMEOUT = 10 +SHORT_SLEEP = 1 +MED_SLEEP = 6 class WifiThroughputStabilityTest(base_test.BaseTestClass): @@ -97,7 +99,7 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): "throughput_stability_test_params", "testbed_params", "main_network" ] - opt_params = ["RetailAccessPoints"] + opt_params = ["RetailAccessPoints", "golden_files_list"] self.unpack_userparams(req_params, opt_params) self.test_params = self.throughput_stability_test_params self.num_atten = self.attenuators[0].instrument.num_atten @@ -229,6 +231,14 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): test_result = {} # Configure AP band = self.access_point.band_lookup_by_channel(channel) + if "2G" in band: + frequency = wutils.WifiEnums.channel_2G_to_freq[channel] + else: + frequency = wutils.WifiEnums.channel_5G_to_freq[channel] + if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: + self.access_point.set_region(self.testbed_params["DFS_region"]) + else: + self.access_point.set_region(self.testbed_params["default_region"]) self.access_point.set_channel(band, channel) self.access_point.set_bandwidth(band, mode) self.log.info("Access Point Configuration: {}".format( @@ -240,10 +250,11 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): for i in range(self.num_atten) ] # Connect DUT to Network - self.main_network[band]["channel"] = channel + wutils.wifi_toggle_state(self.dut, True) wutils.reset_wifi(self.dut) + self.main_network[band]["channel"] = channel wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5) - time.sleep(5) + time.sleep(MED_SLEEP) # Run test and log result # Start iperf session self.log.info("Starting iperf test.") |