From bc038d4c000230cebbf0575e02b16bff7b68fad8 Mon Sep 17 00:00:00 2001 From: Timofey Protopopov Date: Thu, 24 May 2018 12:13:37 -0700 Subject: Fix For BtCarMapMce Test merge to pi_dev Test: Run BtCarMapMceTest all tests should pass. Bug: 80250185, 80250427 Change-Id: I69383cfcc97e89f27c32489ce84a0373b4a13b2e --- acts/tests/google/bt/car_bt/BtCarMapMceTest.py | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py index ba8760aa88..1a32b25178 100644 --- a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py +++ b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py @@ -54,6 +54,13 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): time.sleep(4) return True + def setup_test(self): + if not bt_test_utils.connect_pri_to_sec( + self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])): + return False + # Grace time for connection to complete. + time.sleep(3) + def message_delivered(self, device): try: self.MCE.ed.pop_event(EventSmsDeliverSuccess, 15) @@ -97,15 +104,11 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='0858347a-e649-4f18-85b6-6990cc311dee') @BluetoothBaseTest.bt_test_wrap def test_send_message(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) return self.send_message([self.REMOTE]) @test_tracker_info(uuid='b25caa53-3c7f-4cfa-a0ec-df9a8f925fe5') @BluetoothBaseTest.bt_test_wrap def test_receive_message(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) self.MSE.log.info("Start Tracking SMS.") self.MSE.droid.smsStartTrackingIncomingSmsMessage() self.REMOTE.log.info("Ready to send") @@ -130,6 +133,9 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='19444142-1d07-47dc-860b-f435cba46fca') @BluetoothBaseTest.bt_test_wrap def test_send_message_failure_no_map_connection(self): + if not bt_test_utils.disconnect_pri_from_sec( + self.MCE, self.MSE, [BtEnum.BluetoothProfile.MAP_MCE.value]): + return False return not self.send_message([self.REMOTE]) @test_tracker_info(uuid='c7e569c0-9f6c-49a4-8132-14bc544ccb53') @@ -148,8 +154,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='8cdb4a54-3f18-482f-be3d-acda9c4cbeed') @BluetoothBaseTest.bt_test_wrap def test_disconnect_failure_send_message(self): - connected = bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) addr = self.MSE.droid.bluetoothGetLocalAddress() if bt_test_utils.is_map_mce_device_connected(self.MCE, addr): connected = True @@ -167,8 +171,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='2d79a896-b1c1-4fb7-9924-db8b5c698be5') @BluetoothBaseTest.bt_test_wrap def manual_test_send_message_to_contact(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) contacts = self.MCE.droid.contactsGetContactIds() self.log.info(contacts) selected_contact = self.MCE.droid.contactsDisplayContactPickList() @@ -181,6 +183,4 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest): @test_tracker_info(uuid='8ce9a7dd-3b5e-4aee-a897-30740e2439c3') @BluetoothBaseTest.bt_test_wrap def test_send_message_to_multiple_phones(self): - bt_test_utils.connect_pri_to_sec( - self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])) return self.send_message([self.REMOTE, self.REMOTE]) -- cgit v1.2.3 From 7424c34ea3ade328519eb7f62841fbf68c69bf9a Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Thu, 7 Jun 2018 09:37:50 -0700 Subject: Add skipping of alternative setup package Test: Manual execution Bug: 109869727 Change-Id: Iabd3d44f955bd9561653da3c2db8c81a598e3a26 (cherry picked from commit ee065d09cf15984edf967a024da744d89c230b7c) --- acts/framework/acts/controllers/android_device.py | 50 +++++++++++++++-------- 1 file changed, 32 insertions(+), 18 deletions(-) diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py index 3b9cb0ffa7..5110e74406 100755 --- a/acts/framework/acts/controllers/android_device.py +++ b/acts/framework/acts/controllers/android_device.py @@ -388,6 +388,9 @@ class AndroidDevice: self.adb = adb.AdbProxy(serial, ssh_connection=ssh_connection) self.fastboot = fastboot.FastbootProxy( serial, ssh_connection=ssh_connection) + self.adb_logcat_file_path = os.path.join( + log_path_base, 'AndroidDevice%s' % serial, + "adblog,{},{}.txt".format(self.model, serial)) if not self.is_bootloader: self.root_adb() self._ssh_connection = ssh_connection @@ -642,8 +645,9 @@ class AndroidDevice: except (IndexError, ValueError) as e: # Possible ValueError from string to int cast. # Possible IndexError from split. - self.log.warn('Command \"%s\" returned output line: ' - '\"%s\".\nError: %s', cmd, out, e) + self.log.warn( + 'Command \"%s\" returned output line: ' + '\"%s\".\nError: %s', cmd, out, e) except Exception as e: self.log.warn( 'Device fails to check if %s running with \"%s\"\n' @@ -754,7 +758,7 @@ class AndroidDevice: begin_at = '-T "%s"' % self.last_logcat_timestamp else: begin_at = '-T 1' - # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object. + # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object. cmd = "adb -s {} logcat {} -v year {} >> {}".format( self.serial, begin_at, extra_params, logcat_file_path) self.adb_logcat_process = utils.start_standing_subprocess(cmd) @@ -810,8 +814,9 @@ class AndroidDevice: 'pm list packages | grep -w "package:%s"' % package_name)) except Exception as err: - self.log.error('Could not determine if %s is installed. ' - 'Received error:\n%s', package_name, err) + self.log.error( + 'Could not determine if %s is installed. ' + 'Received error:\n%s', package_name, err) return False def is_sl4a_installed(self): @@ -835,8 +840,9 @@ class AndroidDevice: self.log.info("apk %s is running", package_name) return True except Exception as e: - self.log.warn("Device fails to check is %s running by %s " - "Exception %s", package_name, cmd, e) + self.log.warn( + "Device fails to check is %s running by %s " + "Exception %s", package_name, cmd, e) continue self.log.debug("apk %s is not running", package_name) return False @@ -896,9 +902,9 @@ class AndroidDevice: if new_br: out = self.adb.shell("bugreportz", timeout=BUG_REPORT_TIMEOUT) if not out.startswith("OK"): - raise AndroidDeviceError("Failed to take bugreport on %s: %s" % - (self.serial, out)) - br_out_path = out.split(':')[1].strip() + raise AndroidDeviceError( + "Failed to take bugreport on %s: %s" % (self.serial, out)) + br_out_path = out.split(':')[1].strip().split()[0] self.adb.pull("%s %s" % (br_out_path, full_out_path)) else: self.adb.bugreport( @@ -1301,7 +1307,8 @@ class AndroidDevice: def is_screen_lock_enabled(self): """Check if screen lock is enabled""" - cmd = ("sqlite3 /data/system/locksettings.db .dump"" | grep lockscreen.password_type | grep -v alternate") + cmd = ("sqlite3 /data/system/locksettings.db .dump" + " | grep lockscreen.password_type | grep -v alternate") out = self.adb.shell(cmd, ignore_status=True) if "unable to open" in out: self.root_adb() @@ -1319,7 +1326,7 @@ class AndroidDevice: self.log.info("Device is in CrpytKeeper window") return True if "StatusBar" in current_window and ( - (not current_app) or "FallbackHome" in current_app): + (not current_app) or "FallbackHome" in current_app): self.log.info("Device is locked") return True return False @@ -1370,12 +1377,13 @@ class AndroidDevice: def exit_setup_wizard(self): if not self.is_user_setup_complete() or self.is_setupwizard_on(): - self.adb.shell("pm disable %s" % self.get_setupwizard_package_name()) + self.adb.shell( + "pm disable %s" % self.get_setupwizard_package_name()) # Wait up to 5 seconds for user_setup_complete to be updated - for _ in range(5): + end_time = time.time() + 5 + while time.time() < end_time: if self.is_user_setup_complete() or not self.is_setupwizard_on(): return - time.sleep(1) # If fail to exit setup wizard, set local.prop and reboot if not self.is_user_setup_complete() and self.is_setupwizard_on(): @@ -1386,12 +1394,18 @@ class AndroidDevice: def get_setupwizard_package_name(self): """Finds setupwizard package/.activity + Bypass setupwizard or setupwraith depending on device. + Returns: packageName/.ActivityName - """ - package = self.adb.shell("pm list packages -f | grep setupwizard | grep com.google.android") + """ + packages_to_skip = "'setupwizard|setupwraith'" + android_package_name = "com.google.android" + package = self.adb.shell( + "pm list packages -f | grep -E {} | grep {}".format( + packages_to_skip, android_package_name)) wizard_package = re.split("=", package)[1] - activity = re.search("wizard/(.*?).apk", package, re.IGNORECASE).groups()[0] + activity = re.search("(.*?).apk", package, re.IGNORECASE).groups()[0] self.log.info("%s/.%sActivity" % (wizard_package, activity)) return "%s/.%sActivity" % (wizard_package, activity) -- cgit v1.2.3 From b848873ad7ca75013b4c525312464a7f47a43d78 Mon Sep 17 00:00:00 2001 From: Omar El Ayach Date: Mon, 11 Jun 2018 15:29:21 -0700 Subject: Merge "Minor edits to Throughput stability tests" am: 6714daecbe Test: Done Bug: 65563975 Change-Id: I930b55328f3d1dbd1c52be9d6822c104bfa2976d (cherry picked from commit 9f648f4ce7ec6fc7f74bf1e26594760c6e51ccc6) --- acts/tests/google/wifi/WifiThroughputStabilityTest.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/acts/tests/google/wifi/WifiThroughputStabilityTest.py b/acts/tests/google/wifi/WifiThroughputStabilityTest.py index c28bd63d2d..326e8e807b 100644 --- a/acts/tests/google/wifi/WifiThroughputStabilityTest.py +++ b/acts/tests/google/wifi/WifiThroughputStabilityTest.py @@ -29,6 +29,8 @@ from acts.test_utils.wifi import wifi_retail_ap as retail_ap from acts.test_utils.wifi import wifi_test_utils as wutils TEST_TIMEOUT = 10 +SHORT_SLEEP = 1 +MED_SLEEP = 6 class WifiThroughputStabilityTest(base_test.BaseTestClass): @@ -97,7 +99,7 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): "throughput_stability_test_params", "testbed_params", "main_network" ] - opt_params = ["RetailAccessPoints"] + opt_params = ["RetailAccessPoints", "golden_files_list"] self.unpack_userparams(req_params, opt_params) self.test_params = self.throughput_stability_test_params self.num_atten = self.attenuators[0].instrument.num_atten @@ -229,6 +231,14 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): test_result = {} # Configure AP band = self.access_point.band_lookup_by_channel(channel) + if "2G" in band: + frequency = wutils.WifiEnums.channel_2G_to_freq[channel] + else: + frequency = wutils.WifiEnums.channel_5G_to_freq[channel] + if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES: + self.access_point.set_region(self.testbed_params["DFS_region"]) + else: + self.access_point.set_region(self.testbed_params["default_region"]) self.access_point.set_channel(band, channel) self.access_point.set_bandwidth(band, mode) self.log.info("Access Point Configuration: {}".format( @@ -240,10 +250,11 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass): for i in range(self.num_atten) ] # Connect DUT to Network - self.main_network[band]["channel"] = channel + wutils.wifi_toggle_state(self.dut, True) wutils.reset_wifi(self.dut) + self.main_network[band]["channel"] = channel wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5) - time.sleep(5) + time.sleep(MED_SLEEP) # Run test and log result # Start iperf session self.log.info("Starting iperf test.") -- cgit v1.2.3 From 7e3c1a1ec235dfc9995ae493ef7d4497a5af43ad Mon Sep 17 00:00:00 2001 From: Qi Jiang Date: Wed, 13 Jun 2018 17:32:15 -0700 Subject: Merge "Minor fix in base class to fit more tests" am: 0221bdd340 am: 84a773bee4 am: 592fc9adb1 Test: None Bug: 65563975 Change-Id: I4bf767be346a0e449f9f64e361b26084dede0099 (cherry picked from commit c2b3c607569c77cefe89aa5801a6749698b08b96) --- acts/framework/acts/test_utils/power/PowerBaseTest.py | 2 +- acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py index 0a45dd1954..fa0a0e642b 100644 --- a/acts/framework/acts/test_utils/power/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py @@ -125,7 +125,7 @@ class PowerBaseTest(base_test.BaseTestClass): if hasattr(self, 'attenuators'): self.num_atten = self.attenuators[0].instrument.num_atten self.atten_level = self.unpack_custom_file(self.attenuation_file) - self.set_attenuation(INITIAL_ATTEN) + self.set_attenuation(INITIAL_ATTEN) self.threshold = self.unpack_custom_file(self.threshold_file) self.mon_info = self.create_monsoon_info() diff --git a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py index 635c8b54c8..19702f4540 100644 --- a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py @@ -34,6 +34,7 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest): self.access_point_main = self.access_points[0] if len(self.access_points) > 1: self.access_point_aux = self.access_points[1] + if hasattr(self, 'network_file'): self.networks = self.unpack_custom_file(self.network_file, False) self.main_network = self.networks['main_network'] self.aux_network = self.networks['aux_network'] -- cgit v1.2.3 From 74b3d7b7ef22cd0fd2df1c083032f8e1d45b9776 Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Mon, 11 Jun 2018 11:05:56 -0700 Subject: Fix typo in GattConnectTest.py failure Test: Manual execution Bug: 110037296 Change-Id: I65cb570011203d0d11b10baa225ddb8594e00449 (cherry picked from commit 7df49e11876d239952db63fa7c564564d336213d) --- acts/tests/google/ble/gatt/GattConnectTest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/acts/tests/google/ble/gatt/GattConnectTest.py b/acts/tests/google/ble/gatt/GattConnectTest.py index 0b160a23cf..dd6857c941 100644 --- a/acts/tests/google/ble/gatt/GattConnectTest.py +++ b/acts/tests/google/ble/gatt/GattConnectTest.py @@ -308,7 +308,7 @@ class GattConnectTest(BluetoothBaseTest): self.default_timeout) except Empty: self.log.error( - gatt_cb_err['gatt_conn_change_err'].format(expected_event)) + gatt_cb_err['gatt_conn_changed_err'].format(expected_event)) test_result = False return self._orchestrate_gatt_disconnection(bluetooth_gatt, gatt_callback) -- cgit v1.2.3 From 058866f3e93f3b1ce7a04e634faaaac9c86449c9 Mon Sep 17 00:00:00 2001 From: Daniel Barros Date: Thu, 14 Jun 2018 14:48:56 -0700 Subject: Adding the compatibility for any router for power tests Test: None Bug: b/65563975 am: 4824fc72ee am: 718ddfadf0 am: c779735a96 Change-Id: I2ce1bea1f658206107d26897ff9d0df7afe0c544 (cherry picked from commit 67ec499ef2af509969f31011d23a83c354690893) --- acts/framework/acts/test_utils/power/PowerBaseTest.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py index fa0a0e642b..78c5090599 100644 --- a/acts/framework/acts/test_utils/power/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py @@ -451,11 +451,11 @@ class PowerBaseTest(base_test.BaseTestClass): self.brconfigs: dict for bridge interface configs """ wutils.wifi_toggle_state(self.dut, True) - self.brconfigs = wputils.ap_setup( - self.access_point, network, bandwidth=bandwidth) + if hasattr(self, 'access_points'): + self.brconfigs = wputils.ap_setup( + self.access_point, network, bandwidth=bandwidth) if connect: wutils.wifi_connect(self.dut, network) - return self.brconfigs def process_iperf_results(self): """Get the iperf results and process. -- cgit v1.2.3 From 1222b1b3c0ea36c4069c675437de62e57e91fb04 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Thu, 14 Jun 2018 12:24:59 -0700 Subject: [WifiIOTTest] Re-add some more tests to WifiIOTTest CHERRY PICKED CHANGES FROM AOSP Bug: 109670497 Test: Verified the changes Change-Id: I68ff30ba871d238c61a6d527137bd351f4bc686d Merged-In: I7acf693a4b86ae24d4c87171cfc19c34c1d2b9ef --- acts/tests/google/wifi/WifiIOTTest.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/acts/tests/google/wifi/WifiIOTTest.py b/acts/tests/google/wifi/WifiIOTTest.py index 22e67d9e28..75449517f7 100755 --- a/acts/tests/google/wifi/WifiIOTTest.py +++ b/acts/tests/google/wifi/WifiIOTTest.py @@ -132,12 +132,12 @@ class WifiIOTTest(WifiBaseTest): """Tests""" @test_tracker_info(uuid="a57cc861-b6c2-47e4-9db6-7a3ab32c6e20") - def iot_connection_to_ubiquity_ap1_2g(self): + def test_iot_connection_to_ubiquity_ap1_2g(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="2065c2f7-2b89-4da7-a15d-e5dc17b88d52") - def iot_connection_to_ubiquity_ap1_5g(self): + def test_iot_connection_to_ubiquity_ap1_5g(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -152,12 +152,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="02a8cc75-6781-4153-8d90-bed7568a1e78") - def iot_connection_to_AirportExtreme_2G(self): + def test_iot_connection_to_AirportExtreme_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="83a42c97-1358-4ba7-bdb2-238fdb1c945e") - def iot_connection_to_AirportExtreme_5G(self): + def test_iot_connection_to_AirportExtreme_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -172,12 +172,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="2503d9ed-35df-4be0-b838-590324cecaee") - def test_iot_connection_to_Dlink_AC1200_2G(self): + def iot_connection_to_Dlink_AC1200_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="0a44e148-a4bf-43f4-88eb-e4c1ffa850ce") - def test_iot_connection_to_Dlink_AC1200_5G(self): + def iot_connection_to_Dlink_AC1200_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -242,17 +242,17 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="054d2ffc-97fd-4613-bf47-acedd0fa4701") - def iot_connection_to_NETGEAR_AC3200_2G(self): + def test_iot_connection_to_NETGEAR_AC3200_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="d15a789a-def5-4c6a-b59e-1a75f73cc6a9") - def iot_connection_to_NETGEAR_AC3200_5G_1(self): + def test_iot_connection_to_NETGEAR_AC3200_5G_1(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="1de6369e-97da-479f-b17c-9144bb814f51") - def iot_connection_to_NETGEAR_AC3200_5G_2(self): + def test_iot_connection_to_NETGEAR_AC3200_5G_2(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @@ -282,12 +282,12 @@ class WifiIOTTest(WifiBaseTest): self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="e5517b82-c225-449d-83ac-055a561a764f") - def iot_connection_to_TP_LINK_AC1700_2G(self): + def test_iot_connection_to_TP_LINK_AC1700_2G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) @test_tracker_info(uuid="9531d3cc-129d-4501-a5e3-d7502120cd8b") - def iot_connection_to_TP_LINK_AC1700_5G(self): + def test_iot_connection_to_TP_LINK_AC1700_5G(self): ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) -- cgit v1.2.3 From 20068f329933b0425d0c8cda9ff1d71b28e08be2 Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Tue, 19 Jun 2018 16:48:51 -0700 Subject: [Tests]Add more delay in the softAP test. Bug: 109762575 Test: Local Change-Id: Id61957dad7d0edb7b115fa64bd30ca16cfb9bb74 --- acts/tests/google/wifi/WifiStressTest.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py index 78608d7142..bb3396b29b 100755 --- a/acts/tests/google/wifi/WifiStressTest.py +++ b/acts/tests/google/wifi/WifiStressTest.py @@ -294,7 +294,8 @@ class WifiStressTest(WifiBaseTest): wutils.stop_wifi_tethering(self.dut) asserts.assert_false(self.dut.droid.wifiIsApEnabled(), "SoftAp failed to shutdown!") - time.sleep(TIMEOUT) + # Give some time for WiFi to come back to previous state. + time.sleep(2) cur_wifi_state = self.dut.droid.wifiCheckState() if initial_wifi_state != cur_wifi_state: raise signals.TestFailure("Wifi state was %d before softAP and %d now!" % -- cgit v1.2.3 From f13bbb14f0b049d6a5a22fb8077bf179664f4851 Mon Sep 17 00:00:00 2001 From: tturney Date: Mon, 18 Jun 2018 13:34:39 -0700 Subject: Add ability to collect BT metrics on each class Also update blutooth.proto Bug: 110367326 Test: Manual Execution Change-Id: I4f41c42e8c848d869fdb8c86bfc58b63729e6f74 (cherry picked from commit cef10b3c846cbe1d7d8e6c6f17bea6ace854544b) --- .../acts/test_utils/bt/BluetoothBaseTest.py | 68 ++++++++++++++++++++-- acts/framework/acts/test_utils/bt/bluetooth.proto | 15 +++++ 2 files changed, 79 insertions(+), 4 deletions(-) diff --git a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py index 01cb76af58..cd82974be7 100644 --- a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py @@ -24,8 +24,13 @@ import os from acts import utils from acts.base_test import BaseTestClass from acts.signals import TestSignal +from acts.utils import create_dir +from acts.utils import dump_string_to_file from acts.controllers import android_device +from acts.libs.proto.proto_utils import compile_import_proto +from acts.libs.proto.proto_utils import parse_proto_to_ascii +from acts.test_utils.bt.bt_metrics_utils import get_bluetooth_metrics from acts.test_utils.bt.bt_test_utils import reset_bluetooth from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs @@ -48,6 +53,29 @@ class BluetoothBaseTest(BaseTestClass): for ad in self.android_devices: self._setup_bt_libs(ad) + def collect_bluetooth_manager_metrics_logs(self, ads, test_name): + """ + Collect Bluetooth metrics logs, save an ascii log to disk and return + both binary and ascii logs to caller + :param ads: list of active Android devices + :return: List of binary metrics logs, + List of ascii metrics logs + """ + bluetooth_logs = [] + bluetooth_logs_ascii = [] + for ad in ads: + serial = ad.serial + out_name = "{}_{}_{}".format(serial, test_name, + "bluetooth_metrics.txt") + bluetooth_log = get_bluetooth_metrics(ad, + ad.bluetooth_proto_module) + bluetooth_log_ascii = parse_proto_to_ascii(bluetooth_log) + dump_string_to_file(bluetooth_log_ascii, + os.path.join(ad.metrics_path, out_name)) + bluetooth_logs.append(bluetooth_log) + bluetooth_logs_ascii.append(bluetooth_log_ascii) + return bluetooth_logs, bluetooth_logs_ascii + # Use for logging in the test cases to facilitate # faster log lookup and reduce ambiguity in logging. @staticmethod @@ -104,7 +132,42 @@ class BluetoothBaseTest(BaseTestClass): thread.start() for t in threads: t.join() - return setup_multiple_devices_for_bt_test(self.android_devices) + if not setup_multiple_devices_for_bt_test(self.android_devices): + return False + if "bluetooth_proto_path" in self.user_params: + from google import protobuf + + self.bluetooth_proto_path = self.user_params[ + "bluetooth_proto_path"][0] + if not os.path.isfile(self.bluetooth_proto_path): + try: + self.bluetooth_proto_path = "{}/bluetooth.proto".format( + os.path.dirname(os.path.realpath(__file__))) + except Exception: + self.log.error("File not found.") + if not os.path.isfile(self.bluetooth_proto_path): + self.log.error("Unable to find Bluetooth proto {}.".format( + self.bluetooth_proto_path)) + return False + for ad in self.android_devices: + ad.metrics_path = os.path.join(ad.log_path, "BluetoothMetrics") + create_dir(ad.metrics_path) + ad.bluetooth_proto_module = \ + compile_import_proto(ad.metrics_path, self.bluetooth_proto_path) + if not ad.bluetooth_proto_module: + self.log.error("Unable to compile bluetooth proto at " + + self.bluetooth_proto_path) + return False + # Clear metrics. + get_bluetooth_metrics(ad, ad.bluetooth_proto_module) + return True + + def teardown_class(self): + if "bluetooth_proto_path" in self.user_params: + # Collect metrics here bassed off class name + bluetooth_logs, bluetooth_logs_ascii = \ + self.collect_bluetooth_manager_metrics_logs( + self.android_devices, self.__class__.__name__) def setup_test(self): self.timer_list = [] @@ -114,9 +177,6 @@ class BluetoothBaseTest(BaseTestClass): a.droid.wakeUpNow() return True - def teardown_test(self): - return True - def on_fail(self, test_name, begin_time): self.log.debug( "Test {} failed. Gathering bugreport and btsnoop logs".format( diff --git a/acts/framework/acts/test_utils/bt/bluetooth.proto b/acts/framework/acts/test_utils/bt/bluetooth.proto index a43ff47f55..4a07e59d76 100644 --- a/acts/framework/acts/test_utils/bt/bluetooth.proto +++ b/acts/framework/acts/test_utils/bt/bluetooth.proto @@ -114,6 +114,15 @@ message RFCommSession { optional int32 tx_bytes = 2; } +enum A2dpSourceCodec { + A2DP_SOURCE_CODEC_UNKNOWN = 0; + A2DP_SOURCE_CODEC_SBC = 1; + A2DP_SOURCE_CODEC_AAC = 2; + A2DP_SOURCE_CODEC_APTX = 3; + A2DP_SOURCE_CODEC_APTX_HD = 4; + A2DP_SOURCE_CODEC_LDAC = 5; +} + // Session information that gets logged for A2DP session. message A2DPSession { // Media timer in milliseconds. @@ -139,6 +148,12 @@ message A2DPSession { // Total audio time in this A2DP session optional int64 audio_duration_millis = 8; + + // Audio codec used in this A2DP session in A2DP source role + optional A2dpSourceCodec source_codec = 9; + + // Whether A2DP offload is enabled in this A2DP session + optional bool is_a2dp_offload = 10; } message PairEvent { -- cgit v1.2.3 From c59c9e1d6b4804231422fb675e3a01c5c2721442 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Thu, 7 Jun 2018 10:48:14 -0700 Subject: [OTA tests] Add OTA tests for wifi hotspot and core networking features CHERRY PICKED FROM AOSP Bug: 109876567 Test: Verified Change-Id: I95d8122e1696aa0ff80b020f1b20e96b42c55080 Merged-In: Idb1435b9b62dad16356e2e8bb108e38ab6506cd0 --- .../acts/test_utils/net/net_test_utils.py | 211 +++++++++++++++++++++ .../acts/test_utils/wifi/wifi_test_utils.py | 26 +++ acts/tests/google/net/CoreNetworkingOTATest.py | 102 ++++++++++ .../google/wifi/WifiTethering2GOpenOTATest.py | 77 ++++++++ .../tests/google/wifi/WifiTethering2GPskOTATest.py | 77 ++++++++ .../google/wifi/WifiTethering5GOpenOTATest.py | 77 ++++++++ .../tests/google/wifi/WifiTethering5GPskOTATest.py | 77 ++++++++ 7 files changed, 647 insertions(+) create mode 100644 acts/framework/acts/test_utils/net/net_test_utils.py create mode 100755 acts/tests/google/net/CoreNetworkingOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering2GOpenOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering2GPskOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering5GOpenOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering5GPskOTATest.py diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py new file mode 100644 index 0000000000..c7e55d3e9a --- /dev/null +++ b/acts/framework/acts/test_utils/net/net_test_utils.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts.test_utils.net import connectivity_const as cconst +from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection +from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.wifi import wifi_test_utils as wutils + +import os +import re +import time +import urllib.request + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams +VPN_PING_ADDR = "10.10.10.1" + +def verify_lte_data_and_tethering_supported(ad): + """Verify if LTE data is enabled and tethering supported""" + wutils.wifi_toggle_state(ad, False) + ad.droid.telephonyToggleDataConnection(True) + wait_for_cell_data_connection(ad.log, ad, True) + asserts.assert_true( + verify_http_connection(ad.log, ad), + "HTTP verification failed on cell data connection") + asserts.assert_true( + ad.droid.connectivityIsTetheringSupported(), + "Tethering is not supported for the provider") + wutils.wifi_toggle_state(ad, True) + +def set_chrome_browser_permissions(ad): + """Set chrome browser start with no-first-run verification. + Give permission to read from and write to storage + """ + commands = ["pm grant com.android.chrome " + "android.permission.READ_EXTERNAL_STORAGE", + "pm grant com.android.chrome " + "android.permission.WRITE_EXTERNAL_STORAGE", + "rm /data/local/chrome-command-line", + "am set-debug-app --persistent com.android.chrome", + 'echo "chrome --no-default-browser-check --no-first-run ' + '--disable-fre" > /data/local/tmp/chrome-command-line'] + for cmd in commands: + try: + ad.adb.shell(cmd) + except adb.AdbError: + self.log.warn("adb command %s failed on %s" % (cmd, ad.serial)) + +def verify_ping_to_vpn_ip(ad): + """ Verify if IP behind VPN server is pingable. + Ping should pass, if VPN is connected. + Ping should fail, if VPN is disconnected. + + Args: + ad: android device object + """ + ping_result = None + pkt_loss = "100% packet loss" + try: + ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % VPN_PING_ADDR) + except adb.AdbError: + pass + return ping_result and pkt_loss not in ping_result + +def legacy_vpn_connection_test_logic(ad, vpn_profile): + """ Test logic for each legacy VPN connection + + Steps: + 1. Generate profile for the VPN type + 2. Establish connection to the server + 3. Verify that connection is established using LegacyVpnInfo + 4. Verify the connection by pinging the IP behind VPN + 5. Stop the VPN connection + 6. Check the connection status + 7. Verify that ping to IP behind VPN fails + + Args: + 1. ad: Android device object + 2. VpnProfileType (1 of the 6 types supported by Android) + """ + # Wait for sometime so that VPN server flushes all interfaces and + # connections after graceful termination + time.sleep(10) + + ad.adb.shell("ip xfrm state flush") + ad.log.info("Connecting to: %s", vpn_profile) + ad.droid.vpnStartLegacyVpn(vpn_profile) + time.sleep(cconst.VPN_TIMEOUT) + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_equal(connected_vpn_info["state"], + cconst.VPN_STATE_CONNECTED, + "Unable to establish VPN connection for %s" + % vpn_profile) + + ping_result = verify_ping_to_vpn_ip(ad) + ip_xfrm_state = ad.adb.shell("ip xfrm state") + match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) + if match_obj: + ip_xfrm_state = format(match_obj.group(0)).split() + ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) + + ad.droid.vpnStopLegacyVpn() + asserts.assert_true(ping_result, + "Ping to the internal IP failed. " + "Expected to pass as VPN is connected") + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_true(not connected_vpn_info, + "Unable to terminate VPN connection for %s" + % vpn_profile) + +def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, + ipsec_server_type, log_path): + """ Download the certificates from VPN server and push to sdcard of DUT + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Client cert file name on DUT's sdcard + """ + url = "http://%s%s%s" % (vpn_server_addr, + vpn_params['cert_path_vpnserver'], + vpn_params['client_pkcs_file_name']) + local_cert_name = "%s_%s_%s" % (vpn_type.name, + ipsec_server_type, + vpn_params['client_pkcs_file_name']) + ad.adb.push("%s sdcard/" % local_cert_name) + return local_cert_name + + local_file_path = os.path.join(log_path, local_cert_name) + try: + ret = urllib.request.urlopen(url) + with open(local_file_path, "wb") as f: + f.write(ret.read()) + except: + asserts.fail("Unable to download certificate from the server") + + f.close() + ad.adb.push("%s sdcard/" % local_file_path) + return local_cert_name + +def generate_legacy_vpn_profile(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path): + """ Generate legacy VPN profile for a VPN + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Vpn profile + """ + vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], + VPN_CONST.PWD: vpn_params['vpn_password'], + VPN_CONST.TYPE: vpn_type.value, + VPN_CONST.SERVER: vpn_server_addr,} + vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, + ipsec_server_type) + if vpn_type.name == "PPTP": + vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name + + psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) + rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) + + if vpn_type.name in psk_set: + vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] + elif vpn_type.name in rsa_set: + cert_name = download_load_certs(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path) + vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] + vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] + ad.droid.installCertificate(vpn_profile, cert_name, + vpn_params['cert_password']) + else: + vpn_profile[VPN_CONST.MPPE] = "mppe" + + return vpn_profile diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py index 47aa6f2cb5..0c96fed835 100755 --- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py @@ -907,6 +907,32 @@ def start_wifi_tethering(ad, ssid, password, band=None, hidden=None): finally: ad.droid.wifiStopTrackingTetherStateChange() +def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None): + """ Save a soft ap configuration """ + if band: + wifi_config[WifiEnums.APBAND_KEY] = band + if hidden: + wifi_config[WifiEnums.HIDDEN_KEY] = hidden + asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config), + "Failed to set WifiAp Configuration") + + wifi_ap = ad.droid.wifiGetApConfiguration() + asserts.assert_true( + wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY], + "Hotspot SSID doesn't match with expected SSID") + +def start_wifi_tethering_saved_config(ad): + """ Turn on wifi hotspot with a config that is already saved """ + ad.droid.wifiStartTrackingTetherStateChange() + ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False) + try: + ad.ed.pop_event("ConnectivityManagerOnTetheringStarted") + ad.ed.wait_for_event("TetherStateChanged", + lambda x: x["data"]["ACTIVE_TETHER"], 30) + except: + asserts.fail("Didn't receive wifi tethering starting confirmation") + finally: + ad.droid.wifiStopTrackingTetherStateChange() def stop_wifi_tethering(ad): """Stops wifi tethering on an android_device. diff --git a/acts/tests/google/net/CoreNetworkingOTATest.py b/acts/tests/google/net/CoreNetworkingOTATest.py new file mode 100755 index 0000000000..2444971a87 --- /dev/null +++ b/acts/tests/google/net/CoreNetworkingOTATest.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import time +import urllib.request + +import acts.base_test +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.net import connectivity_const as cconst + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams + + +class CoreNetworkingOTATest(BaseTestClass): + """Core networking auto update tests""" + + def setup_class(self): + super(CoreNetworkingOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.dut = self.android_devices[0] + req_params = dir(VPN_PARAMS) + req_params = [x for x in req_params if not x.startswith('__')] + req_params.extend(["download_file", "file_size"]) + self.unpack_userparams(req_params) + + vpn_params = {'vpn_username': self.vpn_username, + 'vpn_password': self.vpn_password, + 'psk_secret': self.psk_secret, + 'client_pkcs_file_name': self.client_pkcs_file_name, + 'cert_path_vpnserver': self.cert_path_vpnserver, + 'cert_password': self.cert_password} + + # generate legacy vpn profiles + wutils.wifi_connect(self.dut, self.wifi_network) + self.xauth_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_XAUTH_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_XAUTH_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.l2tp_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.L2TP_IPSEC_RSA, + self.vpn_server_addresses[VPN_TYPE.L2TP_IPSEC_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.hybrid_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_HYBRID_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_HYBRID_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.vpn_profiles = [self.l2tp_rsa, self.hybrid_rsa, self.xauth_rsa] + + # verify legacy vpn + for prof in self.vpn_profiles: + nutils.legacy_vpn_connection_test_logic(self.dut, prof) + + # Run OTA below, if ota fails then abort all tests. + try: + for ad in self.android_devices: + ota_updater.update(ad) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + """Tests""" + + def test_legacy_vpn_ota_xauth_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.xauth_rsa) + + def test_legacy_vpn_ota_l2tp_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.l2tp_rsa) + + def test_legacy_vpn_ota_hybrid_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.hybrid_rsa) diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py new file mode 100755 index 0000000000..68f039980e --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GOpenOTATest(BaseTestClass): + """Wifi Tethering 2G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering2GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_2g_open(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py new file mode 100755 index 0000000000..e1904e7c89 --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GPskOTATest(BaseTestClass): + """Wifi Tethering 2G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering2GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_2g_psk(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py new file mode 100755 index 0000000000..6084500df0 --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GOpenOTATest(BaseTestClass): + """Wifi Tethering 5G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering5GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_5g_open(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py new file mode 100755 index 0000000000..5cb532ac6c --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GPskOTATest(BaseTestClass): + """Wifi Tethering 5G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering5GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_5g_psk(self): + self._verify_wifi_tethering() -- cgit v1.2.3 From 5abcaff919f821bf4b452b4edb5cb0605dc8a1a0 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Thu, 7 Jun 2018 10:48:14 -0700 Subject: [OTA tests] Add OTA tests for wifi hotspot and core networking features CHERRY PICKED FROM AOSP Bug: 109876567 Test: Verified Change-Id: I6618ea8a734d64feb1128784f70e0a68394dacec Merged-In: Idb1435b9b62dad16356e2e8bb108e38ab6506cd0 --- .../acts/test_utils/net/net_test_utils.py | 211 +++++++++++++++++++++ .../acts/test_utils/wifi/wifi_test_utils.py | 26 +++ acts/tests/google/net/CoreNetworkingOTATest.py | 102 ++++++++++ .../google/wifi/WifiTethering2GOpenOTATest.py | 77 ++++++++ .../tests/google/wifi/WifiTethering2GPskOTATest.py | 77 ++++++++ .../google/wifi/WifiTethering5GOpenOTATest.py | 77 ++++++++ .../tests/google/wifi/WifiTethering5GPskOTATest.py | 77 ++++++++ 7 files changed, 647 insertions(+) create mode 100644 acts/framework/acts/test_utils/net/net_test_utils.py create mode 100755 acts/tests/google/net/CoreNetworkingOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering2GOpenOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering2GPskOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering5GOpenOTATest.py create mode 100755 acts/tests/google/wifi/WifiTethering5GPskOTATest.py diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py new file mode 100644 index 0000000000..c7e55d3e9a --- /dev/null +++ b/acts/framework/acts/test_utils/net/net_test_utils.py @@ -0,0 +1,211 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 Google, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts.test_utils.net import connectivity_const as cconst +from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection +from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.wifi import wifi_test_utils as wutils + +import os +import re +import time +import urllib.request + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams +VPN_PING_ADDR = "10.10.10.1" + +def verify_lte_data_and_tethering_supported(ad): + """Verify if LTE data is enabled and tethering supported""" + wutils.wifi_toggle_state(ad, False) + ad.droid.telephonyToggleDataConnection(True) + wait_for_cell_data_connection(ad.log, ad, True) + asserts.assert_true( + verify_http_connection(ad.log, ad), + "HTTP verification failed on cell data connection") + asserts.assert_true( + ad.droid.connectivityIsTetheringSupported(), + "Tethering is not supported for the provider") + wutils.wifi_toggle_state(ad, True) + +def set_chrome_browser_permissions(ad): + """Set chrome browser start with no-first-run verification. + Give permission to read from and write to storage + """ + commands = ["pm grant com.android.chrome " + "android.permission.READ_EXTERNAL_STORAGE", + "pm grant com.android.chrome " + "android.permission.WRITE_EXTERNAL_STORAGE", + "rm /data/local/chrome-command-line", + "am set-debug-app --persistent com.android.chrome", + 'echo "chrome --no-default-browser-check --no-first-run ' + '--disable-fre" > /data/local/tmp/chrome-command-line'] + for cmd in commands: + try: + ad.adb.shell(cmd) + except adb.AdbError: + self.log.warn("adb command %s failed on %s" % (cmd, ad.serial)) + +def verify_ping_to_vpn_ip(ad): + """ Verify if IP behind VPN server is pingable. + Ping should pass, if VPN is connected. + Ping should fail, if VPN is disconnected. + + Args: + ad: android device object + """ + ping_result = None + pkt_loss = "100% packet loss" + try: + ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % VPN_PING_ADDR) + except adb.AdbError: + pass + return ping_result and pkt_loss not in ping_result + +def legacy_vpn_connection_test_logic(ad, vpn_profile): + """ Test logic for each legacy VPN connection + + Steps: + 1. Generate profile for the VPN type + 2. Establish connection to the server + 3. Verify that connection is established using LegacyVpnInfo + 4. Verify the connection by pinging the IP behind VPN + 5. Stop the VPN connection + 6. Check the connection status + 7. Verify that ping to IP behind VPN fails + + Args: + 1. ad: Android device object + 2. VpnProfileType (1 of the 6 types supported by Android) + """ + # Wait for sometime so that VPN server flushes all interfaces and + # connections after graceful termination + time.sleep(10) + + ad.adb.shell("ip xfrm state flush") + ad.log.info("Connecting to: %s", vpn_profile) + ad.droid.vpnStartLegacyVpn(vpn_profile) + time.sleep(cconst.VPN_TIMEOUT) + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_equal(connected_vpn_info["state"], + cconst.VPN_STATE_CONNECTED, + "Unable to establish VPN connection for %s" + % vpn_profile) + + ping_result = verify_ping_to_vpn_ip(ad) + ip_xfrm_state = ad.adb.shell("ip xfrm state") + match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) + if match_obj: + ip_xfrm_state = format(match_obj.group(0)).split() + ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) + + ad.droid.vpnStopLegacyVpn() + asserts.assert_true(ping_result, + "Ping to the internal IP failed. " + "Expected to pass as VPN is connected") + + connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo() + asserts.assert_true(not connected_vpn_info, + "Unable to terminate VPN connection for %s" + % vpn_profile) + +def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, + ipsec_server_type, log_path): + """ Download the certificates from VPN server and push to sdcard of DUT + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Client cert file name on DUT's sdcard + """ + url = "http://%s%s%s" % (vpn_server_addr, + vpn_params['cert_path_vpnserver'], + vpn_params['client_pkcs_file_name']) + local_cert_name = "%s_%s_%s" % (vpn_type.name, + ipsec_server_type, + vpn_params['client_pkcs_file_name']) + ad.adb.push("%s sdcard/" % local_cert_name) + return local_cert_name + + local_file_path = os.path.join(log_path, local_cert_name) + try: + ret = urllib.request.urlopen(url) + with open(local_file_path, "wb") as f: + f.write(ret.read()) + except: + asserts.fail("Unable to download certificate from the server") + + f.close() + ad.adb.push("%s sdcard/" % local_file_path) + return local_cert_name + +def generate_legacy_vpn_profile(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path): + """ Generate legacy VPN profile for a VPN + + Args: + ad: android device object + vpn_params: vpn params from config file + vpn_type: 1 of the 6 VPN types + vpn_server_addr: server addr to connect to + ipsec_server_type: ipsec version - strongswan or openswan + log_path: log path to download cert + + Returns: + Vpn profile + """ + vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'], + VPN_CONST.PWD: vpn_params['vpn_password'], + VPN_CONST.TYPE: vpn_type.value, + VPN_CONST.SERVER: vpn_server_addr,} + vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name, + ipsec_server_type) + if vpn_type.name == "PPTP": + vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name + + psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) + rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) + + if vpn_type.name in psk_set: + vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret'] + elif vpn_type.name in rsa_set: + cert_name = download_load_certs(ad, + vpn_params, + vpn_type, + vpn_server_addr, + ipsec_server_type, + log_path) + vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] + vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] + ad.droid.installCertificate(vpn_profile, cert_name, + vpn_params['cert_password']) + else: + vpn_profile[VPN_CONST.MPPE] = "mppe" + + return vpn_profile diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py index e5e9e5a521..fca5085f0e 100755 --- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py @@ -745,6 +745,32 @@ def start_wifi_tethering(ad, ssid, password, band=None): finally: ad.droid.wifiStopTrackingTetherStateChange() +def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None): + """ Save a soft ap configuration """ + if band: + wifi_config[WifiEnums.APBAND_KEY] = band + if hidden: + wifi_config[WifiEnums.HIDDEN_KEY] = hidden + asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config), + "Failed to set WifiAp Configuration") + + wifi_ap = ad.droid.wifiGetApConfiguration() + asserts.assert_true( + wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY], + "Hotspot SSID doesn't match with expected SSID") + +def start_wifi_tethering_saved_config(ad): + """ Turn on wifi hotspot with a config that is already saved """ + ad.droid.wifiStartTrackingTetherStateChange() + ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False) + try: + ad.ed.pop_event("ConnectivityManagerOnTetheringStarted") + ad.ed.wait_for_event("TetherStateChanged", + lambda x: x["data"]["ACTIVE_TETHER"], 30) + except: + asserts.fail("Didn't receive wifi tethering starting confirmation") + finally: + ad.droid.wifiStopTrackingTetherStateChange() def stop_wifi_tethering(ad): """Stops wifi tethering on an android_device. diff --git a/acts/tests/google/net/CoreNetworkingOTATest.py b/acts/tests/google/net/CoreNetworkingOTATest.py new file mode 100755 index 0000000000..2444971a87 --- /dev/null +++ b/acts/tests/google/net/CoreNetworkingOTATest.py @@ -0,0 +1,102 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import re +import time +import urllib.request + +import acts.base_test +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.net import connectivity_const as cconst + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + +VPN_CONST = cconst.VpnProfile +VPN_TYPE = cconst.VpnProfileType +VPN_PARAMS = cconst.VpnReqParams + + +class CoreNetworkingOTATest(BaseTestClass): + """Core networking auto update tests""" + + def setup_class(self): + super(CoreNetworkingOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.dut = self.android_devices[0] + req_params = dir(VPN_PARAMS) + req_params = [x for x in req_params if not x.startswith('__')] + req_params.extend(["download_file", "file_size"]) + self.unpack_userparams(req_params) + + vpn_params = {'vpn_username': self.vpn_username, + 'vpn_password': self.vpn_password, + 'psk_secret': self.psk_secret, + 'client_pkcs_file_name': self.client_pkcs_file_name, + 'cert_path_vpnserver': self.cert_path_vpnserver, + 'cert_password': self.cert_password} + + # generate legacy vpn profiles + wutils.wifi_connect(self.dut, self.wifi_network) + self.xauth_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_XAUTH_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_XAUTH_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.l2tp_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.L2TP_IPSEC_RSA, + self.vpn_server_addresses[VPN_TYPE.L2TP_IPSEC_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.hybrid_rsa = nutils.generate_legacy_vpn_profile( + self.dut, vpn_params, + VPN_TYPE.IPSEC_HYBRID_RSA, + self.vpn_server_addresses[VPN_TYPE.IPSEC_HYBRID_RSA.name][0], + self.ipsec_server_type[0], self.log_path) + self.vpn_profiles = [self.l2tp_rsa, self.hybrid_rsa, self.xauth_rsa] + + # verify legacy vpn + for prof in self.vpn_profiles: + nutils.legacy_vpn_connection_test_logic(self.dut, prof) + + # Run OTA below, if ota fails then abort all tests. + try: + for ad in self.android_devices: + ota_updater.update(ad) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + """Tests""" + + def test_legacy_vpn_ota_xauth_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.xauth_rsa) + + def test_legacy_vpn_ota_l2tp_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.l2tp_rsa) + + def test_legacy_vpn_ota_hybrid_rsa(self): + nutils.legacy_vpn_connection_test_logic(self.dut, self.hybrid_rsa) diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py new file mode 100755 index 0000000000..68f039980e --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GOpenOTATest(BaseTestClass): + """Wifi Tethering 2G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering2GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_2g_open(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py new file mode 100755 index 0000000000..e1904e7c89 --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering2GPskOTATest(BaseTestClass): + """Wifi Tethering 2G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering2GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_2G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_2g_psk(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py new file mode 100755 index 0000000000..6084500df0 --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GOpenOTATest(BaseTestClass): + """Wifi Tethering 5G Open OTA tests""" + + def setup_class(self): + + super(WifiTethering5GOpenOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_open", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_open, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_5g_open(self): + self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py new file mode 100755 index 0000000000..5cb532ac6c --- /dev/null +++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import acts.signals as signals + +from acts import asserts +from acts.base_test import BaseTestClass +from acts.libs.ota import ota_updater +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G + +import acts.test_utils.net.net_test_utils as nutils +import acts.test_utils.wifi.wifi_test_utils as wutils + + +class WifiTethering5GPskOTATest(BaseTestClass): + """Wifi Tethering 5G Psk OTA tests""" + + def setup_class(self): + + super(WifiTethering5GPskOTATest, self).setup_class() + ota_updater.initialize(self.user_params, self.android_devices) + + self.hotspot_device = self.android_devices[0] + self.tethered_device = self.android_devices[1] + req_params = ("wifi_hotspot_psk", ) + self.unpack_userparams(req_params) + + # verify hotspot device has lte data and supports tethering + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) + + # Save a wifi soft ap configuration and verify that it works + wutils.save_wifi_soft_ap_config(self.hotspot_device, + self.wifi_hotspot_psk, + WIFI_CONFIG_APBAND_5G) + self._verify_wifi_tethering() + + # Run OTA below, if ota fails then abort all tests. + try: + ota_updater.update(self.hotspot_device) + except Exception as err: + raise signals.TestSkipClass( + "Failed up apply OTA update. Aborting tests") + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) + + def teardown_class(self): + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Helper Functions""" + + def _verify_wifi_tethering(self): + """Verify wifi tethering""" + wutils.start_wifi_tethering_saved_config(self.hotspot_device) + wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk) + # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061 + wutils.wifi_toggle_state(self.hotspot_device, True) + + """Tests""" + + def test_wifi_tethering_ota_5g_psk(self): + self._verify_wifi_tethering() -- cgit v1.2.3 From 54f84a52f71a465ec0bf28318b59c592d3bd65e4 Mon Sep 17 00:00:00 2001 From: Roshan Pius Date: Fri, 22 Jun 2018 11:51:08 -0700 Subject: WifiCrashTest: Correctly ensure framework is up after crash The "sys.boot_completed" flag needs to be manually reset before mnaually restarting the framework to correctly detect when the framework is fully up after the crash. Bug: 80259480 Test: `act.py -c wifi_manager.config -tb dut-name -tc WifiCrashTest:test_wifi_framework_crash_reconnec` Change-Id: I2cd94d7e69bbbaaa89cf37a5b805e6e3cb88182c --- acts/framework/acts/controllers/android_device.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py index 5110e74406..03097f049e 100755 --- a/acts/framework/acts/controllers/android_device.py +++ b/acts/framework/acts/controllers/android_device.py @@ -1148,6 +1148,9 @@ class AndroidDevice: self.stop_services() self.log.info("Restarting android runtime") self.adb.shell("stop") + # Reset the boot completed flag before we restart the framework + # to correctly detect when the framework has fully come up. + self.adb.shell("setprop sys.boot_completed 0") self.adb.shell("start") self.wait_for_boot_completion() self.root_adb() -- cgit v1.2.3 From a8bad9691c938a1176fd4d253ca7bd752cab8d79 Mon Sep 17 00:00:00 2001 From: Roshan Pius Date: Mon, 25 Jun 2018 13:50:09 -0700 Subject: WifiStaApConcurrency: Fix tests For a few tests, the test sequence did not correctly reflect the name of the test. Bug: 110759381 Test: `act.py -c softap_cross_onhub.config -tb dut-name -tc WifiStaApConcurrencyTest` Change-Id: Iace3ebb0db6a5a7fb2daa2b8141ea15bd99cc28b --- acts/tests/google/wifi/WifiStaApConcurrencyTest.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py index 6f0beba430..ffe0effb96 100755 --- a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py +++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py @@ -346,9 +346,9 @@ class WifiStaApConcurrencyTest(WifiBaseTest): def test_softap_5G_wifi_connection_2G(self): """Tests bringing up SoftAp on 5G followed by connection to 2G network. """ - self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G(self): @@ -356,7 +356,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) @test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c") def test_softap_2G_wifi_connection_5G_DFS(self): @@ -364,14 +364,14 @@ class WifiStaApConcurrencyTest(WifiBaseTest): """ self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS) self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_5G) + self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) @test_tracker_info(uuid="aa23a3fc-31a1-4d5c-8cf5-2eb9fdf9e7ce") def test_softap_5G_wifi_connection_2G_with_location_scan_on(self): """Tests bringing up SoftAp on 5G followed by connection to 2G network with location scans turned on. """ - self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G) + self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G) self.turn_location_on_and_scan_toggle_on() self.start_softap_and_connect_to_wifi_network( - self.wpapsk_5g, WIFI_CONFIG_APBAND_2G) + self.wpapsk_2g, WIFI_CONFIG_APBAND_5G) -- cgit v1.2.3 From 6beff4f2f5660c1ce55d49e2cbdd1cff31925735 Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Thu, 21 Jun 2018 13:00:42 -0700 Subject: Create a smarter device selector for BT tests Work towards selecting Android devices to test against via using their set feature properties as well as the supported BT UUIDS on the device. Also add the ability to selected prefered device order for tests that care about order and do not know about device capabilties. Bug: 110536180 Test: Manual execution Change-Id: Ia72403893720bbdd1e8699f04e2ce0439bb94b90 (cherry picked from commit 9bce15d9087a80c2bf565717e9b61c1a2fcd02e3) --- .../acts/test_utils/bt/BluetoothBaseTest.py | 13 +++ acts/framework/acts/test_utils/bt/bt_test_utils.py | 97 ++++++++++++++++++++-- 2 files changed, 105 insertions(+), 5 deletions(-) diff --git a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py index cd82974be7..21d63c7810 100644 --- a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py +++ b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py @@ -31,6 +31,7 @@ from acts.controllers import android_device from acts.libs.proto.proto_utils import compile_import_proto from acts.libs.proto.proto_utils import parse_proto_to_ascii from acts.test_utils.bt.bt_metrics_utils import get_bluetooth_metrics +from acts.test_utils.bt.bt_test_utils import get_device_selector_dictionary from acts.test_utils.bt.bt_test_utils import reset_bluetooth from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs @@ -52,6 +53,13 @@ class BluetoothBaseTest(BaseTestClass): BaseTestClass.__init__(self, controllers) for ad in self.android_devices: self._setup_bt_libs(ad) + if 'preferred_device_order' in self.user_params: + prefered_device_order = self.user_params['preferred_device_order'] + for i, ad in enumerate(self.android_devices): + if ad.serial in prefered_device_order: + index = prefered_device_order.index(ad.serial) + self.android_devices[i], self.android_devices[index] = \ + self.android_devices[index], self.android_devices[i] def collect_bluetooth_manager_metrics_logs(self, ads, test_name): """ @@ -123,6 +131,8 @@ class BluetoothBaseTest(BaseTestClass): return _safe_wrap_test_case def setup_class(self): + self.device_selector = get_device_selector_dictionary( + self.android_devices) if "reboot_between_test_class" in self.user_params: threads = [] for a in self.android_devices: @@ -235,3 +245,6 @@ class BluetoothBaseTest(BaseTestClass): # Shell command library setattr(android_device, "shell", ShellCommands(log=self.log, dut=android_device)) + # Setup Android Device feature list + setattr(android_device, "features", + android_device.adb.shell("pm list features").split("\n")) diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py index 9a930ab898..521b144adb 100644 --- a/acts/framework/acts/test_utils/bt/bt_test_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py @@ -223,7 +223,8 @@ def setup_multiple_devices_for_bt_test(android_devices): threads = [] try: for a in android_devices: - thread = threading.Thread(target=factory_reset_bluetooth, args=([[a]])) + thread = threading.Thread( + target=factory_reset_bluetooth, args=([[a]])) threads.append(thread) thread.start() for t in threads: @@ -282,7 +283,11 @@ def bluetooth_enabled_check(ad): return False return True -def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5): + +def wait_for_bluetooth_manager_state(droid, + state=None, + timeout=10, + threshold=5): """ Waits for BlueTooth normalized state or normalized explicit state args: droid: droid device object @@ -301,7 +306,8 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5) # for any normalized state if state is None: if len(set(all_states[-threshold:])) == 1: - log.info("State normalized {}".format(set(all_states[-threshold:]))) + log.info("State normalized {}".format( + set(all_states[-threshold:]))) return True else: # explicit check against normalized state @@ -310,9 +316,11 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5) time.sleep(0.5) log.error( "Bluetooth state fails to normalize" if state is None else - "Failed to match bluetooth state, current state {} expected state {}".format(get_state(), state)) + "Failed to match bluetooth state, current state {} expected state {}". + format(get_state(), state)) return False + def factory_reset_bluetooth(android_devices): """Clears Bluetooth stack of input Android device list. @@ -340,6 +348,7 @@ def factory_reset_bluetooth(android_devices): return False return True + def reset_bluetooth(android_devices): """Resets Bluetooth state of input Android device list. @@ -576,6 +585,7 @@ def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): mac_address = event['data']['Result']['deviceInfo']['address'] return mac_address, advertise_callback, scan_callback + def enable_bluetooth(droid, ed): if droid.bluetoothCheckState() is True: return True @@ -594,6 +604,7 @@ def enable_bluetooth(droid, ed): return True + def disable_bluetooth(droid): """Disable Bluetooth on input Droid object. @@ -1000,7 +1011,8 @@ def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list): bluetooth_profile_connection_state_changed, bt_default_timeout) pri_ad.log.info("Got event {}".format(profile_event)) except Exception as e: - pri_ad.log.error("Did not disconnect from Profiles. Reason {}".format(e)) + pri_ad.log.error( + "Did not disconnect from Profiles. Reason {}".format(e)) return False profile = profile_event['data']['profile'] @@ -1387,3 +1399,78 @@ def is_a2dp_connected(sink, source): if (device["address"] == source.droid.bluetoothGetLocalAddress()): return True return False + + +def get_device_selector_dictionary(android_device_list): + """Create a dictionary of Bluetooth features vs Android devices. + + Args: + android_device_list: The list of Android devices. + Returns: + A dictionary of profiles/features to Android devices. + """ + selector_dict = {} + for ad in android_device_list: + uuids = ad.droid.bluetoothGetLocalUuids() + + for profile, uuid_const in sig_uuid_constants.items(): + uuid_check = sig_uuid_constants['BASE_UUID'].format( + uuid_const).lower() + if uuid_check in uuids: + if profile in selector_dict: + selector_dict[profile].append(ad) + else: + selector_dict[profile] = [ad] + + # Various services may not be active during BT startup. + # If the device can be identified through adb shell pm list features + # then try to add them to the appropriate profiles / features. + + # Android TV. + if "feature:com.google.android.tv.installed" in ad.features: + ad.log.info("Android TV device found.") + supported_profiles = ['AudioSink'] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + + # Android Auto + elif "feature:android.hardware.type.automotive" in ad.features: + ad.log.info("Android Auto device found.") + # Add: AudioSink , A/V_RemoteControl, + supported_profiles = [ + 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server' + ] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + # Android Wear + elif "feature:android.hardware.type.watch" in ad.features: + ad.log.info("Android Wear device found.") + supported_profiles = [] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + # Android Phone + elif "feature:android.hardware.telephony" in ad.features: + ad.log.info("Android Phone device found.") + # Add: AudioSink + supported_profiles = [ + 'AudioSource', 'A/V_RemoteControlTarget', + 'Message Access Server' + ] + _add_android_device_to_dictionary(ad, supported_profiles, + selector_dict) + return selector_dict + + +def _add_android_device_to_dictionary(android_device, profile_list, + selector_dict): + """Adds the AndroidDevice and supported features to the selector dictionary + + Args: + android_device: The Android device. + profile_list: The list of profiles the Android device supports. + """ + for profile in profile_list: + if profile in selector_dict and android_device not in selector_dict[profile]: + selector_dict[profile].append(android_device) + else: + selector_dict[profile] = [android_device] -- cgit v1.2.3 From 8680294ffaaf8ad61e5c707337ec0d9c7abb6d56 Mon Sep 17 00:00:00 2001 From: tturney Date: Mon, 30 Apr 2018 08:32:44 -0700 Subject: Add Service Discovery uuids to bt_constants Test: Manual Bug: 78879049 Change-Id: Ic18105ad7e36e7bedf4aa9c2e533b79a38123daa (cherry picked from commit d09e45d5774e88bf958297c79a1066d7b47ee004) --- acts/framework/acts/test_utils/bt/bt_constants.py | 98 +++++++++++++++++++++++ 1 file changed, 98 insertions(+) diff --git a/acts/framework/acts/test_utils/bt/bt_constants.py b/acts/framework/acts/test_utils/bt/bt_constants.py index f093a1f68c..5be4c79a20 100644 --- a/acts/framework/acts/test_utils/bt/bt_constants.py +++ b/acts/framework/acts/test_utils/bt/bt_constants.py @@ -596,3 +596,101 @@ logcat_strings = { } ### End logcat strings dict""" + +### Begin Service Discovery UUIDS ### +### Values match the Bluetooth SIG defined values: """ +""" https://www.bluetooth.com/specifications/assigned-numbers/service-discovery """ +sig_uuid_constants = { + "BASE_UUID": "0000{}-0000-1000-8000-00805F9B34FB", + "SDP": "0001", + "UDP": "0002", + "RFCOMM": "0003", + "TCP": "0004", + "TCS-BIN": "0005", + "TCS-AT": "0006", + "ATT": "0007", + "OBEX": "0008", + "IP": "0009", + "FTP": "000A", + "HTTP": "000C", + "WSP": "000E", + "BNEP": "000F", + "UPNP": "0010", + "HIDP": "0011", + "HardcopyControlChannel": "0012", + "HardcopyDataChannel": "0014", + "HardcopyNotification": "0016", + "AVCTP": "0017", + "AVDTP": "0019", + "CMTP": "001B", + "MCAPControlChannel": "001E", + "MCAPDataChannel": "001F", + "L2CAP": "0100", + "ServiceDiscoveryServerServiceClassID": "1000", + "BrowseGroupDescriptorServiceClassID": "1001", + "SerialPort": "1101", + "LANAccessUsingPPP": "1102", + "DialupNetworking": "1103", + "IrMCSync": "1104", + "OBEXObjectPush": "1105", + "OBEXFileTransfer": "1106", + "IrMCSyncCommand": "1107", + "Headset": "1108", + "CordlessTelephony": "1109", + "AudioSource": "110A", + "AudioSink": "110B", + "A/V_RemoteControlTarget": "110C", + "AdvancedAudioDistribution": "110D", + "A/V_RemoteControl": "110E", + "A/V_RemoteControlController": "110F", + "Intercom": "1110", + "Fax": "1111", + "Headset - Audio Gateway (AG)": "1112", + "WAP": "1113", + "WAP_CLIENT": "1114", + "PANU": "1115", + "NAP": "1116", + "GN": "1117", + "DirectPrinting": "1118", + "ReferencePrinting": "1119", + "ImagingResponder": "111B", + "ImagingAutomaticArchive": "111C", + "ImagingReferencedObjects": "111D", + "Handsfree": "111E", + "HandsfreeAudioGateway": "111F", + "DirectPrintingReferenceObjectsService": "1120", + "ReflectedUI": "1121", + "BasicPrinting": "1122", + "PrintingStatus": "1123", + "HumanInterfaceDeviceService": "1124", + "HardcopyCableReplacement": "1125", + "HCR_Print": "1126", + "HCR_Scan": "1127", + "Common_ISDN_Access": "1128", + "SIM_Access": "112D", + "Phonebook Access - PCE": "112E", + "Phonebook Access - PSE": "112F", + "Phonebook Access": "1130", + "Headset - HS": "1131", + "Message Access Server": "1132", + "Message Notification Server": "1133", + "Message Access Profile": "1134", + "GNSS": "1135", + "GNSS_Server": "1136", + "PnPInformation": "1200", + "GenericNetworking": "1201", + "GenericFileTransfer": "1202", + "GenericAudio": "1203", + "GenericTelephony": "1204", + "UPNP_Service": "1205", + "UPNP_IP_Service": "1206", + "ESDP_UPNP_IP_PAN": "1300", + "ESDP_UPNP_IP_LAP": "1301", + "ESDP_UPNP_L2CAP": "1302", + "VideoSource": "1303", + "VideoSink": "1304", + "VideoDistribution": "1305", + "HDP": "1400" +} + +### End Service Discovery UUIDS ### -- cgit v1.2.3 From 3e8c8d1d7fca64caf589190c90b9c4802ffe17f9 Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Tue, 26 Jun 2018 11:31:31 -0700 Subject: Add sig_uuid_constants to bt_test_utils Bug: 78879049 Test: Manual Change-Id: Iad5f9696f09cb06da77fe8e1b22778db2c5ee442 (cherry picked from commit 7b0baa5b591ee23326a708cc3dee232a3f7d8351) --- acts/framework/acts/test_utils/bt/bt_test_utils.py | 1 + 1 file changed, 1 insertion(+) diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py index 9a930ab898..d417fed328 100644 --- a/acts/framework/acts/test_utils/bt/bt_test_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py @@ -65,6 +65,7 @@ from acts.test_utils.bt.bt_constants import pan_connect_timeout from acts.test_utils.bt.bt_constants import small_timeout from acts.test_utils.bt.bt_constants import scan_result from acts.test_utils.bt.bt_constants import scan_failed +from acts.test_utils.bt.bt_constants import sig_uuid_constants from acts.test_utils.bt.bt_constants import hid_id_keyboard from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb -- cgit v1.2.3 From 4158463531ccae59673f9d344a1815f747dbfae3 Mon Sep 17 00:00:00 2001 From: tturney Date: Mon, 14 May 2018 07:17:21 -0700 Subject: Remove l2cap uuid from RfcommTest l2cap testing does not belong in an rfcomm test even though its listed in "valid" uuids in the spec. Bug: 69915194 Test: n/a Change-Id: Icb91906bc94065d747e19dc070cc16617123473e (cherry picked from commit 9466e8ce050d7ec4a07ffcb1b52ed860927b4797) --- acts/tests/google/bt/RfcommTest.py | 29 ++--------------------------- 1 file changed, 2 insertions(+), 27 deletions(-) diff --git a/acts/tests/google/bt/RfcommTest.py b/acts/tests/google/bt/RfcommTest.py index cdd53ee710..0a9ab5c8ab 100644 --- a/acts/tests/google/bt/RfcommTest.py +++ b/acts/tests/google/bt/RfcommTest.py @@ -846,30 +846,5 @@ class RfcommTest(BluetoothBaseTest): TAGS: Classic, RFCOMM Priority: 3 """ - return self._test_rfcomm_connection_with_uuid(bt_rfcomm_uuids[ - 'mcap_data_channel']) - - @BluetoothBaseTest.bt_test_wrap - @test_tracker_info(uuid='d6c7523d-9247-480e-8154-edd51ae1be50') - def test_rfcomm_connection_l2cap_uuid(self): - """Test Bluetooth RFCOMM connection using L2CAP uuid - - Test RFCOMM though establishing a basic connection. - - Steps: - 1. Get the mac address of the server device. - 2. Establish an RFCOMM connection from the client to the server AD. - 3. Verify that the RFCOMM connection is active from both the client and - server. - - Expected Result: - RFCOMM connection is established then disconnected succcessfully. - - Returns: - Pass if True - Fail if False - - TAGS: Classic, RFCOMM - Priority: 3 - """ - return self._test_rfcomm_connection_with_uuid(bt_rfcomm_uuids['l2cap']) + return self._test_rfcomm_connection_with_uuid( + bt_rfcomm_uuids['mcap_data_channel']) -- cgit v1.2.3 From c5f0632a5d9417e2c87c09c3f21427b1d4a7fb1b Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Mon, 2 Jul 2018 14:02:30 -0700 Subject: Fix logic for ConcurrentBleScanningTest Was incorrectly returning a failure for a pass result. Bug: 19919762 Test: manual execution Change-Id: I300222df86bd6c63887076bb32fac9722a5d050e (cherry picked from commit 7080d75d1d5453daffc58f9c4cd043c76db5a8ee) --- acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py index 1972806c80..83b5ddc60e 100644 --- a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py +++ b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py @@ -264,13 +264,13 @@ class ConcurrentBleScanningTest(BluetoothBaseTest): try: self.scn_ad.ed.pop_event( scan_failed.format(scan_callback), self.default_timeout) - self.log.info( - "Found scan event successfully. Iteration {} successful." + self.log.error( + "Unexpected scan event found. Iteration {} successful." .format(i)) + test_result = False except Exception: - self.log.info("Failed to find a onScanFailed event for callback {}" + self.log.info("No onScanFailed event for callback {} as expected." .format(scan_callback)) - test_result = False for callback in scan_callback_list: self.scn_ad.droid.bleStopBleScan(callback) return test_result -- cgit v1.2.3 From d08f0ae1821f9216864c9b17884d43e8fd56f7d0 Mon Sep 17 00:00:00 2001 From: Tom Turney Date: Tue, 3 Jul 2018 16:18:44 -0700 Subject: Add filtering for tests where location is disabled Bug: 109827064 Bug: 109823020 Test: Manual run Change-Id: I58374f355df4df3d60f108e80fb8a72c83ed5fbb (cherry picked from commit e10fe1693a3a08cfa85e2ea361782233f39af5d5) --- .../google/ble/filtering/UniqueFilteringTest.py | 91 ++++++++++++++-------- 1 file changed, 57 insertions(+), 34 deletions(-) diff --git a/acts/tests/google/ble/filtering/UniqueFilteringTest.py b/acts/tests/google/ble/filtering/UniqueFilteringTest.py index bb7b00f516..e4ad858197 100644 --- a/acts/tests/google/ble/filtering/UniqueFilteringTest.py +++ b/acts/tests/google/ble/filtering/UniqueFilteringTest.py @@ -58,8 +58,8 @@ class UniqueFilteringTest(BluetoothBaseTest): callbacktype = event['data']['CallbackType'] if callbacktype != expected_callbacktype: self.log.debug( - "Expected callback type: {}, Found callback type: {}" - .format(expected_callbacktype, callbacktype)) + "Expected callback type: {}, Found callback type: {}".format( + expected_callbacktype, callbacktype)) test_result = False return test_result @@ -109,6 +109,7 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ test_result = True + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects( @@ -118,6 +119,10 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) + self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) self.scn_ad.droid.bleFlushPendingScanResults(scan_callback) @@ -128,10 +133,10 @@ class UniqueFilteringTest(BluetoothBaseTest): self.log.debug(worker.result(self.default_timeout)) except Empty as error: test_result = False - self.log.debug("Test failed with Empty error: {}".format(error)) + self.log.error("Test failed with Empty error: {}".format(error)) except concurrent.futures._base.TimeoutError as error: test_result = False - self.log.debug("Test failed with TimeoutError: {}".format(error)) + self.log.error("Test failed with TimeoutError: {}".format(error)) self.scn_ad.droid.bleStopBleScan(scan_callback) self.adv_ad.droid.bleStopBleAdvertising(advertise_callback) return test_result @@ -173,6 +178,9 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) system_time_nanos = self.scn_ad.droid.getSystemElapsedRealtimeNanos() @@ -220,10 +228,14 @@ class UniqueFilteringTest(BluetoothBaseTest): filter_list, scan_settings, scan_callback = generate_ble_scan_objects( self.scn_ad.droid) expected_event_name = batch_scan_result.format(scan_callback) + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) advertise_callback, advertise_data, advertise_settings = ( generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) worker = self.scn_ad.ed.handle_event( @@ -232,8 +244,8 @@ class UniqueFilteringTest(BluetoothBaseTest): self.scn_ad.droid.bleFlushPendingScanResults(scan_callback) try: event_info = self.scn_ad.ed.pop_event(expected_event_name, 10) - self.log.debug("Unexpectedly found an advertiser: {}".format( - event_info)) + self.log.debug( + "Unexpectedly found an advertiser: {}".format(event_info)) test_result = False except Empty: self.log.debug("No {} events were found as expected.".format( @@ -285,8 +297,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event_info = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - self.log.error("Unexpectedly found an advertiser: {}".format( - event_info)) + self.log.error( + "Unexpectedly found an advertiser: {}".format(event_info)) test_result = False except Empty: self.log.debug("No events were found as expected.") @@ -337,6 +349,9 @@ class UniqueFilteringTest(BluetoothBaseTest): generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) worker = self.scn_ad.ed.handle_event( @@ -345,8 +360,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event_info = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - self.log.error("Unexpectedly found an advertiser:".format( - event_info)) + self.log.error( + "Unexpectedly found an advertiser:".format(event_info)) test_result = False except Empty as error: self.log.debug("No events were found as expected.") @@ -399,12 +414,12 @@ class UniqueFilteringTest(BluetoothBaseTest): advertise_callback1, advertise_data1, advertise_settings1) filter_list = self.scn_ad.droid.bleGenFilterList() - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_callback = self.scn_ad.droid.bleGenScanCallback() - self.scn_ad.droid.bleSetScanFilterManufacturerData(117, [1, 2, 3], - [127, 127, 127]) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + 117, [1, 2, 3], [127, 127, 127]) self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) @@ -439,27 +454,35 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ test_result = True - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) filter_list, scan_settings, scan_callback = generate_ble_scan_objects( self.scn_ad.droid) expected_event_name = scan_result.format(scan_callback) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) + self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True) advertise_callback, advertise_data, advertise_settings = ( generate_ble_advertise_objects(self.adv_ad.droid)) self.adv_ad.droid.bleStartBleAdvertising( advertise_callback, advertise_data, advertise_settings) + self.scn_ad.droid.bleSetScanFilterDeviceName( + self.adv_ad.droid.bluetoothGetLocalName()) + self.scn_ad.droid.bleBuildScanFilter(filter_list) self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings, scan_callback) - event_info = self.scn_ad.ed.pop_event(expected_event_name, - self.default_timeout) + try: + event_info = self.scn_ad.ed.pop_event(expected_event_name, + self.default_timeout) + except Empty as error: + self.log.error("Could not find initial advertisement.") + return False mac_address = event_info['data']['Result']['deviceInfo']['address'] - self.log.info("Filter advertisement with address {}".format( - mac_address)) + self.log.info( + "Filter advertisement with address {}".format(mac_address)) self.scn_ad.droid.bleStopBleScan(scan_callback) - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) self.scn_ad.droid.bleSetScanFilterDeviceAddress(mac_address) filter_list2, scan_settings2, scan_callback2 = ( generate_ble_scan_objects(self.scn_ad.droid)) @@ -506,8 +529,8 @@ class UniqueFilteringTest(BluetoothBaseTest): Priority: 1 """ manufacturer_id = 0x4c - self.adv_ad.droid.bleAddAdvertiseDataManufacturerId(manufacturer_id, - [0x01]) + self.adv_ad.droid.bleAddAdvertiseDataManufacturerId( + manufacturer_id, [0x01]) self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode( ble_advertise_settings_modes['low_latency']) advertise_callback, advertise_data, advertise_settings = ( @@ -521,10 +544,10 @@ class UniqueFilteringTest(BluetoothBaseTest): self.log.info("Failed to start advertisement.") return False - self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[ - 'low_latency']) - self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id, - [0x01]) + self.scn_ad.droid.bleSetScanSettingsScanMode( + ble_scan_settings_modes['low_latency']) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + manufacturer_id, [0x01]) filter_list = self.scn_ad.droid.bleGenFilterList() scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list) @@ -540,8 +563,8 @@ class UniqueFilteringTest(BluetoothBaseTest): try: event = self.scn_ad.ed.pop_event(expected_event_name, self.default_timeout) - found_manufacturer_id = json.loads(event['data']['Result'][ - 'manufacturerIdList']) + found_manufacturer_id = json.loads( + event['data']['Result']['manufacturerIdList']) if found_manufacturer_id[0] != manufacturer_id: self.log.error( "Manufacturer id mismatch. Found {}, Expected {}". @@ -601,8 +624,8 @@ class UniqueFilteringTest(BluetoothBaseTest): self.scn_ad.droid.bleSetScanSettingsScanMode( ble_scan_settings_modes['low_latency']) - self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id, - [0x01]) + self.scn_ad.droid.bleSetScanFilterManufacturerData( + manufacturer_id, [0x01]) filter_list = self.scn_ad.droid.bleGenFilterList() scan_settings = self.scn_ad.droid.bleBuildScanSetting() scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list) @@ -617,8 +640,8 @@ class UniqueFilteringTest(BluetoothBaseTest): except Empty: self.log.error("Unable to find beacon advertisement.") return False - found_manufacturer_id = json.loads(event['data']['Result'][ - 'manufacturerIdList']) + found_manufacturer_id = json.loads( + event['data']['Result']['manufacturerIdList']) if found_manufacturer_id[0] != manufacturer_id: self.log.error( "Manufacturer id mismatch. Found {}, Expected {}".format( -- cgit v1.2.3 From 196b72fea0345b0a77da3a996b85e9150b881791 Mon Sep 17 00:00:00 2001 From: Jakub Pawlowski Date: Thu, 26 Apr 2018 10:38:08 +0200 Subject: Modify BleOpportunisticScanTest:test_discover_opportunistic_scan_result_off_secondary_scan_filter This test case had a bad assumption. Starting opportunistic scan and non-opportunistic filtered scan should result with only filtered scan results returned from controller. Having opportunistic scan running should not impact filter. Bug: 78533856 Test: sl4a BleOpportunisticScanTest Change-Id: Ie7aa5d3257d947da07aa4644b21103a9de470c31 (cherry picked from commit 5793209d66364e0b16d81a49889d31b7b546309d) --- acts/tests/google/ble/scan/BleOpportunisticScanTest.py | 13 +++++-------- 1 file changed, 5 insertions(+), 8 deletions(-) diff --git a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py index e907cbb342..9803648000 100644 --- a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py +++ b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py @@ -524,9 +524,9 @@ class BleOpportunisticScanTest(BluetoothBaseTest): self): """Test opportunistic scan result from secondary scan filter. - Tests opportunistic scan where the secondary scan instance does not find - an advertisement but the scan instance with scan mode set to - opportunistic scan will find an advertisement. + Tests opportunistic scan where the filtered scan instance does not find + an advertisement and the scan instance with scan mode set to + opportunistic scan will also not find an advertisement. Steps: 1. Initialize advertiser and start advertisement on dut1 (make sure the @@ -539,6 +539,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest): 6. Pop onScanResults from the second scanner 7. Expect no events 8. Pop onScanResults from the first scanner + 9. Expect no events Expected Result: Opportunistic scan instance finds an advertisement. @@ -573,11 +574,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest): if not self._verify_no_events_found( scan_result.format(scan_callback2)): return False - try: - self.scn_ad.ed.pop_event( - scan_result.format(scan_callback), self.default_timeout) - except Empty: - self.log.error("Opportunistic scan found no scan results.") + if not self._verify_no_events_found(scan_result.format(scan_callback)): return False return True -- cgit v1.2.3 From 66e3970448acfdb24a92289e0298607bb3a8ba36 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Wed, 11 Jul 2018 13:21:18 -0700 Subject: [DhcpServerTest] Add testtracker UUIDs to DhcpServerTest CHERRY PICKED MULTIPLE CLs from AOSP Bug: 109670497 Test: Verified the changes Change-Id: I0eaa9f8d9fc83b3a26b59834cf2fb168029fae6c Merged-In: I2b7d99e99f4e2adab26373951a5d3ebc2d83883a --- acts/tests/google/net/DhcpServerTest.py | 457 ++++++++++++++++++++++++++++++++ 1 file changed, 457 insertions(+) create mode 100644 acts/tests/google/net/DhcpServerTest.py diff --git a/acts/tests/google/net/DhcpServerTest.py b/acts/tests/google/net/DhcpServerTest.py new file mode 100644 index 0000000000..4cc2b91da3 --- /dev/null +++ b/acts/tests/google/net/DhcpServerTest.py @@ -0,0 +1,457 @@ +from acts import asserts +from acts import base_test +from acts.controllers import android_device +from acts.test_decorators import test_tracker_info + +from scapy.all import * +from threading import Event +from threading import Thread +import time +import warnings + + +SERVER_PORT = 67 +BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff' +NETADDR_PREFIX = '192.168.42.' +OTHER_NETADDR_PREFIX = '192.168.43.' +NETADDR_BROADCAST = '255.255.255.255' +SUBNET_BROADCAST = NETADDR_PREFIX + '255' + + +OFFER = 2 +REQUEST = 3 +ACK = 5 +NAK = 6 + +pmc_base_cmd = ( + "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ") +start_pmc_cmd = ( + "am start -S -n com.android.pmc/com.android.pmc.PMCMainActivity") +pmc_start_usb_tethering_cmd = "%sStartUSBTethering" % pmc_base_cmd +pmc_stop_usb_tethering_cmd = "%sStopUSBTethering" % pmc_base_cmd + + +class DhcpServerTest(base_test.BaseTestClass): + def setup_class(self): + self.dut = self.android_devices[0] + self.USB_TETHERED = False + self.next_hwaddr_index = 0 + self.stop_arp = Event() + + conf.checkIPaddr = 0 + conf.checkIPsrc = 0 + # Allow using non-67 server ports as long as client uses 68 + bind_layers(UDP, BOOTP, dport=68) + + self.dut.adb.shell(start_pmc_cmd) + self.dut.adb.shell("setprop log.tag.PMC VERBOSE") + iflist_before = get_if_list() + self._start_usb_tethering(self.dut) + self.iface = self._wait_for_new_iface(iflist_before) + self.real_hwaddr = get_if_raw_hwaddr(self.iface) + + # Start a thread to answer to all ARP "who-has" + thread = Thread(target=self._sniff_arp, args=(self.stop_arp,)) + thread.start() + + # Discover server IP + hwaddr = self._next_hwaddr() + resp = self._get_response(make_discover(hwaddr)) + asserts.assert_false(None == resp, + "Device did not reply to first DHCP discover") + self.server_addr = getopt(resp, 'server_id') + asserts.assert_false(None == self.server_addr, + "DHCP server did not specify server identifier") + # Ensure that we don't depend on assigned route/gateway on the host + conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0") + + def setup_test(self): + # Some versions of scapy do not close the receive file properly + warnings.filterwarnings("ignore", category=ResourceWarning) + + bind_layers(UDP, BOOTP, dport=68) + self.hwaddr = self._next_hwaddr() + self.other_hwaddr = self._next_hwaddr() + self.cleanup_releases = [] + + def teardown_test(self): + for packet in self.cleanup_releases: + self._send(packet) + + def teardown_class(self): + self.stop_arp.set() + self._stop_usb_tethering(self.dut) + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + def _start_usb_tethering(self, dut): + """ Start USB tethering + + Args: + 1. dut - ad object + """ + self.log.info("Starting USB Tethering") + dut.stop_services() + dut.adb.shell(pmc_start_usb_tethering_cmd) + self.USB_TETHERED = True + + def _stop_usb_tethering(self, dut): + """ Stop USB tethering + + Args: + 1. dut - ad object + """ + self.log.info("Stopping USB Tethering") + dut.adb.shell(pmc_stop_usb_tethering_cmd) + self._wait_for_device(self.dut) + dut.start_services(skip_sl4a=getattr(dut, "skip_sl4a", False)) + self.USB_TETHERED = False + + def _wait_for_device(self, dut): + """ Wait for device to come back online + + Args: + 1. dut - ad object + """ + while dut.serial not in android_device.list_adb_devices(): + pass + dut.adb.wait_for_device() + + def _wait_for_new_iface(self, old_ifaces): + old_set = set(old_ifaces) + # Try 10 times to find a new interface with a 1s sleep every time + # (equivalent to a 9s timeout) + for i in range(0, 10): + new_ifaces = set(get_if_list()) - old_set + asserts.assert_true(len(new_ifaces) < 2, + "Too many new interfaces after turning on tethering") + if len(new_ifaces) == 1: + return new_ifaces.pop() + time.sleep(1) + asserts.fail("Timeout waiting for tethering interface on host") + + def _sniff_arp(self, stop_arp): + try: + sniff(iface=self.iface, filter='arp', prn=self._handle_arp, store=0, + stop_filter=lambda p: stop_arp.is_set()) + except: + # sniff may raise when tethering is disconnected. Ignore + # exceptions if stop was requested. + if not stop_arp.is_set(): + raise + + def _handle_arp(self, packet): + # Reply to all arp "who-has": say we have everything + if packet[ARP].op == ARP.who_has: + reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr, psrc=packet.pdst, + hwdst=BROADCAST_MAC, pdst=SUBNET_BROADCAST) + sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply, + iface=self.iface, verbose=False) + + @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c") + def test_config_assumptions(self): + resp = self._get_response(make_discover(self.hwaddr)) + asserts.assert_false(None == resp, "Device did not reply to discover") + asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX), + "Server does not use expected prefix") + + @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade") + def test_discover_assigned_ownaddress(self): + addr, siaddr, resp = self._request_address(self.hwaddr) + + lease_time = getopt(resp, 'lease_time') + server_id = getopt(resp, 'server_id') + asserts.assert_true(lease_time > 10, "Lease time is unreasonably short") + asserts.assert_false(addr == '0.0.0.0', "Assigned address is empty") + # Wait to test lease expiration time change + time.sleep(2) + + # New discover, same address + resp = self._get_response(make_discover(self.hwaddr)) + # Lease time renewed: exptime not decreased + asserts.assert_equal(lease_time, getopt(resp, 'lease_time')) + asserts.assert_equal(addr, get_yiaddr(resp)) + + @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587") + def test_discover_assigned_otherhost(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same address, different client + resp = self._get_response(make_discover(self.other_hwaddr, + [('requested_addr', addr)])) + + self._assert_offer(resp) + asserts.assert_false(get_yiaddr(resp) == addr, + "Already assigned address offered") + + @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14") + def test_discover_requestaddress(self): + addr = NETADDR_PREFIX + '200' + resp = self._get_response(make_discover(self.hwaddr, + [('requested_addr', addr)])) + self._assert_offer(resp) + asserts.assert_equal(get_yiaddr(resp), addr) + + # Lease not committed: can request again + resp = self._get_response(make_discover(self.other_hwaddr, + [('requested_addr', addr)])) + self._assert_offer(resp) + asserts.assert_equal(get_yiaddr(resp), addr) + + def _assert_renews(self, request, addr, expTime): + time.sleep(2) + resp = self._get_response(request) + self._assert_ack(resp) + asserts.assert_equal(addr, get_yiaddr(resp)) + # Lease time renewed + asserts.assert_equal(expTime, getopt(resp, 'lease_time')) + + @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") + def test_request_wrongnet(self): + resp = self._get_response(make_request(self.hwaddr, + OTHER_NETADDR_PREFIX + '1', None)) + self._assert_nak(resp) + + @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18") + def test_request_inuse(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + res = self._get_response(make_request(self.other_hwaddr, addr, None)) + self._assert_nak(res) + + @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") + def test_request_initreboot(self): + addr, siaddr, resp = self._request_address(self.hwaddr) + exp = getopt(resp, 'lease_time') + + # siaddr NONE: init-reboot client state + self._assert_renews(make_request(self.hwaddr, addr, None), addr, exp) + + @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c") + def test_request_initreboot_nolease(self): + # RFC2131 #4.3.2 + asserts.skip("dnsmasq not compliant if --dhcp-authoritative set.") + addr = NETADDR_PREFIX + '123' + resp = self._get_response(make_request(self.hwaddr, addr, None)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5") + def test_request_initreboot_incorrectlease(self): + otheraddr = NETADDR_PREFIX + '123' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + + resp = self._get_response(make_request(self.hwaddr, otheraddr, None)) + self._assert_nak(resp) + + @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") + def test_request_rebinding(self): + addr, siaddr, resp = self._request_address(self.hwaddr) + exp = getopt(resp, 'lease_time') + + self._assert_renews(make_request(self.hwaddr, None, None, ciaddr=addr), + addr, exp) + + @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1") + def test_request_rebinding_inuse(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + resp = self._get_response(make_request(self.other_hwaddr, None, None, + ciaddr=addr)) + self._assert_nak(resp) + + @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc") + def test_request_rebinding_wrongaddr(self): + otheraddr = NETADDR_PREFIX + '123' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + + resp = self._get_response(make_request(self.hwaddr, None, None, + ciaddr=otheraddr)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb") + def test_request_rebinding_wrongaddr_relayed(self): + otheraddr = NETADDR_PREFIX + '123' + relayaddr = NETADDR_PREFIX + '124' + addr, siaddr, _ = self._request_address(self.hwaddr) + asserts.assert_false(addr == otheraddr, + "Test assumption not met: server assigned " + otheraddr) + asserts.assert_false(addr == relayaddr, + "Test assumption not met: server assigned " + relayaddr) + + req = make_request(self.hwaddr, None, None, ciaddr=otheraddr) + req.getlayer(BOOTP).giaddr = relayaddr + + resp = self._get_response(req) + self._assert_nak(resp) + self._assert_unicast(resp, relayaddr) + + @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da") + def test_release(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + # Re-requesting fails + resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + + # Succeeds after release + self._send(make_release(self.hwaddr, addr, siaddr)) + time.sleep(1) + resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._assert_ack(resp) + + @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593") + def test_release_noserverid(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # Release without server_id opt ignored + release = make_release(self.hwaddr, addr, siaddr) + removeopt(release, 'server_id') + self._send(release) + + # Not released: request fails + resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + + @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1") + def test_release_wrongserverid(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # Release with wrong server id + release = make_release(self.hwaddr, addr, siaddr) + setopt(release, 'server_id', addr) + self._send(release) + + # Not released: request fails + resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._assert_nak(resp) + + def _request_address(self, hwaddr): + resp = self._get_response(make_discover(self.hwaddr)) + self._assert_offer(resp) + addr = get_yiaddr(resp) + siaddr = getopt(resp, 'server_id') + resp = self._get_response(make_request(self.hwaddr, addr, siaddr)) + self._assert_ack(resp) + return addr, siaddr, resp + + def _get_response(self, packet): + resp = srp1(packet, iface=self.iface, timeout=10, verbose=False) + bootp_resp = (resp or None) and resp.getlayer(BOOTP) + if bootp_resp != None and get_mess_type(bootp_resp) == ACK: + # Note down corresponding release for this request + release = make_release(bootp_resp.chaddr, bootp_resp.yiaddr, + getopt(bootp_resp, 'server_id')) + self.cleanup_releases.append(release) + return resp + + def _send(self, packet): + sendp(packet, iface=self.iface, verbose=False) + + def _assert_type(self, packet, tp): + asserts.assert_false(None == packet, "No packet") + asserts.assert_equal(tp, get_mess_type(packet)) + + def _assert_ack(self, packet): + self._assert_type(packet, ACK) + + def _assert_nak(self, packet): + self._assert_type(packet, NAK) + + def _assert_offer(self, packet): + self._assert_type(packet, OFFER) + + def _assert_broadcast(self, packet): + asserts.assert_false(None == packet, "No packet") + asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) + asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) + asserts.assert_equal(packet.getlayer(BOOTP).flags, 0x8000) + + def _assert_unicast(self, packet, ipAddr=None): + asserts.assert_false(None == packet, "No packet") + asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC, + "Layer 2 packet destination address was broadcast") + if ipAddr: + asserts.assert_equal(packet.getlayer(IP).dst, ipAddr) + + def _next_hwaddr(self): + addr = make_hwaddr(self.next_hwaddr_index) + self.next_hwaddr_index = self.next_hwaddr_index + 1 + return addr + + +def setopt(packet, optname, val): + dhcp = packet.getlayer(DHCP) + if optname in [opt[0] for opt in dhcp.options]: + dhcp.options = [(optname, val) if opt[0] == optname else opt + for opt in dhcp.options] + else: + # Add before the last option (last option is "end") + dhcp.options.insert(len(dhcp.options) - 1, (optname, val)) + + +def getopt(packet, key): + opts = [opt[1] for opt in packet.getlayer(DHCP).options if opt[0] == key] + return opts[0] if opts else None + + +def removeopt(packet, key): + dhcp = packet.getlayer(DHCP) + dhcp.options = [opt for opt in dhcp.options if opt[0] != key] + + +def get_yiaddr(packet): + return packet.getlayer(BOOTP).yiaddr + + +def get_mess_type(packet): + return getopt(packet, 'message-type') + + +def make_dhcp(src_hwaddr, options, ciaddr='0.0.0.0', ipSrc='0.0.0.0', + ipDst=NETADDR_BROADCAST): + broadcast = (ipDst == NETADDR_BROADCAST) + ethernet = Ether(dst=BROADCAST_MAC) if broadcast else Ether() + ip = IP(src=ipSrc, dst=ipDst) + udp = UDP(sport=68, dport=SERVER_PORT) + bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, + flags=(0x8000 if broadcast else 0), xid=RandInt()) + dhcp = DHCP(options=options) + return ethernet / ip / udp / bootp / dhcp + + +def make_discover(src_hwaddr, options = []): + opts = [('message-type','discover')] + opts.extend(options) + opts.append('end') + return make_dhcp(src_hwaddr, options=opts) + + +def make_request(src_hwaddr, reqaddr, siaddr, ciaddr='0.0.0.0', ipSrc=None): + if ipSrc == None: + ipSrc = ciaddr + opts = [('message-type', 'request')] + if siaddr: + opts.append(('server_id', siaddr)) + if reqaddr: + opts.append(('requested_addr', reqaddr)) + opts.append('end') + return make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, ipSrc=ciaddr) + + +def make_release(src_hwaddr, addr, server_id): + opts = [('message-type', 'release'), ('server_id', server_id), 'end'] + return make_dhcp(src_hwaddr, opts, ciaddr=addr, ipSrc=addr, ipDst=server_id) + + +def make_hwaddr(index): + if index > 0xffff: + raise ValueError("Address index out of range") + return '\x44\x85\x00\x00{}{}'.format(chr(index >> 8), chr(index & 0xff)) + + +def format_hwaddr(addr): + return ':'.join(['%02x' % ord(c) for c in addr]) -- cgit v1.2.3 From e1ed53755653ff5a3ec046cf444d42fee66a7f26 Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Wed, 11 Jul 2018 22:23:00 -0700 Subject: Merge "[WifiBaseTest]Configure same ssid on both bands and Un-mirror APs." am: 6ddbae823c am: ecbdc8bb80 am: 5abd425c22 Bug: 111366584 Bug: 72111271 Change-Id: I48b722a37c10a590e84e35d44b0b6b97a444bfdf (cherry picked from commit 3b34d0e6517681d734101ab7460584f39a1aa1ea) --- .../framework/acts/test_utils/wifi/WifiBaseTest.py | 216 ++++++++++++++------- 1 file changed, 148 insertions(+), 68 deletions(-) diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py index 3c5c92c3cc..fe72deddb7 100755 --- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py +++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py @@ -33,6 +33,9 @@ from acts.controllers.ap_lib import hostapd_bss_settings from acts.controllers.ap_lib import hostapd_constants from acts.controllers.ap_lib import hostapd_security +AP_1 = 0 +AP_2 = 1 +MAX_AP_COUNT = 2 class WifiBaseTest(BaseTestClass): def __init__(self, controllers): @@ -43,8 +46,10 @@ class WifiBaseTest(BaseTestClass): def get_wpa2_network( self, + mirror_ap, + reference_networks, hidden=False, - ap_count=1, + same_ssid=False, ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G, @@ -53,13 +58,16 @@ class WifiBaseTest(BaseTestClass): generator. Args: - ap_count: Determines if we want to use one or both the APs - for configuration. If set to '2', then both APs will - be configured with the same configuration. - ssid_length_2g: Int, number of characters to use for 2G SSID. + mirror_ap: Boolean, determines if both APs use the same hostapd + config or different configs. + reference_networks: List of PSK networks. + same_ssid: Boolean, determines if both bands on AP use the same + SSID. + ssid_length_2gecond AP Int, number of characters to use for 2G SSID. ssid_length_5g: Int, number of characters to use for 5G SSID. passphrase_length_2g: Int, length of password for 2G network. passphrase_length_5g: Int, length of password for 5G network. + Returns: A dict of 2G and 5G network lists for hostapd configuration. """ @@ -67,13 +75,20 @@ class WifiBaseTest(BaseTestClass): network_dict_5g = {} ref_5g_security = hostapd_constants.WPA2_STRING ref_2g_security = hostapd_constants.WPA2_STRING - self.user_params["reference_networks"] = [] - ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) - ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + if same_ssid: + ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_5g_ssid = ref_2g_ssid + + ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + ref_5g_passphrase = ref_2g_passphrase - ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) - ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) + else: + ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) + ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g) + + ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g) if hidden: network_dict_2g = { @@ -103,37 +118,47 @@ class WifiBaseTest(BaseTestClass): } ap = 0 - for ap in range(ap_count): - self.user_params["reference_networks"].append({ - "2g": - copy.copy(network_dict_2g), - "5g": - copy.copy(network_dict_5g) + for ap in range(MAX_AP_COUNT): + reference_networks.append({ + "2g": copy.copy(network_dict_2g), + "5g": copy.copy(network_dict_5g) }) - self.reference_networks = self.user_params["reference_networks"] + if not mirror_ap: + break return {"2g": network_dict_2g, "5g": network_dict_5g} def get_open_network(self, + mirror_ap, + open_network, hidden=False, - ap_count=1, + same_ssid=False, ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G, ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G): """Generates SSIDs for a open network using a random generator. Args: - ap_count: Determines if we want to use one or both the APs for - for configuration. If set to '2', then both APs will - be configured with the same configuration. + mirror_ap: Boolean, determines if both APs use the same hostapd + config or different configs. + open_network: List of open networks. + same_ssid: Boolean, determines if both bands on AP use the same + SSID. ssid_length_2g: Int, number of characters to use for 2G SSID. ssid_length_5g: Int, number of characters to use for 5G SSID. + Returns: A dict of 2G and 5G network lists for hostapd configuration. """ network_dict_2g = {} network_dict_5g = {} - self.user_params["open_network"] = [] - open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) - open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + + if same_ssid: + open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g) + open_5g_ssid = open_2g_ssid + + else: + open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g) + open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g) + if hidden: network_dict_2g = { "SSID": open_2g_ssid, @@ -158,14 +183,35 @@ class WifiBaseTest(BaseTestClass): } ap = 0 - for ap in range(ap_count): - self.user_params["open_network"].append({ - "2g": network_dict_2g, - "5g": network_dict_5g + for ap in range(MAX_AP_COUNT): + open_network.append({ + "2g": copy.copy(network_dict_2g), + "5g": copy.copy(network_dict_5g) }) - self.open_network = self.user_params["open_network"] + if not mirror_ap: + break return {"2g": network_dict_2g, "5g": network_dict_5g} + def update_bssid(self, ap_instance, ap, network, band): + """Get bssid and update network dictionary. + + Args: + ap_instance: Accesspoint index that was configured. + ap: Accesspoint object corresponding to ap_instance. + network: Network dictionary. + band: Wifi networks' band. + + """ + bssid = ap.get_bssid_from_ssid(network["SSID"], band) + + if network["security"] == hostapd_constants.WPA2_STRING: + # TODO:(bamahadev) Change all occurances of reference_networks + # to wpa_networks. + self.reference_networks[ap_instance][band]["bssid"] = bssid + + if network["security"] == 'none': + self.open_network[ap_instance][band]["bssid"] = bssid + def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g): """Get bssid for a given SSID and add it to the network dictionary. @@ -180,20 +226,17 @@ class WifiBaseTest(BaseTestClass): if not (networks_5g or networks_2g): return - for network in itertools.chain(networks_5g, networks_2g): + for network in networks_5g: + if 'channel' in network: + continue + self.update_bssid(ap_instance, ap, network, + hostapd_constants.BAND_5G) + + for network in networks_2g: if 'channel' in network: continue - bssid = ap.get_bssid_from_ssid(network["SSID"]) - if '2g' in network["SSID"]: - band = hostapd_constants.BAND_2G - else: - band = hostapd_constants.BAND_5G - if network["security"] == hostapd_constants.WPA2_STRING: - # TODO:(bamahadev) Change all occurances of reference_networks - # to wpa_networks. - self.reference_networks[ap_instance][band]["bssid"] = bssid - else: - self.open_network[ap_instance][band]["bssid"] = bssid + self.update_bssid(ap_instance, ap, network, + hostapd_constants.BAND_2G) def legacy_configure_ap_and_start( self, @@ -206,47 +249,84 @@ class WifiBaseTest(BaseTestClass): ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G, ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G, hidden=False, + same_ssid=False, + mirror_ap=True, ap_count=1): asserts.assert_true( len(self.user_params["AccessPoint"]) == 2, "Exactly two access points must be specified. \ Each access point has 2 radios, one each for 2.4GHZ \ and 5GHz. A test can choose to use one or both APs.") - network_list_2g = [] - network_list_5g = [] - network_list_2g.append({"channel": channel_2g}) - network_list_5g.append({"channel": channel_5g}) - if "reference_networks" in self.user_params: - pass - else: - networks_dict = self.get_wpa2_network(hidden=hidden, - ap_count=ap_count) - network_list_2g.append(networks_dict["2g"]) - network_list_5g.append(networks_dict["5g"]) + config_count = 1 + count = 0 + + if mirror_ap and ap_count == 1: + raise ValueError("ap_count cannot be 1 if mirror_ap is True.") + + if not mirror_ap: + config_count = ap_count + + self.user_params["reference_networks"] = [] + self.user_params["open_network"] = [] + + for count in range(config_count): + + network_list_2g = [] + network_list_5g = [] + + orig_network_list_2g = [] + orig_network_list_5g = [] + + network_list_2g.append({"channel": channel_2g}) + network_list_5g.append({"channel": channel_5g}) + + networks_dict = self.get_wpa2_network( + mirror_ap, + self.user_params["reference_networks"], + hidden=hidden, + same_ssid=same_ssid) + self.reference_networks = self.user_params["reference_networks"] - if "open_network" in self.user_params: - pass - else: - networks_dict = self.get_open_network(hidden=hidden, - ap_count=ap_count) network_list_2g.append(networks_dict["2g"]) network_list_5g.append(networks_dict["5g"]) - orig_network_list_5g = copy.copy(network_list_5g) - orig_network_list_2g = copy.copy(network_list_2g) + # When same_ssid is set, only configure one set of WPA networks. + # We cannot have more than one set because duplicate interface names + # are not allowed. + # TODO(bmahadev): Provide option to select the type of network, + # instead of defaulting to WPA. + if not same_ssid: + networks_dict = self.get_open_network( + mirror_ap, + self.user_params["open_network"], + hidden=hidden, + same_ssid=same_ssid) + self.open_network = self.user_params["open_network"] - if len(network_list_5g) > 1: - self.config_5g = self._generate_legacy_ap_config(network_list_5g) - if len(network_list_2g) > 1: - self.config_2g = self._generate_legacy_ap_config(network_list_2g) - ap = 0 - for ap in range(ap_count): - self.access_points[ap].start_ap(self.config_2g) - self.access_points[ap].start_ap(self.config_5g) - self.populate_bssid(ap, self.access_points[ap], orig_network_list_5g, + network_list_2g.append(networks_dict["2g"]) + network_list_5g.append(networks_dict["5g"]) + + orig_network_list_5g = copy.copy(network_list_5g) + orig_network_list_2g = copy.copy(network_list_2g) + + if len(network_list_5g) > 1: + self.config_5g = self._generate_legacy_ap_config(network_list_5g) + if len(network_list_2g) > 1: + self.config_2g = self._generate_legacy_ap_config(network_list_2g) + + self.access_points[count].start_ap(self.config_2g) + self.access_points[count].start_ap(self.config_5g) + self.populate_bssid(count, self.access_points[count], orig_network_list_5g, orig_network_list_2g) + # Repeat configuration on the second router. + if mirror_ap and ap_count == 2: + self.access_points[AP_2].start_ap(self.config_2g) + self.access_points[AP_2].start_ap(self.config_5g) + self.populate_bssid(AP_2, self.access_points[AP_2], + orig_network_list_5g, orig_network_list_2g) + def _generate_legacy_ap_config(self, network_list): bss_settings = [] ap_settings = network_list.pop(0) -- cgit v1.2.3 From 72b61d92c894681338f2e3d83289172761ea3c45 Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Fri, 13 Jul 2018 17:20:00 -0700 Subject: [Tests]Reset connection before each iteration. Bug: 80550894 Test: Local Change-Id: I7b218d70236d53e3c278d2bd0e6c80c8619a5419 --- acts/tests/google/wifi/WifiStressTest.py | 1 + 1 file changed, 1 insertion(+) diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py index bb3396b29b..3385d90759 100755 --- a/acts/tests/google/wifi/WifiStressTest.py +++ b/acts/tests/google/wifi/WifiStressTest.py @@ -232,6 +232,7 @@ class WifiStressTest(WifiBaseTest): """ for count in range(int(self.stress_count/4)): + wutils.reset_wifi(self.dut) ssids = list() for network in self.networks: ssids.append(network[WifiEnums.SSID_KEY]) -- cgit v1.2.3 From 3a14a288e9c40cddb5d8f3875e1798adb6adcf84 Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Fri, 13 Jul 2018 21:51:37 -0700 Subject: Merge "[AccessPoint]Search for bssid's based on band." am: 76d436090b am: 77810bb482 am: 46068bcb41 Bug: 111410389 Bug: 111366584 Test: Local Change-Id: Id7d22a5e14ea040c54dc02197885a150a68d6cae (cherry picked from commit 31fd567532b10c7b17084383bb2b2b9c38c5ff88) --- acts/framework/acts/controllers/access_point.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py index b861cdc865..70a81bb26e 100755 --- a/acts/framework/acts/controllers/access_point.py +++ b/acts/framework/acts/controllers/access_point.py @@ -308,15 +308,19 @@ class AccessPoint(object): return interface - def get_bssid_from_ssid(self, ssid): + def get_bssid_from_ssid(self, ssid, band): """Gets the BSSID from a provided SSID Args: - ssid: An SSID string + ssid: An SSID string. + band: 2G or 5G Wifi band. Returns: The BSSID if on the AP or None if SSID could not be found. """ + if band == hostapd_constants.BAND_2G: + interfaces = [self.wlan_2g, ssid] + else: + interfaces = [self.wlan_5g, ssid] - interfaces = [self.wlan_2g, self.wlan_5g, ssid] # Get the interface name associated with the given ssid. for interface in interfaces: cmd = "iw dev %s info|grep ssid|awk -F' ' '{print $2}'" % ( -- cgit v1.2.3 From d265b2eda336fa3e2162592033aac9f630b0f32a Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Mon, 16 Jul 2018 12:08:35 -0700 Subject: [WifiBaseTest]Correcting condition to raise AP value error. This change is not merged to pi-dev and as a result pi-dr1-release runs fail CHERRY PICKED FROM AOSP Bug: 111500771 Bug: 111502276 Test: Local Change-Id: I62c1363013829817b5c20c125f30362bf04a9ad9 Merged-In: I6af55fe0faffbfa7726d4439d32972ca8f92769d --- acts/framework/acts/controllers/access_point.py | 1 + acts/framework/acts/test_utils/wifi/WifiBaseTest.py | 6 ++++-- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py index 70a81bb26e..313fcf999d 100755 --- a/acts/framework/acts/controllers/access_point.py +++ b/acts/framework/acts/controllers/access_point.py @@ -26,6 +26,7 @@ from acts.controllers.ap_lib import dhcp_config from acts.controllers.ap_lib import dhcp_server from acts.controllers.ap_lib import hostapd from acts.controllers.ap_lib import hostapd_config +from acts.controllers.ap_lib import hostapd_constants from acts.controllers.utils_lib.commands import ip from acts.controllers.utils_lib.commands import route from acts.controllers.utils_lib.commands import shell diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py index fe72deddb7..4b6dc8c50e 100755 --- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py +++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py @@ -261,8 +261,10 @@ class WifiBaseTest(BaseTestClass): config_count = 1 count = 0 - if mirror_ap and ap_count == 1: - raise ValueError("ap_count cannot be 1 if mirror_ap is True.") + # For example, the NetworkSelector tests use 2 APs and require that + # both APs are not mirrored. + if not mirror_ap and ap_count == 1: + raise ValueError("ap_count cannot be 1 if mirror_ap is False.") if not mirror_ap: config_count = ap_count -- cgit v1.2.3 From 07e1cd21864d997a336304b5c478ce21b993ca90 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Mon, 23 Jul 2018 13:39:49 -0700 Subject: Fixes for core networking tests CHERRY PICKED MULTIPLE CHANGES FROM AOSP Bug: 109670497 Test: Verified the changes Change-Id: I44741796d581b4216ed7e1ce15d1fdc679389394 Merged-In: I650dcc8f94c385e9c19ada570c910e3d0ca60a02 --- .../acts/test_utils/net/arduino_test_utils.py | 2 +- .../acts/test_utils/net/connectivity_test_utils.py | 63 ++++++ .../acts/test_utils/net/net_test_utils.py | 65 ++++++ acts/tests/google/net/CoreNetworkingTest.py | 13 +- acts/tests/google/net/DataCostTest.py | 23 +- acts/tests/google/net/DataUsageTest.py | 37 ++-- acts/tests/google/net/DnsOverTlsTest.py | 41 ++-- acts/tests/google/net/IpSecTest.py | 26 +-- acts/tests/google/net/LegacyVpnTest.py | 241 +++++++-------------- acts/tests/google/net/NattKeepAliveTest.py | 151 +++++++++++++ acts/tests/google/wifi/WifiTetheringTest.py | 23 +- 11 files changed, 421 insertions(+), 264 deletions(-) create mode 100644 acts/framework/acts/test_utils/net/connectivity_test_utils.py create mode 100644 acts/tests/google/net/NattKeepAliveTest.py diff --git a/acts/framework/acts/test_utils/net/arduino_test_utils.py b/acts/framework/acts/test_utils/net/arduino_test_utils.py index af0b3da15f..058d1433a4 100644 --- a/acts/framework/acts/test_utils/net/arduino_test_utils.py +++ b/acts/framework/acts/test_utils/net/arduino_test_utils.py @@ -44,7 +44,7 @@ def connect_wifi(wd, network=None): """ wd.log.info("Flashing connect_wifi.ino onto dongle") cmd = "locate %s" % CONNECT_WIFI - file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").rstrip() + file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").split()[-1] write_status = wd.write(ARDUINO, file_path, network) asserts.assert_true(write_status, "Failed to flash connect wifi") wd.log.info("Flashing complete") diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py new file mode 100644 index 0000000000..02167c966c --- /dev/null +++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py @@ -0,0 +1,63 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts + +def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10): + """ Start NAT-T keep alive on dut """ + + ad.log.info("Starting NATT Keepalive") + status = None + + key = ad.droid.connectivityStartNattKeepalive( + interval, src_ip, src_port, dst_ip) + + ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Started") + try: + event = ad.ed.pop_event("PacketKeepaliveCallback") + status = event["data"]["packetKeepaliveEvent"] + except Empty: + msg = "Failed to receive confirmation of starting natt keepalive" + asserts.fail(msg) + finally: + ad.droid.connectivityNattKeepaliveStopListeningForEvent( + key, "Started") + + if status != "Started": + ad.log.error("Received keepalive status: %s" % status) + ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key) + return None + return key + +def stop_natt_keepalive(ad, key): + """ Stop NAT-T keep alive on dut """ + + ad.log.info("Stopping NATT keepalive") + status = False + ad.droid.connectivityStopNattKeepalive(key) + + ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Stopped") + try: + event = ad.ed.pop_event("PacketKeepaliveCallback") + status = event["data"]["packetKeepaliveEvent"] == "Stopped" + except Empty: + msg = "Failed to receive confirmation of stopping natt keepalive" + asserts.fail(msg) + finally: + ad.droid.connectivityNattKeepaliveStopListeningForEvent( + key, "Stopped") + + ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key) + return status diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py index c7e55d3e9a..79cc10094c 100644 --- a/acts/framework/acts/test_utils/net/net_test_utils.py +++ b/acts/framework/acts/test_utils/net/net_test_utils.py @@ -15,6 +15,12 @@ # limitations under the License. from acts import asserts +from acts import utils +from acts.logger import epoch_to_log_line_timestamp +from acts.utils import get_current_epoch_time +from acts.logger import normalize_log_line_timestamp +from acts.utils import start_standing_subprocess +from acts.utils import stop_standing_subprocess from acts.test_utils.net import connectivity_const as cconst from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection from acts.test_utils.tel.tel_test_utils import verify_http_connection @@ -29,6 +35,7 @@ VPN_CONST = cconst.VpnProfile VPN_TYPE = cconst.VpnProfileType VPN_PARAMS = cconst.VpnReqParams VPN_PING_ADDR = "10.10.10.1" +TCPDUMP_PATH = "/data/local/tmp/tcpdump" def verify_lte_data_and_tethering_supported(ad): """Verify if LTE data is enabled and tethering supported""" @@ -209,3 +216,61 @@ def generate_legacy_vpn_profile(ad, vpn_profile[VPN_CONST.MPPE] = "mppe" return vpn_profile + +def start_tcpdump(ad, test_name): + """Start tcpdump on all interfaces + + Args: + ad: android device object. + test_name: tcpdump file name will have this + """ + ad.log.info("Starting tcpdump on all interfaces") + try: + ad.adb.shell("killall -9 tcpdump") + except AdbError: + ad.log.warn("Killing existing tcpdump processes failed") + out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH) + if "No such file" in out or not out: + ad.adb.shell("mkdir %s" % TCPDUMP_PATH) + else: + ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) + + begin_time = epoch_to_log_line_timestamp(get_current_epoch_time()) + begin_time = normalize_log_line_timestamp(begin_time) + + file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name) + ad.log.info("tcpdump file is %s", file_name) + try: + cmd = "adb -s {} shell tcpdump -i any -s0 -w {}" . \ + format(ad.serial, file_name) + return start_standing_subprocess(cmd, 5) + except e: + ad.log.error(e) + + return None + +def stop_tcpdump(ad, proc, test_name): + """Stops tcpdump on any iface + Pulls the tcpdump file in the tcpdump dir + + Args: + ad: android device object. + proc: need to know which pid to stop + test_name: test name to save the tcpdump file + + Returns: + log_path of the tcpdump file + """ + ad.log.info("Stopping and pulling tcpdump if any") + if proc == None: + return None + try: + stop_standing_subprocess(proc) + except Exception as e: + ad.log.warning(e) + log_path = os.path.join(ad.log_path, test_name) + utils.create_dir(log_path) + ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path)) + ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True) + file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name) + return "%s/%s" % (log_path, file_name) diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py index f06be4a54f..308beda8e1 100644 --- a/acts/tests/google/net/CoreNetworkingTest.py +++ b/acts/tests/google/net/CoreNetworkingTest.py @@ -17,8 +17,7 @@ from acts import asserts from acts import base_test from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.wifi import wifi_test_utils as wutils dum_class = "com.android.tests.connectivity.uid.ConnectivityTestActivity" @@ -30,17 +29,15 @@ class CoreNetworkingTest(base_test.BaseTestClass): def setup_class(self): """ Setup devices for tests and unpack params """ self.dut = self.android_devices[0] - wutils.wifi_toggle_state(self.dut, False) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) def teardown_class(self): """ Reset devices """ wutils.wifi_toggle_state(self.dut, True) + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + """ Test Cases """ @test_tracker_info(uuid="0c89d632-aafe-4bbd-a812-7b0eca6aafc7") diff --git a/acts/tests/google/net/DataCostTest.py b/acts/tests/google/net/DataCostTest.py index 4a829489e6..fb63c7a36b 100644 --- a/acts/tests/google/net/DataCostTest.py +++ b/acts/tests/google/net/DataCostTest.py @@ -25,8 +25,7 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.tel.tel_test_utils import _check_file_existance from acts.test_utils.tel.tel_test_utils import _generate_file_directory_and_file_name from acts.test_utils.wifi import wifi_test_utils as wutils @@ -48,17 +47,19 @@ class DataCostTest(base_test.BaseTestClass): self.unpack_userparams(req_params) for ad in self.android_devices: - wutils.reset_wifi(ad) - ad.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, ad, True) - asserts.assert_true( - verify_http_connection(self.log, ad), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(ad) def teardown_class(self): + """ Reset settings to default """ for ad in self.android_devices: + sub_id = str(ad.droid.telephonyGetSubscriberId()) + ad.droid.connectivityFactoryResetNetworkPolicies(sub_id) + ad.droid.connectivitySetDataWarningLimit(sub_id, -1) wutils.reset_wifi(ad) - ad.droid.telephonyToggleDataConnection(True) + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -155,10 +156,6 @@ class DataCostTest(base_test.BaseTestClass): ad.droid.connectivitySetDataWarningLimit( sub_id, int((total_pre + 5) * 1000.0 * 1000.0)) - # reset data limit - ad.droid.connectivityFactoryResetNetworkPolicies(sub_id) - ad.droid.connectivitySetDataWarningLimit(sub_id, -1) - # verify multipath preference values self._verify_multipath_preferences( ad, RELIABLE, NONE, wifi_network, cell_network) diff --git a/acts/tests/google/net/DataUsageTest.py b/acts/tests/google/net/DataUsageTest.py index 0e636ac35b..32ebd7f632 100644 --- a/acts/tests/google/net/DataUsageTest.py +++ b/acts/tests/google/net/DataUsageTest.py @@ -23,6 +23,7 @@ from acts.test_utils.net import connectivity_const as cconst from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection from acts.test_utils.tel.tel_test_utils import http_file_download_by_chrome from acts.test_utils.tel.tel_test_utils import verify_http_connection +import acts.test_utils.net.net_test_utils as nutils from acts.test_utils.tel import tel_test_utils as ttutils from acts.test_utils.wifi import wifi_test_utils as wutils @@ -56,12 +57,7 @@ class DataUsageTest(base_test.BaseTestClass): """ Setup devices for tests and unpack params """ self.dut = self.android_devices[0] self.tethered_devices = self.android_devices[1:] - wutils.reset_wifi(self.dut) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) # unpack user params req_params = ("wifi_network", "download_file", "file_size", "network") @@ -79,24 +75,16 @@ class DataUsageTest(base_test.BaseTestClass): # Set chrome browser start with no-first-run verification # Give permission to read from and write to storage - commands = ["pm grant com.android.chrome " - "android.permission.READ_EXTERNAL_STORAGE", - "pm grant com.android.chrome " - "android.permission.WRITE_EXTERNAL_STORAGE", - "rm /data/local/chrome-command-line", - "am set-debug-app --persistent com.android.chrome", - 'echo "chrome --no-default-browser-check --no-first-run ' - '--disable-fre" > /data/local/tmp/chrome-command-line'] - for cmd in commands: - for dut in self.android_devices: - try: - dut.adb.shell(cmd) - except adb.AdbError: - self.log.warn("adb command %s failed on %s" % (cmd, dut.serial)) + nutils.set_chrome_browser_permissions(self.dut) def teardown_class(self): """ Reset devices """ - wutils.reset_wifi(self.dut) + for ad in self.android_devices: + wutils.reset_wifi(ad) + + def on_fail(self, test_name, begin_time): + for ad in self.android_devices: + ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -206,9 +194,6 @@ class DataUsageTest(base_test.BaseTestClass): 5. Verify that data usage of ConnUIDTest app increased by ~xMB 6. Verify that data usage of device also increased by ~xMB """ - # disable wifi - wutils.wifi_toggle_state(self.dut, False) - # get pre mobile data usage (aos_pre, app_pre, total_pre) = self._get_data_usage(self.dut, cconst.TYPE_MOBILE) @@ -264,6 +249,10 @@ class DataUsageTest(base_test.BaseTestClass): self.log.info("Data usage of Android os increased by %s" % aos_diff) self.log.info("Data usage of ConnUID app increased by %s" % app_diff) self.log.info("Data usage on the device increased by %s" % total_diff) + + # forget network + wutils.wifi_forget_network(self.dut, self.wifi_network['SSID']) + return (aos_diff < DATA_ERR) and \ (self.file_size < app_diff < self.file_size + DATA_USG_ERR) and \ (self.file_size < total_diff < self.file_size + DATA_USG_ERR) diff --git a/acts/tests/google/net/DnsOverTlsTest.py b/acts/tests/google/net/DnsOverTlsTest.py index 46a6c01c6f..8d9fbed7ce 100644 --- a/acts/tests/google/net/DnsOverTlsTest.py +++ b/acts/tests/google/net/DnsOverTlsTest.py @@ -25,21 +25,22 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection -from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import verify_http_connection +from acts.test_utils.net import net_test_utils as nutils +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils from scapy.all import TCP from scapy.all import UDP from scapy.all import rdpcap +from scapy.all import Scapy_Exception DNS_QUAD9 = "dns.quad9.net" PRIVATE_DNS_MODE_OFF = "off" PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic" PRIVATE_DNS_MODE_STRICT = "hostname" RST = 0x04 +WLAN = "wlan0" class DnsOverTlsTest(base_test.BaseTestClass): """ Tests for Wifi Tethering """ @@ -48,17 +49,14 @@ class DnsOverTlsTest(base_test.BaseTestClass): """ Setup devices for tethering and unpack params """ self.dut = self.android_devices[0] - wutils.reset_wifi(self.dut) - self.dut.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.dut, True) - asserts.assert_true( - verify_http_connection(self.log, self.dut), - "HTTP verification failed on cell data connection") + nutils.verify_lte_data_and_tethering_supported(self.dut) req_params = ("wifi_network_with_dns_tls", "wifi_network_no_dns_tls", "ping_hosts") self.unpack_userparams(req_params) self.tcpdump_pid = None - self.tcpdump_file = None + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -68,9 +66,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): Args: 1. ad: dut to run tcpdump on """ - if self.tcpdump_pid: - stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=False) - self.tcpdump_pid = start_adb_tcpdump(ad, self.test_name, mask='all') + self.tcpdump_pid = start_tcpdump(ad, self.test_name) def _stop_tcp_dump(self, ad): """ Stop tcpdump and pull it to the test run logs @@ -78,13 +74,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): Args: 1. ad: dut to pull tcpdump from """ - file_name = ad.adb.shell("ls /sdcard/tcpdump") - file_name = os.path.join(ad.log_path, "TCPDUMP_%s" % ad.serial, - file_name.split('/')[-1]) - if self.tcpdump_pid: - stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=True) - self.tcpdump_pid = None - return os.path.join(ad.log_path, file_name) + return stop_tcpdump(ad, self.tcpdump_pid, self.test_name) def _verify_dns_queries_over_tls(self, pcap_file, tls=True): """ Verify if DNS queries were over TLS or not @@ -136,6 +126,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): mode = self.dut.droid.getPrivateDnsMode() asserts.assert_true(mode == dns_mode, "Failed to set private DNS mode to %s" % dns_mode) + time.sleep(2) # ping hosts should pass for host in self.ping_hosts: @@ -296,8 +287,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): # DNS server in link properties for wifi network link_prop = self.dut.droid.connectivityGetActiveLinkProperties() - dns_servers = link_prop['DnsServers'] - wifi_dns_servers = [each for lst in dns_servers for each in lst] + wifi_dns_servers = link_prop['PrivateDnsServerName'] self.log.info("Link prop: %s" % wifi_dns_servers) # DUT is on LTE data @@ -308,8 +298,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): # DNS server in link properties for cell network link_prop = self.dut.droid.connectivityGetActiveLinkProperties() - dns_servers = link_prop['DnsServers'] - lte_dns_servers = [each for lst in dns_servers for each in lst] + lte_dns_servers = link_prop['PrivateDnsServerName'] self.log.info("Link prop: %s" % lte_dns_servers) # stop tcpdump on device @@ -323,7 +312,7 @@ class DnsOverTlsTest(base_test.BaseTestClass): "Hostname not in link properites - cell network") @test_tracker_info(uuid="525a6f2d-9751-474e-a004-52441091e427") - def test_dns_over_tls_no_reset_packets(self): + def dns_over_tls_no_reset_packets(self): """ Verify there are no TCP packets with RST flags Steps: diff --git a/acts/tests/google/net/IpSecTest.py b/acts/tests/google/net/IpSecTest.py index dc7fd42c89..41ae0bc978 100644 --- a/acts/tests/google/net/IpSecTest.py +++ b/acts/tests/google/net/IpSecTest.py @@ -20,8 +20,8 @@ from acts.test_decorators import test_tracker_info from acts.test_utils.net import connectivity_const as cconst from acts.test_utils.net import ipsec_test_utils as iutils from acts.test_utils.net import socket_test_utils as sutils -from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump -from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump from acts.test_utils.wifi import wifi_test_utils as wutils import random @@ -47,32 +47,22 @@ class IpSecTest(base_test.BaseTestClass): self.ipv6_dut_a = self.dut_a.droid.connectivityGetIPv6Addresses(WLAN)[0] self.ipv6_dut_b = self.dut_b.droid.connectivityGetIPv6Addresses(WLAN)[0] + self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo() + self.tcpdump_pid_a = None - self.tcpdump_file_a = None self.tcpdump_pid_b = None - self.tcpdump_file_b = None - - self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo() def teardown_class(self): for ad in self.android_devices: wutils.reset_wifi(ad) def setup_test(self): - self.tcpdump_pid_a = start_adb_tcpdump( - self.dut_a, self.test_name, mask='all') - self.tcpdump_pid_b = start_adb_tcpdump( - self.dut_b, self.test_name, mask='all') + self.tcpdump_pid_a = start_tcpdump(self.dut_a, self.test_name) + self.tcpdump_pid_b = start_tcpdump(self.dut_b, self.test_name) def teardown_test(self): - if self.tcpdump_pid_a: - stop_adb_tcpdump( - self.dut_a, self.tcpdump_pid_a, pull_tcpdump=True) - if self.tcpdump_pid_b: - stop_adb_tcpdump( - self.dut_b, self.tcpdump_pid_b, pull_tcpdump=True) - self.tcpdump_pid_a = None - self.tcpdump_pid_b = None + stop_tcpdump(self.dut_a, self.tcpdump_pid_a, self.test_name) + stop_tcpdump(self.dut_b, self.tcpdump_pid_b, self.test_name) def on_fail(self, test_name, begin_time): self.dut_a.take_bug_report(test_name, begin_time) diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py index 786a40a822..d1dd385cce 100644 --- a/acts/tests/google/net/LegacyVpnTest.py +++ b/acts/tests/google/net/LegacyVpnTest.py @@ -25,7 +25,8 @@ from acts import base_test from acts import test_runner from acts.controllers import adb from acts.test_decorators import test_tracker_info -from acts.test_utils.wifi import wifi_test_utils +from acts.test_utils.net import net_test_utils as nutils +from acts.test_utils.wifi import wifi_test_utils as wutils from acts.test_utils.net import connectivity_const VPN_CONST = connectivity_const.VpnProfile @@ -48,140 +49,36 @@ class LegacyVpnTest(base_test.BaseTestClass): required_params = dir(VPN_PARAMS) required_params = [x for x in required_params if not x.startswith('__')] self.unpack_userparams(required_params) - wifi_test_utils.wifi_test_device_init(self.dut) - wifi_test_utils.wifi_connect(self.dut, self.wifi_network) + wutils.wifi_test_device_init(self.dut) + wutils.wifi_connect(self.dut, self.wifi_network) time.sleep(3) + self.vpn_params = {'vpn_username': self.vpn_username, + 'vpn_password': self.vpn_password, + 'psk_secret': self.psk_secret, + 'client_pkcs_file_name': self.client_pkcs_file_name, + 'cert_path_vpnserver': self.cert_path_vpnserver, + 'cert_password': self.cert_password} + def teardown_class(self): """ Reset wifi to make sure VPN tears down cleanly """ - wifi_test_utils.reset_wifi(self.dut) + wutils.reset_wifi(self.dut) def on_fail(self, test_name, begin_time): self.dut.take_bug_report(test_name, begin_time) - def download_load_certs(self, vpn_type, vpn_server_addr, ipsec_server_type): - """ Download the certificates from VPN server and push to sdcard of DUT - - Args: - VPN type name - - Returns: - Client cert file name on DUT's sdcard - """ - url = "http://%s%s%s" % (vpn_server_addr, - self.cert_path_vpnserver, - self.client_pkcs_file_name) - local_cert_name = "%s_%s_%s" % (vpn_type.name, - ipsec_server_type, - self.client_pkcs_file_name) - local_file_path = os.path.join(self.log_path, local_cert_name) - try: - ret = urllib.request.urlopen(url) - with open(local_file_path, "wb") as f: - f.write(ret.read()) - except: - asserts.fail("Unable to download certificate from the server") - f.close() - self.dut.adb.push("%s sdcard/" % local_file_path) - return local_cert_name - - def generate_legacy_vpn_profile(self, vpn_type, vpn_server_addr, ipsec_server_type): - """ Generate legacy VPN profile for a VPN - - Args: - VpnProfileType - """ - vpn_profile = {VPN_CONST.USER: self.vpn_username, - VPN_CONST.PWD: self.vpn_password, - VPN_CONST.TYPE: vpn_type.value, - VPN_CONST.SERVER: vpn_server_addr,} - vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,ipsec_server_type) - if vpn_type.name == "PPTP": - vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name - psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"]) - rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"]) - if vpn_type.name in psk_set: - vpn_profile[VPN_CONST.IPSEC_SECRET] = self.psk_secret - elif vpn_type.name in rsa_set: - cert_name = self.download_load_certs(vpn_type, - vpn_server_addr, - ipsec_server_type) - vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0] - vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0] - self.dut.droid.installCertificate(vpn_profile,cert_name,self.cert_password) - else: - vpn_profile[VPN_CONST.MPPE] = self.pptp_mppe - return vpn_profile - - def verify_ping_to_vpn_ip(self, connected_vpn_info): - """ Verify if IP behind VPN server is pingable - Ping should pass, if VPN is connected - Ping should fail, if VPN is disconnected - - Args: - connected_vpn_info which specifies the VPN connection status - """ - ping_result = None - pkt_loss = "100% packet loss" - try: - ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s" - % self.vpn_verify_address) - except adb.AdbError: - pass - return ping_result and pkt_loss not in ping_result - - def legacy_vpn_connection_test_logic(self, vpn_profile): - """ Test logic for each legacy VPN connection - - Steps: - 1. Generate profile for the VPN type - 2. Establish connection to the server - 3. Verify that connection is established using LegacyVpnInfo - 4. Verify the connection by pinging the IP behind VPN - 5. Stop the VPN connection - 6. Check the connection status - 7. Verify that ping to IP behind VPN fails - - Args: - VpnProfileType (1 of the 6 types supported by Android) - """ - # Wait for sometime so that VPN server flushes all interfaces and - # connections after graceful termination - time.sleep(10) - self.dut.adb.shell("ip xfrm state flush") - logging.info("Connecting to: %s", vpn_profile) - self.dut.droid.vpnStartLegacyVpn(vpn_profile) - time.sleep(connectivity_const.VPN_TIMEOUT) - connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo() - asserts.assert_equal(connected_vpn_info["state"], - connectivity_const.VPN_STATE_CONNECTED, - "Unable to establish VPN connection for %s" - % vpn_profile) - ping_result = self.verify_ping_to_vpn_ip(connected_vpn_info) - ip_xfrm_state = self.dut.adb.shell("ip xfrm state") - match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state) - if match_obj: - ip_xfrm_state = format(match_obj.group(0)).split() - self.log.info("HMAC for ESP is %s " % ip_xfrm_state[0]) - self.dut.droid.vpnStopLegacyVpn() - asserts.assert_true(ping_result, - "Ping to the internal IP failed. " - "Expected to pass as VPN is connected") - connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo() - asserts.assert_true(not connected_vpn_info, - "Unable to terminate VPN connection for %s" - % vpn_profile) - """ Test Cases """ @test_tracker_info(uuid="d2ac5a65-41fb-48de-a0a9-37e589b5456b") def test_legacy_vpn_pptp(self): """ Verify PPTP VPN connection """ vpn = VPN_TYPE.PPTP - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="99af78dd-40b8-483a-8344-cd8f67594971") def legacy_vpn_l2tp_ipsec_psk_libreswan(self): @@ -189,10 +86,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="e67d8c38-92c3-4167-8b6c-a49ef939adce") def legacy_vpn_l2tp_ipsec_rsa_libreswan(self): @@ -200,10 +99,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="8b3517dc-6a3b-44c2-a85d-bd7b969df3cf") def legacy_vpn_ipsec_xauth_psk_libreswan(self): @@ -211,10 +112,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="abac663d-1d91-4b87-8e94-11c6e44fb07b") def legacy_vpn_ipsec_xauth_rsa_libreswan(self): @@ -222,10 +125,12 @@ class LegacyVpnTest(base_test.BaseTestClass): libreSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][2], - self.ipsec_server_type[2]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[2], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="84140d24-53c0-4f6c-866f-9d66e04442cc") def test_legacy_vpn_l2tp_ipsec_psk_openswan(self): @@ -233,10 +138,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="f7087592-7eed-465d-bfe3-ed7b6d9d5f9a") def test_legacy_vpn_l2tp_ipsec_rsa_openswan(self): @@ -244,10 +151,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="ed78973b-13ee-4dd4-b998-693ab741c6f8") def test_legacy_vpn_ipsec_xauth_psk_openswan(self): @@ -255,10 +164,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="cfd125c4-b64c-4c49-b8e4-fbf05a9be8ec") def test_legacy_vpn_ipsec_xauth_rsa_openswan(self): @@ -266,10 +177,12 @@ class LegacyVpnTest(base_test.BaseTestClass): openSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][1], - self.ipsec_server_type[1]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[1], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="419370de-0aa1-4a56-8c22-21567fa1cbb7") def test_legacy_vpn_l2tp_ipsec_psk_strongswan(self): @@ -277,10 +190,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="f7694081-8bd6-4e31-86ec-d538c4ff1f2e") def test_legacy_vpn_l2tp_ipsec_rsa_strongswan(self): @@ -288,10 +203,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.L2TP_IPSEC_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="2f86eb98-1e05-42cb-b6a6-fd90789b6cde") def test_legacy_vpn_ipsec_xauth_psk_strongswan(self): @@ -299,10 +216,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongSwan server """ vpn = VPN_TYPE.IPSEC_XAUTH_PSK - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="af0cd7b1-e86c-4327-91b4-e9062758f2cf") def test_legacy_vpn_ipsec_xauth_rsa_strongswan(self): @@ -310,10 +229,12 @@ class LegacyVpnTest(base_test.BaseTestClass): strongswan server """ vpn = VPN_TYPE.IPSEC_XAUTH_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) @test_tracker_info(uuid="7b970d0a-1c7d-4a5a-b406-4815e190ef26") def test_legacy_vpn_ipsec_hybrid_rsa_strongswan(self): @@ -321,7 +242,9 @@ class LegacyVpnTest(base_test.BaseTestClass): strongswan server """ vpn = VPN_TYPE.IPSEC_HYBRID_RSA - vpn_profile = self.generate_legacy_vpn_profile( + vpn_profile = nutils.generate_legacy_vpn_profile( + self.dut, self.vpn_params, vpn, self.vpn_server_addresses[vpn.name][0], - self.ipsec_server_type[0]) - self.legacy_vpn_connection_test_logic(vpn_profile) + self.ipsec_server_type[0], + self.log_path) + nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile) diff --git a/acts/tests/google/net/NattKeepAliveTest.py b/acts/tests/google/net/NattKeepAliveTest.py new file mode 100644 index 0000000000..aaf1b1b141 --- /dev/null +++ b/acts/tests/google/net/NattKeepAliveTest.py @@ -0,0 +1,151 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts import base_test +from acts import utils +from acts.controllers import adb +from acts.test_decorators import test_tracker_info +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump +from acts.test_utils.net import connectivity_test_utils as cutils +from acts.test_utils.wifi import wifi_test_utils as wutils + +import random +import time + +WLAN = "wlan0" +PKTS = 5 +SERVER_UDP_KEEPALIVE = "python /root/udp_nat_keepalive.py" +KEEPALIVE_DATA = "ff" + + +class NattKeepAliveTest(base_test.BaseTestClass): + """ Tests for NATT keepalive """ + + def setup_class(self): + """ Setup devices for tests and unpack params """ + + self.dut = self.android_devices[0] + req_params = ("wifi_network", "remote_server", "server_ssh_config") + self.unpack_userparams(req_params) + + wutils.wifi_connect(self.dut, self.wifi_network) + + self.ip_a = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0] + self.ip_b = self.remote_server + self.log.info("DUT IP addr: %s" % self.ip_a) + self.log.info("Remote server IP addr: %s" % self.ip_b) + self.tcpdump_pid = None + + def teardown_class(self): + wutils.reset_wifi(self.dut) + + def setup_test(self): + self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) + + def teardown_test(self): + stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + + """ Helper functions """ + + def _verify_time_interval(self, time_interval, cmd_out): + """ Verify time diff between packets is equal to time interval """ + + self.log.info("Packet Info: \n%s\n" % cmd_out) + pkts = cmd_out.rstrip().decode("utf-8", "ignore").split("\n") + + prev = 0 + for i in range(len(pkts)): + interval, data = pkts[i][1:-1].split(',') + + # verify data + if data.lstrip().rstrip()[1:-1] != KEEPALIVE_DATA: + self.log.error("Server received invalid data %s" % data) + return False + + # verify time interval + curr = float(interval) + if i == 0: + prev = curr + continue + diff = int(round(curr-prev)) + if diff != time_interval: + self.log.error("Keepalive time interval is %s, expected %s" + % (diff, time_interval)) + return False + prev = curr + + return True + + """ Tests begin """ + + @test_tracker_info(uuid="c9012da2-656f-44ef-bad6-26892335d4bd") + def test_natt_keepalive_ipv4(self): + """ Test natt keepalive over wifi + + Steps: + 1. Open a UDP port 4500 on linux host + 2. Start NATT keepalive packets on DUT and send 5 packets + 3. Verify that 5 keepalive packets reached host with data '0xff' + """ + + # set a time interval + result = True + time_interval = random.randint(10, 60) + port = random.randint(8000, 9000) + self.log.info("NATT keepalive time interval is %s" % time_interval) + self.log.info("Source port is %s" % port) + time_out = time_interval * PKTS + 6 + + # start NATT keep alive + nka_key = cutils.start_natt_keepalive( + self.dut, self.ip_a, port, self.ip_b, time_interval) + asserts.assert_true(nka_key, "Failed to start NATT keepalive") + + # capture packets on server + self.log.info("Capturing keepalive packets on %s" % self.ip_b) + cmd_out = utils.exe_cmd("timeout %s %s" % + (time_out, SERVER_UDP_KEEPALIVE)) + + # verify packets received + result = self._verify_time_interval(time_interval, cmd_out) + + # stop NATT keep alive + status = cutils.stop_natt_keepalive(self.dut, nka_key) + asserts.assert_true(status, "Failed to stop NATT keepalive") + + return result + + @test_tracker_info(uuid="8ab20733-4a9e-4e4d-a46f-4d32a9f221c5") + def test_natt_keepalive_ipv4_invalid_interval(self): + """ Test invalid natt keepalive time interval + + Steps: + 1. Start NATT keepalive with time interval less than 10 seconds + 2. API should return invalid interval + """ + + # start NATT keep alive + port = random.randint(8000, 9000) + nka_key = cutils.start_natt_keepalive( + self.dut, self.ip_a, port, self.ip_b, 2) + asserts.assert_true(not nka_key, + "Started NATT keepalive with invalid interval") + + """ Tests end """ diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py index 080f9e47c4..26be9b740c 100644 --- a/acts/tests/google/wifi/WifiTetheringTest.py +++ b/acts/tests/google/wifi/WifiTetheringTest.py @@ -31,6 +31,7 @@ from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G from acts.test_utils.net import socket_test_utils as sutils from acts.test_utils.net import arduino_test_utils as dutils +from acts.test_utils.net import net_test_utils as nutils from acts.test_utils.wifi import wifi_test_utils as wutils WAIT_TIME = 2 @@ -47,29 +48,19 @@ class WifiTetheringTest(base_test.BaseTestClass): self.unpack_userparams(req_params) self.new_ssid = "wifi_tethering_test2" - wutils.wifi_toggle_state(self.hotspot_device, False) - self.hotspot_device.droid.telephonyToggleDataConnection(True) - wait_for_cell_data_connection(self.log, self.hotspot_device, True) - asserts.assert_true( - verify_http_connection(self.log, self.hotspot_device), - "HTTP verification failed on cell data connection") - asserts.assert_true( - self.hotspot_device.droid.connectivityIsTetheringSupported(), - "Tethering is not supported for the provider") + nutils.verify_lte_data_and_tethering_supported(self.hotspot_device) for ad in self.tethered_devices: - ad.droid.telephonyToggleDataConnection(False) wutils.wifi_test_device_init(ad) def teardown_class(self): """ Reset devices """ - wutils.wifi_toggle_state(self.hotspot_device, True) for ad in self.tethered_devices: - ad.droid.telephonyToggleDataConnection(True) + wutils.reset_wifi(ad) def on_fail(self, test_name, begin_time): """ Collect bug report on failure """ - self.hotspot_device.take_bug_report(test_name, begin_time) - for ad in self.tethered_devices: + return + for ad in self.android_devices: ad.take_bug_report(test_name, begin_time) """ Helper functions """ @@ -200,7 +191,7 @@ class WifiTetheringTest(base_test.BaseTestClass): """ Connect disconnect tethered devices to wifi hotspot """ num_android_devices = len(self.tethered_devices) num_wifi_dongles = 0 - if self.arduino_wifi_dongles: + if hasattr(self, 'arduino_wifi_dongles'): num_wifi_dongles = len(self.arduino_wifi_dongles) total_devices = num_android_devices + num_wifi_dongles device_connected = [False] * total_devices @@ -389,6 +380,8 @@ class WifiTetheringTest(base_test.BaseTestClass): 2. Connect 2 tethered devices to the hotspot device 3. Ping interfaces between the tethered devices """ + asserts.skip_if(not hasattr(self, 'arduino_wifi_dongles'), + "No wifi dongles connected. Skipping test") wutils.toggle_wifi_off_and_on(self.hotspot_device) self._start_wifi_tethering(WIFI_CONFIG_APBAND_2G) self._test_traffic_between_two_tethered_devices(self.tethered_devices[0], -- cgit v1.2.3 From ea6adb1d44fdaf0a01179abaa646836a159129a6 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Mon, 23 Jul 2018 23:21:19 -0700 Subject: [LegacyVpnTest] Error fix for LegacyVpnTest CHERRY PICKED FROM AOSP Bug: 109670497 Test: Verified the changes Change-Id: Ifd1bf216599c6de7e60b22a4db93678e2c916b2a Merged-In: I0330799b03d6d7c44f4a72ccf818fe1f82706669 --- acts/framework/acts/test_utils/net/net_test_utils.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py index 79cc10094c..867473caef 100644 --- a/acts/framework/acts/test_utils/net/net_test_utils.py +++ b/acts/framework/acts/test_utils/net/net_test_utils.py @@ -153,8 +153,6 @@ def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr, local_cert_name = "%s_%s_%s" % (vpn_type.name, ipsec_server_type, vpn_params['client_pkcs_file_name']) - ad.adb.push("%s sdcard/" % local_cert_name) - return local_cert_name local_file_path = os.path.join(log_path, local_cert_name) try: -- cgit v1.2.3 From 1e1e20916cadda72470e12a9fcf852310232b848 Mon Sep 17 00:00:00 2001 From: Tim Date: Tue, 17 Jul 2018 13:24:40 -0700 Subject: Merge "Add wifi RVR TW test" am: 61562a0da0 am: 19160b1453 am: 4dcdf2f67c Bug: 0 Change-Id: Ief4cfc61866bf6825da057defc2233689c987026 (cherry picked from commit 19d7285ecdce6725d5265a8c0689e48d1ba97261) --- acts/tests/google/wifi/WifiRvrTwTest.py | 482 ++++++++++++++++++++++++++++++++ 1 file changed, 482 insertions(+) create mode 100644 acts/tests/google/wifi/WifiRvrTwTest.py diff --git a/acts/tests/google/wifi/WifiRvrTwTest.py b/acts/tests/google/wifi/WifiRvrTwTest.py new file mode 100644 index 0000000000..24afa5ea2d --- /dev/null +++ b/acts/tests/google/wifi/WifiRvrTwTest.py @@ -0,0 +1,482 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import pprint +import time + +import acts.signals +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +from acts.controllers import iperf_server as ipf + +import json +import logging +import math +import os +from acts import utils +import csv + +import serial +import sys + + +WifiEnums = wutils.WifiEnums + + +class WifiRvrTWTest(WifiBaseTest): + """ Tests for wifi RVR performance + + Test Bed Requirement: + * One Android device + * Wi-Fi networks visible to the device + """ + TEST_TIMEOUT = 10 + + def __init__(self, controllers): + self.attenuators = None + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + wutils.wifi_test_device_init(self.dut) + + req_params = [ "iot_networks","rvr_test_params"] + opt_params = [ "angle_params","usb_port"] + self.unpack_userparams(req_param_names=req_params, + opt_param_names=opt_params) + + asserts.assert_true( + len(self.iot_networks) > 0, + "Need at least one iot network with psk.") + + wutils.wifi_toggle_state(self.dut, True) + if "rvr_test_params" in self.user_params: + self.iperf_server = self.iperf_servers[0] + self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"] + self.MindB= self.rvr_test_params ["rvr_atten_MinDB"] + self.stepdB= self.rvr_test_params ["rvr_atten_step"] + + if "angle_params" in self.user_params: + self.angle = self.angle_params + + if "usb_port" in self.user_params: + self.T1=self.readport(self.usb_port["turntable"]) + self.ATT1=self.readport(self.usb_port["atten1"]) + self.ATT2=self.readport(self.usb_port["atten2"]) + self.ATT3=self.readport(self.usb_port["atten3"]) + + # create hashmap for testcase name and SSIDs + self.iot_test_prefix = "test_iot_connection_to_" + self.ssid_map = {} + for network in self.iot_networks: + SSID = network['SSID'].replace('-','_') + self.ssid_map[SSID] = network + + # create folder for rvr test result + self.log_path = os.path.join(logging.log_path, "rvr_results") + utils.create_dir(self.log_path) + + Header=("test_SSID","Turn table (angle)","Attenuator(dBm)", + "TX throughput (Mbps)","RX throughput (Mbps)", + "RSSI","Link speed","Frequency") + self.csv_write(Header) + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + + def teardown_test(self): + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + + def teardown_class(self): + if "rvr_test_params" in self.user_params: + self.iperf_server.stop() + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + self.dut.cat_adb_log(test_name, begin_time) + + """Helper Functions""" + + def csv_write(self,data): + """Output .CSV file for test result. + + Args: + data: Dict containing attenuation, throughput and other meta data. + """ + with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: + csv_writer = csv.writer(csv_file,delimiter=',') + csv_writer.writerow(data) + csv_file.close() + + def readport(self,com): + """Read com port for current test. + + Args: + com: Attenuator or turn table com port + """ + port=serial.Serial(com,9600,timeout=1) + time.sleep(1) + return port + + def getdB(self,port): + """Get attenuator dB for current test. + + Args: + port: Attenuator com port + """ + port.write('V?;'.encode()) + dB=port.readline().decode() + dB=dB.strip(';') + dB=dB[dB.find('V')+1:] + return int(dB) + + def setdB(self,port,dB): + """Setup attenuator dB for current test. + + Args: + port: Attenuator com port + dB: Attenuator setup dB + """ + if dB<0: + dB=0 + elif dB>101: + dB=101 + self.log.info("Set dB to "+str(dB)) + InputdB=str('V')+str(dB)+str(';') + port.write(InputdB.encode()) + time.sleep(0.1) + + def set_Three_Att_dB(self,port1,port2,port3,dB): + """Setup 3 attenuator dB for current test. + + Args: + port1: Attenuator1 com port + port1: Attenuator2 com port + port1: Attenuator com port + dB: Attenuator setup dB + """ + self.setdB(port1,dB) + self.setdB(port2,dB) + self.setdB(port3,dB) + self.checkdB(port1,dB) + self.checkdB(port2,dB) + self.checkdB(port3,dB) + + def checkdB(self,port,dB): + """Check attenuator dB for current test. + + Args: + port: Attenuator com port + dB: Attenuator setup dB + """ + retry=0 + while self.getdB(port)!=dB and retry<10: + retry=retry+1 + self.log.info("Current dB = "+str(self.getdB(port))) + self.log.info("Fail to set Attenuator to "+str(dB)+", " + +str(retry)+" times try to reset") + self.setdB(port,dB) + if retry ==10: + self.log.info("Retry Attenuator fail for 9 cycles, end test!") + sys.exit() + return 0 + + def getDG(self,port): + """Get turn table angle for current test. + + Args: + port: Turn table com port + """ + DG = "" + port.write('DG?;'.encode()) + time.sleep(0.1) + data = port.readline().decode('utf-8') + for i in range(len(data)): + if (data[i].isdigit()) == True: + DG = DG + data[i] + if DG == "": + return -1 + return int(DG) + + def setDG(self,port,DG): + """Setup turn table angle for current test. + + Args: + port: Turn table com port + DG: Turn table setup angle + """ + if DG>359: + DG=359 + elif DG<0: + DG=0 + self.log.info("Set angle to "+str(DG)) + InputDG=str('DG')+str(DG)+str(';') + port.write(InputDG.encode()) + + def checkDG(self,port,DG): + """Check turn table angle for current test. + + Args: + port: Turn table com port + DG: Turn table setup angle + """ + retrytime = self.TEST_TIMEOUT + retry = 0 + while self.getDG(port)!=DG and retry Date: Tue, 17 Jul 2018 13:12:49 -0700 Subject: Merge "Add wifi_iot_tw test package1" am: ec63061434 am: 8faa8f58d3 am: 638c6e2397 Bug: 0 Change-Id: I38b70775f299b15d932fc6c4ff8d07ff8caebabf (cherry picked from commit 970fa916a10c06e1191dcc310b5cef5354ad1186) --- acts/tests/google/wifi/WifiIOTTwPkg1Test.py | 364 ++++++++++++++++++++++++++++ 1 file changed, 364 insertions(+) create mode 100644 acts/tests/google/wifi/WifiIOTTwPkg1Test.py diff --git a/acts/tests/google/wifi/WifiIOTTwPkg1Test.py b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py new file mode 100644 index 0000000000..b30c606b79 --- /dev/null +++ b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py @@ -0,0 +1,364 @@ +#!/usr/bin/env python3 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import itertools +import pprint +import time + +import acts.signals +import acts.test_utils.wifi.wifi_test_utils as wutils + +from acts import asserts +from acts.test_decorators import test_tracker_info +from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest +from acts.controllers import iperf_server as ipf + +import json +import logging +import math +import os +from acts import utils +import csv + +WifiEnums = wutils.WifiEnums + + +class WifiIOTTwPkg1Test(WifiBaseTest): + """ Tests for wifi IOT + + Test Bed Requirement: + * One Android device + * Wi-Fi IOT networks visible to the device + """ + + def __init__(self, controllers): + self.attenuators = None + WifiBaseTest.__init__(self, controllers) + + def setup_class(self): + self.dut = self.android_devices[0] + wutils.wifi_test_device_init(self.dut) + + req_params = [ "iot_networks", ] + opt_params = [ "open_network", + "iperf_server_address","iperf_port_arg", + "pdu_address" , "pduon_wait_time","pduon_address" + ] + self.unpack_userparams(req_param_names=req_params, + opt_param_names=opt_params) + + asserts.assert_true( + len(self.iot_networks) > 0, + "Need at least one iot network with psk.") + + if getattr(self, 'open_network', False): + self.iot_networks.append(self.open_network) + + wutils.wifi_toggle_state(self.dut, True) + if "iperf_server_address" in self.user_params: + self.iperf_server = self.iperf_servers[0] + + # create hashmap for testcase name and SSIDs + self.iot_test_prefix = "test_iot_connection_to_" + self.ssid_map = {} + for network in self.iot_networks: + SSID = network['SSID'].replace('-','_') + self.ssid_map[SSID] = network + + # create folder for IOT test result + self.log_path = os.path.join(logging.log_path, "IOT_results") + utils.create_dir(self.log_path) + + Header=("test_name","throughput_TX","throughput_RX") + self.csv_write(Header) + + # check pdu_address + if "pdu_address" and "pduon_wait_time" in self.user_params: + self.pdu_func() + + def setup_test(self): + self.dut.droid.wakeLockAcquireBright() + self.dut.droid.wakeUpNow() + + def teardown_test(self): + self.dut.droid.wakeLockRelease() + self.dut.droid.goToSleepNow() + wutils.reset_wifi(self.dut) + + def teardown_class(self): + if "iperf_server_address" in self.user_params: + self.iperf_server.stop() + + def on_fail(self, test_name, begin_time): + self.dut.take_bug_report(test_name, begin_time) + self.dut.cat_adb_log(test_name, begin_time) + + """Helper Functions""" + + def connect_to_wifi_network(self, network): + """Connection logic for open and psk wifi networks. + + Args: + params: Dictionary with network info. + """ + SSID = network[WifiEnums.SSID_KEY] + self.dut.ed.clear_all_events() + wutils.start_wifi_connection_scan(self.dut) + scan_results = self.dut.droid.wifiGetScanResults() + wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results) + wutils.wifi_connect(self.dut, network, num_of_tries=3) + + def run_iperf_client(self, network): + """Run iperf TX throughput after connection. + + Args: + params: Dictionary with network info. + """ + if "iperf_server_address" in self.user_params: + + # Add iot_result + iot_result = [] + self.iperf_server.start(tag="TX_server_{}".format( + self.current_test_name)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic TX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J {}".format(self.iperf_server.port,self.iperf_port_arg) + success, data = self.dut.run_iperf_client(self.iperf_server_address, + port_arg) + # Parse and log result + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},TX_client_{}".format( + self.iperf_server.port,self.current_test_name)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = self.iperf_server.log_files[-1] + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = math.fsum(iperf_result.instantaneous_rates) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + iot_result.append(curr_throughput) + self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return iot_result + + def run_iperf_server(self, network): + """Run iperf RX throughput after connection. + + Args: + params: Dictionary with network info. + + Returns: + iot_result: dict containing iot_results + """ + if "iperf_server_address" in self.user_params: + + # Add iot_result + iot_result = [] + self.iperf_server.start(tag="RX_client_{}".format( + self.current_test_name)) + wait_time = 5 + SSID = network[WifiEnums.SSID_KEY] + self.log.info("Starting iperf traffic RX through {}".format(SSID)) + time.sleep(wait_time) + port_arg = "-p {} -J -R {}".format(self.iperf_server.port,self.iperf_port_arg) + success, data = self.dut.run_iperf_client(self.iperf_server_address, + port_arg) + client_output_path = os.path.join( + self.iperf_server.log_path, "IperfDUT,{},RX_server_{}".format( + self.iperf_server.port,self.current_test_name)) + with open(client_output_path, 'w') as out_file: + out_file.write("\n".join(data)) + self.iperf_server.stop() + + iperf_file = client_output_path + try: + iperf_result = ipf.IPerfResult(iperf_file) + curr_throughput = math.fsum(iperf_result.instantaneous_rates) + except: + self.log.warning( + "ValueError: Cannot get iperf result. Setting to 0") + curr_throughput = 0 + iot_result.append(curr_throughput) + self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput)) + + self.log.debug(pprint.pformat(data)) + asserts.assert_true(success, "Error occurred in iPerf traffic.") + return iot_result + + def iperf_test_func(self,network): + """Main function to test iperf TX/RX. + + Args: + params: Dictionary with network info + """ + # Initialize + iot_result = {} + + # Run RvR and log result + iot_result["throughput_TX"] = self.run_iperf_client(network) + iot_result["throughput_RX"] = self.run_iperf_server(network) + iot_result["test_name"] = self.current_test_name + + # Save output as text file + results_file_path = "{}/{}.json".format(self.log_path, + self.current_test_name) + with open(results_file_path, 'w') as results_file: + json.dump(iot_result, results_file, indent=4) + + data=(iot_result["test_name"],iot_result["throughput_TX"][0], + iot_result["throughput_RX"][0]) + self.csv_write(data) + + def csv_write(self,data): + with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file: + csv_writer = csv.writer(csv_file,delimiter=',') + csv_writer.writerow(data) + csv_file.close() + + def pdu_func(self): + """control Power Distribution Units on local machine. + + Logic steps are + 1. Turn off PDU for all port. + 2. Turn on PDU for specified port. + """ + out_file_name = "PDU.log" + self.full_out_path = os.path.join(self.log_path, out_file_name) + cmd = "curl http://snmp:1234@{}/offs.cgi?led=11111111> {}".format(self.pdu_address, + self.full_out_path) + self.pdu_process = utils.start_standing_subprocess(cmd) + wait_time = 10 + self.log.info("Starting set PDU to OFF") + time.sleep(wait_time) + self.full_out_path = os.path.join(self.log_path, out_file_name) + cmd = "curl http://snmp:1234@{}/ons.cgi?led={}> {}".format(self.pdu_address, + self.pduon_address, + self.full_out_path) + self.pdu_process = utils.start_standing_subprocess(cmd) + wait_time = int("{}".format(self.pduon_wait_time)) + self.log.info("Starting set PDU to ON for port1," + "wait for {}s".format(self.pduon_wait_time)) + time.sleep(wait_time) + self.log.info("PDU setup complete") + + def connect_to_wifi_network_and_run_iperf(self, network): + """Connection logic for open and psk wifi networks. + + Logic steps are + 1. Connect to the network. + 2. Run iperf throghput. + + Args: + params: A dictionary with network info. + """ + self.connect_to_wifi_network(network) + self.iperf_test_func(network) + + """Tests""" + + @test_tracker_info(uuid="0e4ad6ed-595c-4629-a4c9-c6be9c3c58e0") + def test_iot_connection_to_ASUS_RT_AC68U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="a76d8acc-808e-4a5d-a52b-5ba07d07b810") + def test_iot_connection_to_ASUS_RT_AC68U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="659a3e5e-07eb-4905-9cda-92e959c7b674") + def test_iot_connection_to_D_Link_DIR_868L_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="6bcfd736-30fc-48a8-b4fb-723d1d113f3c") + def test_iot_connection_to_D_Link_DIR_868L_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="c9da945a-2c4a-44e1-881d-adf307b39b21") + def test_iot_connection_to_TP_LINK_WR940N_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="db0d224d-df81-401f-bf35-08ad02e41a71") + def test_iot_connection_to_ASUS_RT_N66U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="845ff1d6-618d-40f3-81c3-6ed3a0751fde") + def test_iot_connection_to_ASUS_RT_N66U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="6908039b-ccc9-4777-a0f1-3494ce642014") + def test_iot_connection_to_ASUS_RT_AC54U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="2647c15f-2aad-47d7-8dee-b2ee1ac4cef6") + def test_iot_connection_to_ASUS_RT_AC54U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="99678f66-ddf1-454d-87e4-e55177ec380d") + def test_iot_connection_to_ASUS_RT_N56U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="4dd75e81-9a8e-44fd-9449-09f5ab8a63c3") + def test_iot_connection_to_ASUS_RT_N56U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="315397ce-50d5-4abf-a11c-1abcaef832d3") + def test_iot_connection_to_BELKIN_F9K1002v1_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="05ba464a-b1ef-4ac1-a32f-c919ec4aa1dd") + def test_iot_connection_to_CISCO_E1200_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="04912868-4a47-40ce-877e-4e4c89849557") + def test_iot_connection_to_TP_LINK_C2_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="53517a21-3802-4185-b8bb-6eaace063a42") + def test_iot_connection_to_TP_LINK_C2_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="71c08c1c-415d-4da4-a151-feef43fb6ad8") + def test_iot_connection_to_ASUS_RT_AC66U_2G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) + + @test_tracker_info(uuid="2322c155-07d1-47c9-bd21-2e358e3df6ee") + def test_iot_connection_to_ASUS_RT_AC66U_5G(self): + ssid_key = self.current_test_name.replace(self.iot_test_prefix, "") + self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key]) -- cgit v1.2.3 From d279fcdf668bd74d7d84a98c2226921280709ee6 Mon Sep 17 00:00:00 2001 From: Bindu Mahadev Date: Tue, 24 Jul 2018 16:46:08 -0700 Subject: [Tests]Add stress test to play youtube video in loop over wifi. Also fix test run times for few tests and add more logging. Bug: 111653976 Bug: 111604990 Test: Local Change-Id: I745bc1954002c0a75c346e52ebd52af7ca148716 (cherry picked from commit 3585b10f621ab0abf8f550edd4d9a10d07cb40b0) --- acts/tests/google/wifi/WifiStressTest.py | 70 ++++++++++++++++++++++++++++++-- 1 file changed, 66 insertions(+), 4 deletions(-) diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py index 3385d90759..e99aed609f 100755 --- a/acts/tests/google/wifi/WifiStressTest.py +++ b/acts/tests/google/wifi/WifiStressTest.py @@ -19,7 +19,7 @@ import time import acts.base_test import acts.test_utils.wifi.wifi_test_utils as wutils -import acts.utils +import acts.test_utils.tel.tel_test_utils as tutils from acts import asserts from acts import signals @@ -134,6 +134,31 @@ class WifiStressTest(WifiBaseTest): if "100% packet loss" in result: raise signals.TestFailure("100% packet loss during ping") + def start_youtube_video(self, url=None, secs=60): + """Start a youtube video and check if it's playing. + + Args: + url: The URL of the youtube video to play. + secs: Time to play video in seconds. + + """ + ad = self.dut + ad.log.info("Start a youtube video") + ad.ensure_screen_on() + video_played = False + for count in range(2): + ad.unlock_screen() + ad.adb.shell('am start -a android.intent.action.VIEW -d "%s"' % url) + if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1): + ad.log.info("Started a video in youtube.") + # Play video for given seconds. + time.sleep(secs) + video_played = True + break + if not video_played: + raise signals.TestFailure("Youtube video did not start. Current WiFi " + "state is %d" % self.dut.droid.wifiCheckState()) + """Tests""" @test_tracker_info(uuid="cd0016c6-58cf-4361-b551-821c0b8d2554") @@ -206,17 +231,54 @@ class WifiStressTest(WifiBaseTest): sec = self.stress_hours * 60 * 60 args = "-p {} -t {} -R".format(self.iperf_server.port, sec) self.log.info("Running iperf client {}".format(args)) + start_time = time.time() result, data = self.dut.run_iperf_client(self.iperf_server_address, args, timeout=sec+1) if not result: self.log.debug("Error occurred in iPerf traffic.") + start_time = time.time() self.run_ping(sec) except: + total_time = time.time() - start_time raise signals.TestFailure("Network long-connect failed." - "Look at logs", extras={"Total Hours":"%d" %self.stress_hours, - "Seconds Run":"UNKNOWN"}) + "WiFi State = %d" %self.dut.droid.wifiCheckState(), + extras={"Total Hours":"%d" %self.stress_hours, + "Seconds Run":"%d" %total_time}) + total_time = time.time() - start_time + self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState()) raise signals.TestPass(details="", extras={"Total Hours":"%d" % - self.stress_hours, "Seconds":"%d" %sec}) + self.stress_hours, "Seconds Run":"%d" %total_time}) + + def test_stress_youtube_5g(self): + """Test to connect to network and play various youtube videos. + + Steps: + 1. Scan and connect to a network. + 2. Loop through and play a list of youtube videos. + 3. Verify no WiFi disconnects/data interruption. + + """ + # List of Youtube 4K videos. + videos = ["https://www.youtube.com/watch?v=TKmGU77INaM", + "https://www.youtube.com/watch?v=WNCl-69POro", + "https://www.youtube.com/watch?v=dVkK36KOcqs", + "https://www.youtube.com/watch?v=0wCC3aLXdOw", + "https://www.youtube.com/watch?v=rN6nlNC9WQA", + "https://www.youtube.com/watch?v=U--7hxRNPvk"] + try: + self.scan_and_connect_by_ssid(self.wpa_5g) + start_time = time.time() + for video in videos: + self.start_youtube_video(url=video, secs=10*60) + except: + total_time = time.time() - start_time + raise signals.TestFailure("The youtube stress test has failed." + "WiFi State = %d" %self.dut.droid.wifiCheckState(), + extras={"Total Hours":"1", "Seconds Run":"%d" %total_time}) + total_time = time.time() - start_time + self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState()) + raise signals.TestPass(details="", extras={"Total Hours":"1", + "Seconds Run":"%d" %total_time}) @test_tracker_info(uuid="d367c83e-5b00-4028-9ed8-f7b875997d13") def test_stress_wifi_failover(self): -- cgit v1.2.3 From 9b1de53752f3cf6b1c2f7178a5d9328d168c7d28 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Fri, 27 Jul 2018 10:53:38 -0700 Subject: [WifiTethering_OTATest] Add UUIDs to the tests Bug: 109876567 Test: Verified the changes Change-Id: Idc8eab275270c394a9b5ea11c6987716900dfd46 Merged-In: Ic314d4397b8cef059d1944514003bcbca82d6d36 --- acts/tests/google/wifi/WifiTethering2GOpenOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering2GPskOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering5GOpenOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering5GPskOTATest.py | 8 ++++++++ 4 files changed, 32 insertions(+) diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py index 68f039980e..a603e017fa 100755 --- a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py +++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py @@ -73,5 +73,13 @@ class WifiTethering2GOpenOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="fe502bc3-f9c6-4bed-98ad-acaa7e166521") def test_wifi_tethering_ota_2g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py index e1904e7c89..e9fedcd24c 100755 --- a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py +++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py @@ -73,5 +73,13 @@ class WifiTethering2GPskOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="4b1cec63-d1d2-4046-84e9-e806bb08ce41") def test_wifi_tethering_ota_2g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py index 6084500df0..6648d0e461 100755 --- a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py +++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py @@ -73,5 +73,13 @@ class WifiTethering5GOpenOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="b1a94c8f-f3ed-4755-be4a-764e205b4483") def test_wifi_tethering_ota_5g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py index 5cb532ac6c..74505787ff 100755 --- a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py +++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py @@ -73,5 +73,13 @@ class WifiTethering5GPskOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="111d3a33-3152-4993-b1ba-307daaf2a6ff") def test_wifi_tethering_ota_5g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() -- cgit v1.2.3 From ff33ef895f183f03e014719070fa6690203df941 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Fri, 27 Jul 2018 10:53:38 -0700 Subject: [WifiTethering_OTATest] Add UUIDs to the tests Bug: 109876567 Test: Verified the changes Change-Id: Ic424b4c61d32d24aa120c941ab523e3614c7c130 Merged-In: Ic314d4397b8cef059d1944514003bcbca82d6d36 --- acts/tests/google/wifi/WifiTethering2GOpenOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering2GPskOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering5GOpenOTATest.py | 8 ++++++++ acts/tests/google/wifi/WifiTethering5GPskOTATest.py | 8 ++++++++ 4 files changed, 32 insertions(+) diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py index 68f039980e..a603e017fa 100755 --- a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py +++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py @@ -73,5 +73,13 @@ class WifiTethering2GOpenOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="fe502bc3-f9c6-4bed-98ad-acaa7e166521") def test_wifi_tethering_ota_2g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py index e1904e7c89..e9fedcd24c 100755 --- a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py +++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py @@ -73,5 +73,13 @@ class WifiTethering2GPskOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="4b1cec63-d1d2-4046-84e9-e806bb08ce41") def test_wifi_tethering_ota_2g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 2g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py index 6084500df0..6648d0e461 100755 --- a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py +++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py @@ -73,5 +73,13 @@ class WifiTethering5GOpenOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="b1a94c8f-f3ed-4755-be4a-764e205b4483") def test_wifi_tethering_ota_5g_open(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and open auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py index 5cb532ac6c..74505787ff 100755 --- a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py +++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py @@ -73,5 +73,13 @@ class WifiTethering5GPskOTATest(BaseTestClass): """Tests""" + @test_tracker_info(uuid="111d3a33-3152-4993-b1ba-307daaf2a6ff") def test_wifi_tethering_ota_5g_psk(self): + """ Verify wifi hotspot after ota upgrade + + Steps: + 1. Save a wifi hotspot config with 5g band and psk auth + 2. Do a OTA update + 3. Verify that wifi hotspot works with teh saved config + """ self._verify_wifi_tethering() -- cgit v1.2.3 From e54f0d23f5d685fdca10eca12fe36665808bc964 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Fri, 27 Jul 2018 15:45:41 -0700 Subject: [ProxyTest] Add tests for proxy CHERRY PICKED FROM AOSP Bug: 111309129 Test: Verified the changes Change-Id: I2b9980df7e61a20a2d4a7aa11e8624c79211f9de Merged-In: I1f507af0d16029bf360950c6618c277b4ecb9664 --- acts/tests/google/net/ProxyTest.py | 178 +++++++++++++++++++++++++++++++++++++ 1 file changed, 178 insertions(+) create mode 100644 acts/tests/google/net/ProxyTest.py diff --git a/acts/tests/google/net/ProxyTest.py b/acts/tests/google/net/ProxyTest.py new file mode 100644 index 0000000000..66aa345118 --- /dev/null +++ b/acts/tests/google/net/ProxyTest.py @@ -0,0 +1,178 @@ +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from acts import asserts +from acts import base_test +from acts.test_decorators import test_tracker_info +from acts.test_utils.net.net_test_utils import start_tcpdump +from acts.test_utils.net.net_test_utils import stop_tcpdump +from acts.test_utils.wifi import wifi_test_utils as wutils + +from scapy.all import IP +from scapy.all import TCP +from scapy.all import UDP +from scapy.all import Raw +from scapy.all import rdpcap +from scapy.all import Scapy_Exception + + +class ProxyTest(base_test.BaseTestClass): + """ Network proxy tests """ + + def setup_class(self): + """ Setup devices for tests and unpack params """ + self.dut = self.android_devices[0] + req_params = ("wifi_network_no_dns_tls", "proxy_pac", "proxy_server", + "proxy_port", "bypass_host", "non_bypass_host") + self.unpack_userparams(req_params) + wutils.wifi_connect(self.dut, self.wifi_network_no_dns_tls) + self.tcpdump_pid = None + self.proxy_port = int(self.proxy_port) + + def teardown_test(self): + self.dut.droid.connectivityResetGlobalProxy() + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + if global_proxy: + self.log.error("Failed to reset global proxy settings") + + def teardown_class(self): + wutils.reset_wifi(self.dut) + + """ Helper methods """ + + def _verify_http_request(self, ad): + """ Send http requests to hosts + + Steps: + 1. Send http requests to hosts + a. Host that is bypassed by proxy server + b. Host that goes through proxy server + 2. Verify that both return valid responses + + Args: + 1. ad: dut to run http requests + """ + for host in [self.bypass_host, self.non_bypass_host]: + host = "https://%s" % host + result = ad.droid.httpRequestString(host) + asserts.assert_true(result, "Http request failed for %s" % host) + + def _verify_proxy_server(self, pcap_file, bypass_host, hostname): + """ Verify that http requests are going through proxy server + + Args: + 1. tcpdump: pcap file + 2. bypass_host: boolean value if the request goes through proxy + 3. hostname: hostname requested + + Returns: + True/False if the bypass condition met + """ + self.log.info("Checking proxy server for query to: %s" % hostname) + try: + packets = rdpcap(pcap_file) + except Scapy_Exception: + asserts.fail("Not a valid pcap file") + + dns_query = False + http_query = False + for pkt in packets: + summary = "%s" % pkt.summary() + if UDP in pkt and pkt[UDP].dport == 53 and hostname in summary: + dns_query = True + break + if TCP in pkt and pkt[TCP].dport == self.proxy_port and Raw in pkt\ + and hostname in str(pkt[Raw]): + http_query = True + + self.log.info("Bypass hostname set to: %s" % bypass_host) + self.log.info("Found DNS query for host: %s" % dns_query) + self.log.info("Found HTTP query for host: %s" % http_query) + if bypass_host and http_query and not dns_query or \ + not bypass_host and not http_query and dns_query: + return False + return True + + def _test_proxy(self): + """ Test pac piroxy and manual proxy settings + + Steps: + 1. Start tcpdump + 2. Run http requests + 3. Stop tcpdump + 4. Verify the packets from tcpdump have valid queries + """ + + # start tcpdump on the device + self.tcpdump_pid = start_tcpdump(self.dut, self.test_name) + + # verify http requests + self._verify_http_request(self.dut) + + # stop tcpdump on the device + pcap_file = stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name) + + # verify proxy server + result = self._verify_proxy_server(pcap_file, True, self.bypass_host) + asserts.assert_true(result, "Proxy failed for %s" % self.bypass_host) + result = self._verify_proxy_server(pcap_file, False, self.non_bypass_host) + asserts.assert_true(result, "Proxy failed for %s" % self.non_bypass_host) + + """ Test Cases """ + + @test_tracker_info(uuid="16881315-1a50-48ce-bd36-7b0d2f21b734") + def test_pac_proxy_over_wifi(self): + """ Test proxy with auto config over wifi + + Steps: + 1. Connect to a wifi network + 2. Set a global proxy with auto config + 3. Do a http request on the hostnames + 4. Verify that no DNS packets seen for non bypassed hostnames + 5. Verify that DNS packets seen for bypassed hostnames + """ + # set global pac proxy + self.log.info("Setting global proxy to: %s" % self.proxy_pac) + self.dut.droid.connectivitySetGlobalPacProxy(self.proxy_pac) + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + asserts.assert_true(global_proxy['PacUrl'] == self.proxy_pac, + "Failed to set pac proxy") + + # test proxy + self._test_proxy() + + @test_tracker_info(uuid="4d3361f6-866d-423c-9ed7-5a6943575fe9") + def test_manual_proxy_over_wifi(self): + """ Test manual proxy over wifi + + Steps: + 1. Connect to a wifi network + 2. Set a global manual proxy with proxy server, port & bypass URLs + 3. Do a http request on the hostnames + 4. Verify that no DNS packets are seen for non bypassed hostnames + 5. Verify that DNS packets seen for bypassed hostnames + """ + # set global manual proxy + self.log.info("Setting global proxy to: %s %s %s" % + (self.proxy_server, self.proxy_port, self.bypass_host)) + self.dut.droid.connectivitySetGlobalProxy(self.proxy_server, + self.proxy_port, + self.bypass_host) + global_proxy = self.dut.droid.connectivityGetGlobalProxy() + asserts.assert_true(global_proxy['Hostname'] == self.proxy_server, + "Failed to set manual proxy") + + # test proxy + self._test_proxy() -- cgit v1.2.3 From 7b00a3a220e4a4c257216503812a3ebd92b4b582 Mon Sep 17 00:00:00 2001 From: Girish Moturu Date: Thu, 9 Aug 2018 09:39:58 -0700 Subject: [DhcpServerTest] Add UUIDs to the new test cases CHERRY CHANGES FROM AOSP The CLs include 32 new test cases along with UUIDs Bug: 109670497 Test: Verified the changes manually Change-Id: Iae193bb07bc5092b685cf150c6b3457b0c5998a5 Merged-In: I4380caa039f7a45bf4b99c15629a0e2d09e6f7ff --- acts/framework/tests/acts_android_device_test.py | 5 +- acts/tests/google/net/DhcpServerTest.py | 794 ++++++++++++++++++++--- 2 files changed, 699 insertions(+), 100 deletions(-) diff --git a/acts/framework/tests/acts_android_device_test.py b/acts/framework/tests/acts_android_device_test.py index b2cb0b5fae..6c664f5e10 100755 --- a/acts/framework/tests/acts_android_device_test.py +++ b/acts/framework/tests/acts_android_device_test.py @@ -28,9 +28,10 @@ from acts.controllers import android_device MOCK_LOG_PATH = "/tmp/logs/MockTest/xx-xx-xx_xx-xx-xx/" # Mock start and end time of the adb cat. -MOCK_ADB_LOGCAT_BEGIN_TIME = "1970-01-02 21:03:20.123" -MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_ADB_EPOCH_BEGIN_TIME = 191000123 +MOCK_ADB_LOGCAT_BEGIN_TIME = logger.normalize_log_line_timestamp( + logger.epoch_to_log_line_timestamp(MOCK_ADB_EPOCH_BEGIN_TIME)) +MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_SERIAL = 1 MOCK_RELEASE_BUILD_ID = "ABC1.123456.007" diff --git a/acts/tests/google/net/DhcpServerTest.py b/acts/tests/google/net/DhcpServerTest.py index 4cc2b91da3..1f33d63422 100644 --- a/acts/tests/google/net/DhcpServerTest.py +++ b/acts/tests/google/net/DhcpServerTest.py @@ -6,12 +6,15 @@ from acts.test_decorators import test_tracker_info from scapy.all import * from threading import Event from threading import Thread +import random import time import warnings +CLIENT_PORT = 68 SERVER_PORT = 67 BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff' +INET4_ANY = '0.0.0.0' NETADDR_PREFIX = '192.168.42.' OTHER_NETADDR_PREFIX = '192.168.43.' NETADDR_BROADCAST = '255.255.255.255' @@ -23,6 +26,7 @@ REQUEST = 3 ACK = 5 NAK = 6 + pmc_base_cmd = ( "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ") start_pmc_cmd = ( @@ -41,7 +45,7 @@ class DhcpServerTest(base_test.BaseTestClass): conf.checkIPaddr = 0 conf.checkIPsrc = 0 # Allow using non-67 server ports as long as client uses 68 - bind_layers(UDP, BOOTP, dport=68) + bind_layers(UDP, BOOTP, dport=CLIENT_PORT) self.dut.adb.shell(start_pmc_cmd) self.dut.adb.shell("setprop log.tag.PMC VERBOSE") @@ -54,12 +58,13 @@ class DhcpServerTest(base_test.BaseTestClass): thread = Thread(target=self._sniff_arp, args=(self.stop_arp,)) thread.start() - # Discover server IP + # Discover server IP and device hwaddr hwaddr = self._next_hwaddr() - resp = self._get_response(make_discover(hwaddr)) + resp = self._get_response(self._make_discover(hwaddr)) asserts.assert_false(None == resp, "Device did not reply to first DHCP discover") self.server_addr = getopt(resp, 'server_id') + self.dut_hwaddr = resp.getlayer(Ether).src asserts.assert_false(None == self.server_addr, "DHCP server did not specify server identifier") # Ensure that we don't depend on assigned route/gateway on the host @@ -94,6 +99,7 @@ class DhcpServerTest(base_test.BaseTestClass): self.log.info("Starting USB Tethering") dut.stop_services() dut.adb.shell(pmc_start_usb_tethering_cmd) + self._wait_for_device(self.dut) self.USB_TETHERED = True def _stop_usb_tethering(self, dut): @@ -151,88 +157,301 @@ class DhcpServerTest(base_test.BaseTestClass): @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c") def test_config_assumptions(self): - resp = self._get_response(make_discover(self.hwaddr)) + resp = self._get_response(self._make_discover(self.hwaddr)) asserts.assert_false(None == resp, "Device did not reply to discover") asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX), "Server does not use expected prefix") + @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568") + def test_discover_broadcastbit(self): + resp = self._get_response( + self._make_discover(self.hwaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191") + def test_discover_bootpfields(self): + discover = self._make_discover(self.hwaddr) + resp = self._get_response(discover) + self._assert_offer(resp) + self._assert_unicast(resp) + bootp = assert_bootp_response(resp, discover) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) + + @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182") + def test_discover_relayed_broadcastbit(self): + giaddr = NETADDR_PREFIX + '123' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp) + + def _run_discover_paramrequestlist(self, params, unwanted_params): + params_opt = make_paramrequestlist_opt(params) + resp = self._get_response( + self._make_discover(self.hwaddr, options=[params_opt])) + + self._assert_offer(resp) + # List of requested params in response order + resp_opts = get_opt_labels(resp) + resp_requested_opts = [opt for opt in resp_opts if opt in params] + # All above params should be supported, order should be conserved + asserts.assert_equal(params, resp_requested_opts) + asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params))) + return resp + + @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8") + def test_discover_paramrequestlist(self): + resp = self._run_discover_paramrequestlist( + ['subnet_mask', 'broadcast_address', 'router', 'name_server'], + unwanted_params=[]) + for opt in ['broadcast_address', 'router', 'name_server']: + asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX), + opt + ' does not start with ' + NETADDR_PREFIX) + + subnet_mask = getopt(resp, 'subnet_mask') + asserts.assert_true(subnet_mask.startswith('255.255.'), + 'Unexpected subnet mask for /16+: ' + subnet_mask) + + @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46") + def test_discover_paramrequestlist_rev(self): + # RFC2132 #9.8: "The DHCP server is not required to return the options + # in the requested order, but MUST try to insert the requested options + # in the order requested" + asserts.skip('legacy behavior not compliant: fixed order used') + self._run_discover_paramrequestlist( + ['name_server', 'router', 'broadcast_address', 'subnet_mask'], + unwanted_params=[]) + + @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b") + def test_discover_paramrequestlist_unwanted(self): + asserts.skip('legacy behavior always sends all parameters') + self._run_discover_paramrequestlist(['router', 'name_server'], + unwanted_params=['broadcast_address', 'subnet_mask']) + + def _assert_renews(self, request, addr, exp_time, resp_type=ACK): + # Sleep to test lease time renewal + time.sleep(3) + resp = self._get_response(request) + self._assert_type(resp, resp_type) + asserts.assert_equal(addr, get_yiaddr(resp)) + remaining_lease = getopt(resp, 'lease_time') + # Lease renewed: waited for 3s, lease time not decreased by more than 2 + asserts.assert_true(remaining_lease >= exp_time - 2, + 'Lease not renewed') + # Lease times should be consistent across offers/renewals + asserts.assert_true(remaining_lease <= exp_time + 2, + 'Lease time inconsistent') + return resp + @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade") def test_discover_assigned_ownaddress(self): addr, siaddr, resp = self._request_address(self.hwaddr) lease_time = getopt(resp, 'lease_time') server_id = getopt(resp, 'server_id') - asserts.assert_true(lease_time > 10, "Lease time is unreasonably short") - asserts.assert_false(addr == '0.0.0.0', "Assigned address is empty") + asserts.assert_true(lease_time >= 60, "Lease time is too short") + asserts.assert_false(addr == INET4_ANY, "Assigned address is empty") # Wait to test lease expiration time change - time.sleep(2) + time.sleep(3) # New discover, same address - resp = self._get_response(make_discover(self.hwaddr)) - # Lease time renewed: exptime not decreased - asserts.assert_equal(lease_time, getopt(resp, 'lease_time')) - asserts.assert_equal(addr, get_yiaddr(resp)) + resp = self._assert_renews(self._make_discover(self.hwaddr), + addr, lease_time, resp_type=OFFER) + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587") def test_discover_assigned_otherhost(self): addr, siaddr, _ = self._request_address(self.hwaddr) # New discover, same address, different client - resp = self._get_response(make_discover(self.other_hwaddr, + resp = self._get_response(self._make_discover(self.other_hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_false(get_yiaddr(resp) == addr, "Already assigned address offered") + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14") def test_discover_requestaddress(self): addr = NETADDR_PREFIX + '200' - resp = self._get_response(make_discover(self.hwaddr, + resp = self._get_response(self._make_discover(self.hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_equal(get_yiaddr(resp), addr) # Lease not committed: can request again - resp = self._get_response(make_discover(self.other_hwaddr, + resp = self._get_response(self._make_discover(self.other_hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_equal(get_yiaddr(resp), addr) - def _assert_renews(self, request, addr, expTime): - time.sleep(2) - resp = self._get_response(request) - self._assert_ack(resp) - asserts.assert_equal(addr, get_yiaddr(resp)) - # Lease time renewed - asserts.assert_equal(expTime, getopt(resp, 'lease_time')) + @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd") + def test_discover_requestaddress_wrongsubnet(self): + addr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, [('requested_addr', addr)])) + self._assert_offer(resp) + self._assert_unicast(resp) + asserts.assert_false(get_yiaddr(resp) == addr, + 'Server offered invalid address') - @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") - def test_request_wrongnet(self): - resp = self._get_response(make_request(self.hwaddr, - OTHER_NETADDR_PREFIX + '1', None)) - self._assert_nak(resp) + @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf") + def test_discover_giaddr_outside_subnet(self): + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1") + def test_discover_srcaddr_outside_subnet(self): + srcaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, ip_src=srcaddr)) + self._assert_offer(resp) + asserts.assert_false(srcaddr == get_yiaddr(resp), + 'Server offered invalid address') + + @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e") + def test_discover_requestaddress_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + req = self._make_discover(self.hwaddr, [('requested_addr', addr)], + ip_src=giaddr, giaddr=giaddr) + resp = self._get_response(req) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db") + def test_discover_knownaddress_giaddr_outside_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in invalid subnet + giaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557") + def test_discover_knownaddress_giaddr_valid_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in valid subnet + giaddr = NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7") + def test_request_unicast(self): + addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False) + self._assert_unicast(resp, addr) + + @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3") + def test_request_bootpfields(self): + req_addr = NETADDR_PREFIX + '200' + req = self._make_request(self.hwaddr, req_addr, self.server_addr) + resp = self._get_response(req) + self._assert_ack(resp) + bootp = assert_bootp_response(resp, req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18") - def test_request_inuse(self): + def test_request_selecting_inuse(self): addr, siaddr, _ = self._request_address(self.hwaddr) - res = self._get_response(make_request(self.other_hwaddr, addr, None)) - self._assert_nak(res) + new_req = self._make_request(self.other_hwaddr, addr, siaddr) + resp = self._get_response(new_req) + self._assert_nak(resp) + self._assert_broadcast(resp) + bootp = assert_bootp_response(resp, new_req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(INET4_ANY, bootp.yiaddr) + asserts.assert_equal(INET4_ANY, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp)) + asserts.assert_equal( + ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt + get_opt_labels(bootp)) + asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id')) + + @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c") + def test_request_selecting_wrongsiaddr(self): + addr = NETADDR_PREFIX + '200' + wrong_siaddr = NETADDR_PREFIX + '201' + asserts.assert_false(wrong_siaddr == self.server_addr, + 'Test assumption not met: server addr is ' + wrong_siaddr) + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr)) + asserts.assert_true(resp == None, + 'Received response for request with incorrect siaddr') + + @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f") + def test_request_selecting_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=self.server_addr, + giaddr=giaddr)) + asserts.assert_equal(resp, None) - @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") - def test_request_initreboot(self): + @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f") + def test_request_selecting_hostnameupdate(self): + addr = NETADDR_PREFIX + '123' + hostname1 = b'testhostname1' + hostname2 = b'testhostname2' + req = self._make_request(self.hwaddr, None, None, + options=[ + ('requested_addr', addr), + ('server_id', self.server_addr), + ('hostname', hostname1)]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname1, getopt(req, 'hostname')) + + # Re-request with different hostname + setopt(req, 'hostname', hostname2) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname2, getopt(req, 'hostname')) + + def _run_initreboot(self, bcastbit): addr, siaddr, resp = self._request_address(self.hwaddr) exp = getopt(resp, 'lease_time') - # siaddr NONE: init-reboot client state - self._assert_renews(make_request(self.hwaddr, addr, None), addr, exp) + # init-reboot: siaddr is None + return self._assert_renews(self._make_request( + self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp) + + @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") + def test_request_initreboot(self): + resp = self._run_initreboot(bcastbit=False) + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51") + def test_request_initreboot_broadcastbit(self): + resp = self._run_initreboot(bcastbit=True) + self._assert_broadcast(resp) @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c") def test_request_initreboot_nolease(self): # RFC2131 #4.3.2 - asserts.skip("dnsmasq not compliant if --dhcp-authoritative set.") + asserts.skip("legacy behavior not compliant") addr = NETADDR_PREFIX + '123' - resp = self._get_response(make_request(self.hwaddr, addr, None)) + resp = self._get_response(self._make_request(self.hwaddr, addr, None)) asserts.assert_equal(resp, None) @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5") @@ -242,24 +461,51 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == otheraddr, "Test assumption not met: server assigned " + otheraddr) - resp = self._get_response(make_request(self.hwaddr, otheraddr, None)) + resp = self._get_response( + self._make_request(self.hwaddr, otheraddr, siaddr=None)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") + def test_request_initreboot_wrongnet(self): + resp = self._get_response(self._make_request(self.hwaddr, + OTHER_NETADDR_PREFIX + '1', siaddr=None)) self._assert_nak(resp) + self._assert_broadcast(resp) - @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") - def test_request_rebinding(self): + def _run_rebinding(self, bcastbit, giaddr=INET4_ANY): addr, siaddr, resp = self._request_address(self.hwaddr) exp = getopt(resp, 'lease_time') - self._assert_renews(make_request(self.hwaddr, None, None, ciaddr=addr), + # Rebinding: no siaddr or reqaddr + resp = self._assert_renews( + self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=addr, giaddr=giaddr, ip_src=addr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit), addr, exp) + return resp, addr + + @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") + def test_request_rebinding(self): + resp, addr = self._run_rebinding(bcastbit=False) + self._assert_unicast(resp, addr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2") + def test_request_rebinding_relayed(self): + giaddr = NETADDR_PREFIX + '123' + resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1") def test_request_rebinding_inuse(self): addr, siaddr, _ = self._request_address(self.hwaddr) - resp = self._get_response(make_request(self.other_hwaddr, None, None, - ciaddr=addr)) + resp = self._get_response(self._make_request( + self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr)) self._assert_nak(resp) + self._assert_broadcast(resp) @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc") def test_request_rebinding_wrongaddr(self): @@ -268,8 +514,8 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == otheraddr, "Test assumption not met: server assigned " + otheraddr) - resp = self._get_response(make_request(self.hwaddr, None, None, - ciaddr=otheraddr)) + resp = self._get_response(self._make_request( + self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr)) self._assert_nak(resp) self._assert_broadcast(resp) @@ -283,58 +529,352 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == relayaddr, "Test assumption not met: server assigned " + relayaddr) - req = make_request(self.hwaddr, None, None, ciaddr=otheraddr) - req.getlayer(BOOTP).giaddr = relayaddr + req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=otheraddr, giaddr=relayaddr) resp = self._get_response(req) self._assert_nak(resp) self._assert_unicast(resp, relayaddr) + self._assert_broadcastbit(resp) @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da") def test_release(self): addr, siaddr, _ = self._request_address(self.hwaddr) # Re-requesting fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) # Succeeds after release - self._send(make_release(self.hwaddr, addr, siaddr)) - time.sleep(1) - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._send(self._make_release(self.hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_ack(resp) @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593") def test_release_noserverid(self): addr, siaddr, _ = self._request_address(self.hwaddr) - # Release without server_id opt ignored - release = make_release(self.hwaddr, addr, siaddr) + # Release without server_id opt is ignored + release = self._make_release(self.hwaddr, addr, siaddr) removeopt(release, 'server_id') self._send(release) # Not released: request fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1") def test_release_wrongserverid(self): addr, siaddr, _ = self._request_address(self.hwaddr) # Release with wrong server id - release = make_release(self.hwaddr, addr, siaddr) + release = self._make_release(self.hwaddr, addr, siaddr) setopt(release, 'server_id', addr) self._send(release) # Not released: request fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) - def _request_address(self, hwaddr): - resp = self._get_response(make_discover(self.hwaddr)) + @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce") + def test_unicast_l2l3(self): + reqAddr = NETADDR_PREFIX + '124' + resp = self._get_response(self._make_request( + self.hwaddr, reqAddr, siaddr=None)) + self._assert_unicast(resp) + str_hwaddr = format_hwaddr(self.hwaddr) + asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst) + asserts.assert_equal(reqAddr, resp.getlayer(IP).dst) + asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + params_opt = make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 'NetBIOS_server', + 46, # NetBIOS over TCP/IP Node Type + ]) + req = self._make_discover(self.hwaddr, + options=[ + params_opt, + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_standard_offer(resp) + + def _make_macos_10_13_3_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + ]) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + req = self._make_discover(self.hwaddr, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + + @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86") + def test_macos_10_13_3_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test12-macbookpro'), + ]) + req.getlayer(BOOTP).secs = 5 + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp) + + # Note: macOS does not seem to do any rebinding (straight to discover) + @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b") + def test_macos_10_13_3_request_renewing(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True) + + def _make_win10_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'router', + 'name_server', + 'domain', + 31, # Perform Router Discover + 33, # Static Route + 'vendor_specific', + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + 47, # NetBIOS over TCP/IP Scope + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 252, # Private/Proxy autodiscovery + ]) + + @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76") + def test_win10_discover(self): + req = self._make_discover(self.hwaddr, bcastbit=True, + options=[ + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ], opts_padding=bytes(11)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410") + def test_win10_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, bcastbit=True, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + def _run_win10_request_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, bcastbit=bcast, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast) + + @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9") + def test_win10_request_renewing(self): + self._run_win10_request_renewing(bcast=False) + + @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751") + def test_win10_request_rebinding(self): + self._run_win10_request_renewing(bcast=True) + + def _make_debian_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'broadcast_address', + 'router', + 'name_server', + 119, # Domain Search + 'hostname', + 101, # TCode + 'domain', # NetBIOS over TCP/IP Name Server + 'vendor_specific', # NetBIOS over TCP/IP Node Type + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 33, # Static Route + 252, # Private/Proxy autodiscovery + 'NTP_server', + ]) + + @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5") + def test_debian_dhclient_4_3_5_discover(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_discover(self.hwaddr, + options=[ + ('requested_addr', req_ip), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(26)) + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + asserts.assert_equal(req_ip, get_yiaddr(resp)) + + @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac") + def test_debian_dhclient_4_3_5_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + ('server_id', self.server_addr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(20)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, with_hostname=True) + + def _run_debian_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], + opts_padding=bytes(32)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, + with_hostname=True) + + @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc") + def test_debian_dhclient_4_3_5_request_renewing(self): + self._run_debian_renewing(bcast=False) + + @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db") + def test_debian_dhclient_4_3_5_request_rebinding(self): + self._run_debian_renewing(bcast=True) + + def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False, + with_hostname=False): + # Responses to renew/rebind are always unicast to ciaddr even with + # broadcast flag set (RFC does not define this behavior, but this is + # more efficient and matches previous behavior) + if bcast and not renewing: + self._assert_broadcast(resp) + else: + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=bcast) + + bootp_resp = resp.getlayer(BOOTP) + asserts.assert_equal(0, bootp_resp.hops) + if renewing: + asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX), + 'ciaddr does not start with expected prefix') + else: + asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr) + asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX), + 'yiaddr does not start with expected prefix') + asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX), + 'siaddr does not start with expected prefix') + asserts.assert_equal(INET4_ANY, bootp_resp.giaddr) + + opt_labels = get_opt_labels(bootp_resp) + # FQDN option 81 is not supported in new behavior + opt_labels = [opt for opt in opt_labels if opt != 81] + + # Expect exactly these options in this order + expected_opts = [ + 'message-type', 'server_id', 'lease_time', 'renewal_time', + 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router', + 'name_server'] + if with_hostname: + expected_opts.append('hostname') + expected_opts.extend(['vendor_specific', 'end']) + asserts.assert_equal(expected_opts, opt_labels) + + def _request_address(self, hwaddr, bcast=True): + resp = self._get_response(self._make_discover(hwaddr)) self._assert_offer(resp) addr = get_yiaddr(resp) siaddr = getopt(resp, 'server_id') - resp = self._get_response(make_request(self.hwaddr, addr, siaddr)) + resp = self._get_response(self._make_request(hwaddr, addr, siaddr, + ip_dst=(INET4_ANY if bcast else siaddr))) self._assert_ack(resp) return addr, siaddr, resp @@ -343,7 +883,7 @@ class DhcpServerTest(base_test.BaseTestClass): bootp_resp = (resp or None) and resp.getlayer(BOOTP) if bootp_resp != None and get_mess_type(bootp_resp) == ACK: # Note down corresponding release for this request - release = make_release(bootp_resp.chaddr, bootp_resp.yiaddr, + release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr, getopt(bootp_resp, 'server_id')) self.cleanup_releases.append(release) return resp @@ -368,7 +908,12 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(None == packet, "No packet") asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) - asserts.assert_equal(packet.getlayer(BOOTP).flags, 0x8000) + self._assert_broadcastbit(packet) + + def _assert_broadcastbit(self, packet, isset=True): + mask = 0x8000 + flag = packet.getlayer(BOOTP).flags + asserts.assert_equal(flag & mask, mask if isset else 0) def _assert_unicast(self, packet, ipAddr=None): asserts.assert_false(None == packet, "No packet") @@ -382,10 +927,87 @@ class DhcpServerTest(base_test.BaseTestClass): self.next_hwaddr_index = self.next_hwaddr_index + 1 return addr + def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY, + ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY, + bcastbit=False): + broadcast = (ip_dst == NETADDR_BROADCAST) + ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr)) + ip = IP(src=ip_src, dst=ip_dst) + udp = UDP(sport=68, dport=SERVER_PORT) + bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr, + flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32)) + dhcp = DHCP(options=options) + return ethernet / ip / udp / bootp / dhcp + + def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY, + bcastbit=False, opts_padding=None, ip_src=INET4_ANY): + opts = [('message-type','discover')] + opts.extend(options) + opts.append('end') + if (opts_padding): + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src) + + def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY, + ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False, + options=[], opts_padding=None): + if not ip_dst: + ip_dst = siaddr or INET4_ANY + + if not ip_src and ip_dst == INET4_ANY: + ip_src = INET4_ANY + elif not ip_src: + ip_src = (giaddr if not isempty(giaddr) + else ciaddr if not isempty(ciaddr) + else reqaddr) + # Kernel will not receive unicast UDP packets with empty ip_src + asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src), + "Unicast ip_src cannot be zero") + opts = [('message-type', 'request')] + if options: + opts.extend(options) + else: + if siaddr: + opts.append(('server_id', siaddr)) + if reqaddr: + opts.append(('requested_addr', reqaddr)) + opts.append('end') + if opts_padding: + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, + ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit) + + def _make_release(self, src_hwaddr, addr, server_id): + opts = [('message-type', 'release'), ('server_id', server_id), 'end'] + return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr, + ip_dst=server_id) + +def assert_bootp_response(resp, req): + bootp = resp.getlayer(BOOTP) + asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op') + asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype') + asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen') + asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops') + asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID') + return bootp + + +def make_paramrequestlist_opt(params): + param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt + for opt in params] + return tuple(['param_req_list'] + [ + opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt + for opt in param_indexes]) + + +def isempty(addr): + return not addr or addr == INET4_ANY + def setopt(packet, optname, val): dhcp = packet.getlayer(DHCP) - if optname in [opt[0] for opt in dhcp.options]: + if optname in get_opt_labels(dhcp): dhcp.options = [(optname, val) if opt[0] == optname else opt for opt in dhcp.options] else: @@ -403,55 +1025,31 @@ def removeopt(packet, key): dhcp.options = [opt for opt in dhcp.options if opt[0] != key] -def get_yiaddr(packet): - return packet.getlayer(BOOTP).yiaddr - - -def get_mess_type(packet): - return getopt(packet, 'message-type') +def get_opt_labels(packet): + dhcp_resp = packet.getlayer(DHCP) + # end option is a single string, not a tuple. + return [opt if isinstance(opt, str) else opt[0] + for opt in dhcp_resp.options if opt != 'pad'] -def make_dhcp(src_hwaddr, options, ciaddr='0.0.0.0', ipSrc='0.0.0.0', - ipDst=NETADDR_BROADCAST): - broadcast = (ipDst == NETADDR_BROADCAST) - ethernet = Ether(dst=BROADCAST_MAC) if broadcast else Ether() - ip = IP(src=ipSrc, dst=ipDst) - udp = UDP(sport=68, dport=SERVER_PORT) - bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, - flags=(0x8000 if broadcast else 0), xid=RandInt()) - dhcp = DHCP(options=options) - return ethernet / ip / udp / bootp / dhcp - - -def make_discover(src_hwaddr, options = []): - opts = [('message-type','discover')] - opts.extend(options) - opts.append('end') - return make_dhcp(src_hwaddr, options=opts) +def get_yiaddr(packet): + return packet.getlayer(BOOTP).yiaddr -def make_request(src_hwaddr, reqaddr, siaddr, ciaddr='0.0.0.0', ipSrc=None): - if ipSrc == None: - ipSrc = ciaddr - opts = [('message-type', 'request')] - if siaddr: - opts.append(('server_id', siaddr)) - if reqaddr: - opts.append(('requested_addr', reqaddr)) - opts.append('end') - return make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, ipSrc=ciaddr) +def get_chaddr(packet): + # We use Ethernet addresses. Ignore address padding + return packet.getlayer(BOOTP).chaddr[:6] -def make_release(src_hwaddr, addr, server_id): - opts = [('message-type', 'release'), ('server_id', server_id), 'end'] - return make_dhcp(src_hwaddr, opts, ciaddr=addr, ipSrc=addr, ipDst=server_id) +def get_mess_type(packet): + return getopt(packet, 'message-type') def make_hwaddr(index): if index > 0xffff: raise ValueError("Address index out of range") - return '\x44\x85\x00\x00{}{}'.format(chr(index >> 8), chr(index & 0xff)) + return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff]) def format_hwaddr(addr): - return ':'.join(['%02x' % ord(c) for c in addr]) + return ':'.join(['%02x' % c for c in addr]) -- cgit v1.2.3