summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorBill Yi <byi@google.com>2018-11-30 11:50:51 -0800
committerandroid-build-merger <android-build-merger@google.com>2018-11-30 11:50:51 -0800
commite570fab62d8605e06dc26f7f1ea7fa685c096bfc (patch)
tree1ab6f1c060d261b55dd1b25d77bd9ea37d54df92
parent241b580aec1347a287cf939fe63aa4f32af2ee8e (diff)
parentd88d2b0d374d6d9777a3d335ade5311b1016944b (diff)
downloadplatform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.tar.gz
platform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.tar.bz2
platform_tools_test_connectivity-e570fab62d8605e06dc26f7f1ea7fa685c096bfc.zip
Merge pi-qpr1-release PQ1A.181105.017.A1 to pi-platform-release
am: d88d2b0d37 Change-Id: Icf7ddcbdb02a48749f3010a6e4d76792c24886c3
-rwxr-xr-xacts/framework/acts/controllers/access_point.py11
-rwxr-xr-xacts/framework/acts/controllers/android_device.py53
-rw-r--r--acts/framework/acts/test_utils/bt/BluetoothBaseTest.py81
-rw-r--r--acts/framework/acts/test_utils/bt/bluetooth.proto15
-rw-r--r--acts/framework/acts/test_utils/bt/bt_constants.py98
-rw-r--r--acts/framework/acts/test_utils/bt/bt_test_utils.py98
-rw-r--r--acts/framework/acts/test_utils/net/arduino_test_utils.py2
-rw-r--r--acts/framework/acts/test_utils/net/connectivity_test_utils.py63
-rw-r--r--acts/framework/acts/test_utils/net/net_test_utils.py274
-rw-r--r--acts/framework/acts/test_utils/power/PowerBaseTest.py8
-rw-r--r--acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py1
-rwxr-xr-xacts/framework/acts/test_utils/wifi/WifiBaseTest.py218
-rwxr-xr-xacts/framework/acts/test_utils/wifi/wifi_test_utils.py26
-rwxr-xr-xacts/framework/tests/acts_android_device_test.py5
-rw-r--r--acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py8
-rw-r--r--acts/tests/google/ble/filtering/UniqueFilteringTest.py91
-rw-r--r--acts/tests/google/ble/gatt/GattConnectTest.py2
-rw-r--r--acts/tests/google/ble/scan/BleOpportunisticScanTest.py13
-rw-r--r--acts/tests/google/bt/car_bt/BtCarMapMceTest.py20
-rwxr-xr-xacts/tests/google/net/CoreNetworkingOTATest.py102
-rw-r--r--acts/tests/google/net/CoreNetworkingTest.py13
-rw-r--r--acts/tests/google/net/DataCostTest.py23
-rw-r--r--acts/tests/google/net/DataUsageTest.py37
-rw-r--r--acts/tests/google/net/DhcpServerTest.py1055
-rw-r--r--acts/tests/google/net/DnsOverTlsTest.py41
-rw-r--r--acts/tests/google/net/IpSecTest.py26
-rw-r--r--acts/tests/google/net/LegacyVpnTest.py241
-rw-r--r--acts/tests/google/net/NattKeepAliveTest.py151
-rw-r--r--acts/tests/google/net/ProxyTest.py178
-rwxr-xr-xacts/tests/google/wifi/WifiIOTTest.py22
-rw-r--r--acts/tests/google/wifi/WifiIOTTwPkg1Test.py364
-rw-r--r--acts/tests/google/wifi/WifiRvrTwTest.py482
-rwxr-xr-xacts/tests/google/wifi/WifiStaApConcurrencyTest.py12
-rwxr-xr-xacts/tests/google/wifi/WifiStressTest.py74
-rwxr-xr-xacts/tests/google/wifi/WifiTethering2GOpenOTATest.py85
-rwxr-xr-xacts/tests/google/wifi/WifiTethering2GPskOTATest.py85
-rwxr-xr-xacts/tests/google/wifi/WifiTethering5GOpenOTATest.py85
-rwxr-xr-xacts/tests/google/wifi/WifiTethering5GPskOTATest.py85
-rw-r--r--acts/tests/google/wifi/WifiTetheringTest.py23
-rw-r--r--acts/tests/google/wifi/WifiThroughputStabilityTest.py17
40 files changed, 3838 insertions, 450 deletions
diff --git a/acts/framework/acts/controllers/access_point.py b/acts/framework/acts/controllers/access_point.py
index b861cdc865..313fcf999d 100755
--- a/acts/framework/acts/controllers/access_point.py
+++ b/acts/framework/acts/controllers/access_point.py
@@ -26,6 +26,7 @@ from acts.controllers.ap_lib import dhcp_config
from acts.controllers.ap_lib import dhcp_server
from acts.controllers.ap_lib import hostapd
from acts.controllers.ap_lib import hostapd_config
+from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.utils_lib.commands import ip
from acts.controllers.utils_lib.commands import route
from acts.controllers.utils_lib.commands import shell
@@ -308,15 +309,19 @@ class AccessPoint(object):
return interface
- def get_bssid_from_ssid(self, ssid):
+ def get_bssid_from_ssid(self, ssid, band):
"""Gets the BSSID from a provided SSID
Args:
- ssid: An SSID string
+ ssid: An SSID string.
+ band: 2G or 5G Wifi band.
Returns: The BSSID if on the AP or None if SSID could not be found.
"""
+ if band == hostapd_constants.BAND_2G:
+ interfaces = [self.wlan_2g, ssid]
+ else:
+ interfaces = [self.wlan_5g, ssid]
- interfaces = [self.wlan_2g, self.wlan_5g, ssid]
# Get the interface name associated with the given ssid.
for interface in interfaces:
cmd = "iw dev %s info|grep ssid|awk -F' ' '{print $2}'" % (
diff --git a/acts/framework/acts/controllers/android_device.py b/acts/framework/acts/controllers/android_device.py
index 3b9cb0ffa7..03097f049e 100755
--- a/acts/framework/acts/controllers/android_device.py
+++ b/acts/framework/acts/controllers/android_device.py
@@ -388,6 +388,9 @@ class AndroidDevice:
self.adb = adb.AdbProxy(serial, ssh_connection=ssh_connection)
self.fastboot = fastboot.FastbootProxy(
serial, ssh_connection=ssh_connection)
+ self.adb_logcat_file_path = os.path.join(
+ log_path_base, 'AndroidDevice%s' % serial,
+ "adblog,{},{}.txt".format(self.model, serial))
if not self.is_bootloader:
self.root_adb()
self._ssh_connection = ssh_connection
@@ -642,8 +645,9 @@ class AndroidDevice:
except (IndexError, ValueError) as e:
# Possible ValueError from string to int cast.
# Possible IndexError from split.
- self.log.warn('Command \"%s\" returned output line: '
- '\"%s\".\nError: %s', cmd, out, e)
+ self.log.warn(
+ 'Command \"%s\" returned output line: '
+ '\"%s\".\nError: %s', cmd, out, e)
except Exception as e:
self.log.warn(
'Device fails to check if %s running with \"%s\"\n'
@@ -754,7 +758,7 @@ class AndroidDevice:
begin_at = '-T "%s"' % self.last_logcat_timestamp
else:
begin_at = '-T 1'
- # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object.
+ # TODO(markdr): Pull 'adb -s %SERIAL' from the AdbProxy object.
cmd = "adb -s {} logcat {} -v year {} >> {}".format(
self.serial, begin_at, extra_params, logcat_file_path)
self.adb_logcat_process = utils.start_standing_subprocess(cmd)
@@ -810,8 +814,9 @@ class AndroidDevice:
'pm list packages | grep -w "package:%s"' % package_name))
except Exception as err:
- self.log.error('Could not determine if %s is installed. '
- 'Received error:\n%s', package_name, err)
+ self.log.error(
+ 'Could not determine if %s is installed. '
+ 'Received error:\n%s', package_name, err)
return False
def is_sl4a_installed(self):
@@ -835,8 +840,9 @@ class AndroidDevice:
self.log.info("apk %s is running", package_name)
return True
except Exception as e:
- self.log.warn("Device fails to check is %s running by %s "
- "Exception %s", package_name, cmd, e)
+ self.log.warn(
+ "Device fails to check is %s running by %s "
+ "Exception %s", package_name, cmd, e)
continue
self.log.debug("apk %s is not running", package_name)
return False
@@ -896,9 +902,9 @@ class AndroidDevice:
if new_br:
out = self.adb.shell("bugreportz", timeout=BUG_REPORT_TIMEOUT)
if not out.startswith("OK"):
- raise AndroidDeviceError("Failed to take bugreport on %s: %s" %
- (self.serial, out))
- br_out_path = out.split(':')[1].strip()
+ raise AndroidDeviceError(
+ "Failed to take bugreport on %s: %s" % (self.serial, out))
+ br_out_path = out.split(':')[1].strip().split()[0]
self.adb.pull("%s %s" % (br_out_path, full_out_path))
else:
self.adb.bugreport(
@@ -1142,6 +1148,9 @@ class AndroidDevice:
self.stop_services()
self.log.info("Restarting android runtime")
self.adb.shell("stop")
+ # Reset the boot completed flag before we restart the framework
+ # to correctly detect when the framework has fully come up.
+ self.adb.shell("setprop sys.boot_completed 0")
self.adb.shell("start")
self.wait_for_boot_completion()
self.root_adb()
@@ -1301,7 +1310,8 @@ class AndroidDevice:
def is_screen_lock_enabled(self):
"""Check if screen lock is enabled"""
- cmd = ("sqlite3 /data/system/locksettings.db .dump"" | grep lockscreen.password_type | grep -v alternate")
+ cmd = ("sqlite3 /data/system/locksettings.db .dump"
+ " | grep lockscreen.password_type | grep -v alternate")
out = self.adb.shell(cmd, ignore_status=True)
if "unable to open" in out:
self.root_adb()
@@ -1319,7 +1329,7 @@ class AndroidDevice:
self.log.info("Device is in CrpytKeeper window")
return True
if "StatusBar" in current_window and (
- (not current_app) or "FallbackHome" in current_app):
+ (not current_app) or "FallbackHome" in current_app):
self.log.info("Device is locked")
return True
return False
@@ -1370,12 +1380,13 @@ class AndroidDevice:
def exit_setup_wizard(self):
if not self.is_user_setup_complete() or self.is_setupwizard_on():
- self.adb.shell("pm disable %s" % self.get_setupwizard_package_name())
+ self.adb.shell(
+ "pm disable %s" % self.get_setupwizard_package_name())
# Wait up to 5 seconds for user_setup_complete to be updated
- for _ in range(5):
+ end_time = time.time() + 5
+ while time.time() < end_time:
if self.is_user_setup_complete() or not self.is_setupwizard_on():
return
- time.sleep(1)
# If fail to exit setup wizard, set local.prop and reboot
if not self.is_user_setup_complete() and self.is_setupwizard_on():
@@ -1386,12 +1397,18 @@ class AndroidDevice:
def get_setupwizard_package_name(self):
"""Finds setupwizard package/.activity
+ Bypass setupwizard or setupwraith depending on device.
+
Returns:
packageName/.ActivityName
- """
- package = self.adb.shell("pm list packages -f | grep setupwizard | grep com.google.android")
+ """
+ packages_to_skip = "'setupwizard|setupwraith'"
+ android_package_name = "com.google.android"
+ package = self.adb.shell(
+ "pm list packages -f | grep -E {} | grep {}".format(
+ packages_to_skip, android_package_name))
wizard_package = re.split("=", package)[1]
- activity = re.search("wizard/(.*?).apk", package, re.IGNORECASE).groups()[0]
+ activity = re.search("(.*?).apk", package, re.IGNORECASE).groups()[0]
self.log.info("%s/.%sActivity" % (wizard_package, activity))
return "%s/.%sActivity" % (wizard_package, activity)
diff --git a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py
index 01cb76af58..21d63c7810 100644
--- a/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py
+++ b/acts/framework/acts/test_utils/bt/BluetoothBaseTest.py
@@ -24,8 +24,14 @@ import os
from acts import utils
from acts.base_test import BaseTestClass
from acts.signals import TestSignal
+from acts.utils import create_dir
+from acts.utils import dump_string_to_file
from acts.controllers import android_device
+from acts.libs.proto.proto_utils import compile_import_proto
+from acts.libs.proto.proto_utils import parse_proto_to_ascii
+from acts.test_utils.bt.bt_metrics_utils import get_bluetooth_metrics
+from acts.test_utils.bt.bt_test_utils import get_device_selector_dictionary
from acts.test_utils.bt.bt_test_utils import reset_bluetooth
from acts.test_utils.bt.bt_test_utils import setup_multiple_devices_for_bt_test
from acts.test_utils.bt.bt_test_utils import take_btsnoop_logs
@@ -47,6 +53,36 @@ class BluetoothBaseTest(BaseTestClass):
BaseTestClass.__init__(self, controllers)
for ad in self.android_devices:
self._setup_bt_libs(ad)
+ if 'preferred_device_order' in self.user_params:
+ prefered_device_order = self.user_params['preferred_device_order']
+ for i, ad in enumerate(self.android_devices):
+ if ad.serial in prefered_device_order:
+ index = prefered_device_order.index(ad.serial)
+ self.android_devices[i], self.android_devices[index] = \
+ self.android_devices[index], self.android_devices[i]
+
+ def collect_bluetooth_manager_metrics_logs(self, ads, test_name):
+ """
+ Collect Bluetooth metrics logs, save an ascii log to disk and return
+ both binary and ascii logs to caller
+ :param ads: list of active Android devices
+ :return: List of binary metrics logs,
+ List of ascii metrics logs
+ """
+ bluetooth_logs = []
+ bluetooth_logs_ascii = []
+ for ad in ads:
+ serial = ad.serial
+ out_name = "{}_{}_{}".format(serial, test_name,
+ "bluetooth_metrics.txt")
+ bluetooth_log = get_bluetooth_metrics(ad,
+ ad.bluetooth_proto_module)
+ bluetooth_log_ascii = parse_proto_to_ascii(bluetooth_log)
+ dump_string_to_file(bluetooth_log_ascii,
+ os.path.join(ad.metrics_path, out_name))
+ bluetooth_logs.append(bluetooth_log)
+ bluetooth_logs_ascii.append(bluetooth_log_ascii)
+ return bluetooth_logs, bluetooth_logs_ascii
# Use for logging in the test cases to facilitate
# faster log lookup and reduce ambiguity in logging.
@@ -95,6 +131,8 @@ class BluetoothBaseTest(BaseTestClass):
return _safe_wrap_test_case
def setup_class(self):
+ self.device_selector = get_device_selector_dictionary(
+ self.android_devices)
if "reboot_between_test_class" in self.user_params:
threads = []
for a in self.android_devices:
@@ -104,7 +142,42 @@ class BluetoothBaseTest(BaseTestClass):
thread.start()
for t in threads:
t.join()
- return setup_multiple_devices_for_bt_test(self.android_devices)
+ if not setup_multiple_devices_for_bt_test(self.android_devices):
+ return False
+ if "bluetooth_proto_path" in self.user_params:
+ from google import protobuf
+
+ self.bluetooth_proto_path = self.user_params[
+ "bluetooth_proto_path"][0]
+ if not os.path.isfile(self.bluetooth_proto_path):
+ try:
+ self.bluetooth_proto_path = "{}/bluetooth.proto".format(
+ os.path.dirname(os.path.realpath(__file__)))
+ except Exception:
+ self.log.error("File not found.")
+ if not os.path.isfile(self.bluetooth_proto_path):
+ self.log.error("Unable to find Bluetooth proto {}.".format(
+ self.bluetooth_proto_path))
+ return False
+ for ad in self.android_devices:
+ ad.metrics_path = os.path.join(ad.log_path, "BluetoothMetrics")
+ create_dir(ad.metrics_path)
+ ad.bluetooth_proto_module = \
+ compile_import_proto(ad.metrics_path, self.bluetooth_proto_path)
+ if not ad.bluetooth_proto_module:
+ self.log.error("Unable to compile bluetooth proto at " +
+ self.bluetooth_proto_path)
+ return False
+ # Clear metrics.
+ get_bluetooth_metrics(ad, ad.bluetooth_proto_module)
+ return True
+
+ def teardown_class(self):
+ if "bluetooth_proto_path" in self.user_params:
+ # Collect metrics here bassed off class name
+ bluetooth_logs, bluetooth_logs_ascii = \
+ self.collect_bluetooth_manager_metrics_logs(
+ self.android_devices, self.__class__.__name__)
def setup_test(self):
self.timer_list = []
@@ -114,9 +187,6 @@ class BluetoothBaseTest(BaseTestClass):
a.droid.wakeUpNow()
return True
- def teardown_test(self):
- return True
-
def on_fail(self, test_name, begin_time):
self.log.debug(
"Test {} failed. Gathering bugreport and btsnoop logs".format(
@@ -175,3 +245,6 @@ class BluetoothBaseTest(BaseTestClass):
# Shell command library
setattr(android_device, "shell",
ShellCommands(log=self.log, dut=android_device))
+ # Setup Android Device feature list
+ setattr(android_device, "features",
+ android_device.adb.shell("pm list features").split("\n"))
diff --git a/acts/framework/acts/test_utils/bt/bluetooth.proto b/acts/framework/acts/test_utils/bt/bluetooth.proto
index a43ff47f55..4a07e59d76 100644
--- a/acts/framework/acts/test_utils/bt/bluetooth.proto
+++ b/acts/framework/acts/test_utils/bt/bluetooth.proto
@@ -114,6 +114,15 @@ message RFCommSession {
optional int32 tx_bytes = 2;
}
+enum A2dpSourceCodec {
+ A2DP_SOURCE_CODEC_UNKNOWN = 0;
+ A2DP_SOURCE_CODEC_SBC = 1;
+ A2DP_SOURCE_CODEC_AAC = 2;
+ A2DP_SOURCE_CODEC_APTX = 3;
+ A2DP_SOURCE_CODEC_APTX_HD = 4;
+ A2DP_SOURCE_CODEC_LDAC = 5;
+}
+
// Session information that gets logged for A2DP session.
message A2DPSession {
// Media timer in milliseconds.
@@ -139,6 +148,12 @@ message A2DPSession {
// Total audio time in this A2DP session
optional int64 audio_duration_millis = 8;
+
+ // Audio codec used in this A2DP session in A2DP source role
+ optional A2dpSourceCodec source_codec = 9;
+
+ // Whether A2DP offload is enabled in this A2DP session
+ optional bool is_a2dp_offload = 10;
}
message PairEvent {
diff --git a/acts/framework/acts/test_utils/bt/bt_constants.py b/acts/framework/acts/test_utils/bt/bt_constants.py
index f093a1f68c..5be4c79a20 100644
--- a/acts/framework/acts/test_utils/bt/bt_constants.py
+++ b/acts/framework/acts/test_utils/bt/bt_constants.py
@@ -596,3 +596,101 @@ logcat_strings = {
}
### End logcat strings dict"""
+
+### Begin Service Discovery UUIDS ###
+### Values match the Bluetooth SIG defined values: """
+""" https://www.bluetooth.com/specifications/assigned-numbers/service-discovery """
+sig_uuid_constants = {
+ "BASE_UUID": "0000{}-0000-1000-8000-00805F9B34FB",
+ "SDP": "0001",
+ "UDP": "0002",
+ "RFCOMM": "0003",
+ "TCP": "0004",
+ "TCS-BIN": "0005",
+ "TCS-AT": "0006",
+ "ATT": "0007",
+ "OBEX": "0008",
+ "IP": "0009",
+ "FTP": "000A",
+ "HTTP": "000C",
+ "WSP": "000E",
+ "BNEP": "000F",
+ "UPNP": "0010",
+ "HIDP": "0011",
+ "HardcopyControlChannel": "0012",
+ "HardcopyDataChannel": "0014",
+ "HardcopyNotification": "0016",
+ "AVCTP": "0017",
+ "AVDTP": "0019",
+ "CMTP": "001B",
+ "MCAPControlChannel": "001E",
+ "MCAPDataChannel": "001F",
+ "L2CAP": "0100",
+ "ServiceDiscoveryServerServiceClassID": "1000",
+ "BrowseGroupDescriptorServiceClassID": "1001",
+ "SerialPort": "1101",
+ "LANAccessUsingPPP": "1102",
+ "DialupNetworking": "1103",
+ "IrMCSync": "1104",
+ "OBEXObjectPush": "1105",
+ "OBEXFileTransfer": "1106",
+ "IrMCSyncCommand": "1107",
+ "Headset": "1108",
+ "CordlessTelephony": "1109",
+ "AudioSource": "110A",
+ "AudioSink": "110B",
+ "A/V_RemoteControlTarget": "110C",
+ "AdvancedAudioDistribution": "110D",
+ "A/V_RemoteControl": "110E",
+ "A/V_RemoteControlController": "110F",
+ "Intercom": "1110",
+ "Fax": "1111",
+ "Headset - Audio Gateway (AG)": "1112",
+ "WAP": "1113",
+ "WAP_CLIENT": "1114",
+ "PANU": "1115",
+ "NAP": "1116",
+ "GN": "1117",
+ "DirectPrinting": "1118",
+ "ReferencePrinting": "1119",
+ "ImagingResponder": "111B",
+ "ImagingAutomaticArchive": "111C",
+ "ImagingReferencedObjects": "111D",
+ "Handsfree": "111E",
+ "HandsfreeAudioGateway": "111F",
+ "DirectPrintingReferenceObjectsService": "1120",
+ "ReflectedUI": "1121",
+ "BasicPrinting": "1122",
+ "PrintingStatus": "1123",
+ "HumanInterfaceDeviceService": "1124",
+ "HardcopyCableReplacement": "1125",
+ "HCR_Print": "1126",
+ "HCR_Scan": "1127",
+ "Common_ISDN_Access": "1128",
+ "SIM_Access": "112D",
+ "Phonebook Access - PCE": "112E",
+ "Phonebook Access - PSE": "112F",
+ "Phonebook Access": "1130",
+ "Headset - HS": "1131",
+ "Message Access Server": "1132",
+ "Message Notification Server": "1133",
+ "Message Access Profile": "1134",
+ "GNSS": "1135",
+ "GNSS_Server": "1136",
+ "PnPInformation": "1200",
+ "GenericNetworking": "1201",
+ "GenericFileTransfer": "1202",
+ "GenericAudio": "1203",
+ "GenericTelephony": "1204",
+ "UPNP_Service": "1205",
+ "UPNP_IP_Service": "1206",
+ "ESDP_UPNP_IP_PAN": "1300",
+ "ESDP_UPNP_IP_LAP": "1301",
+ "ESDP_UPNP_L2CAP": "1302",
+ "VideoSource": "1303",
+ "VideoSink": "1304",
+ "VideoDistribution": "1305",
+ "HDP": "1400"
+}
+
+### End Service Discovery UUIDS ###
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index 9a930ab898..58f1a3c393 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -65,6 +65,7 @@ from acts.test_utils.bt.bt_constants import pan_connect_timeout
from acts.test_utils.bt.bt_constants import small_timeout
from acts.test_utils.bt.bt_constants import scan_result
from acts.test_utils.bt.bt_constants import scan_failed
+from acts.test_utils.bt.bt_constants import sig_uuid_constants
from acts.test_utils.bt.bt_constants import hid_id_keyboard
from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
@@ -223,7 +224,8 @@ def setup_multiple_devices_for_bt_test(android_devices):
threads = []
try:
for a in android_devices:
- thread = threading.Thread(target=factory_reset_bluetooth, args=([[a]]))
+ thread = threading.Thread(
+ target=factory_reset_bluetooth, args=([[a]]))
threads.append(thread)
thread.start()
for t in threads:
@@ -282,7 +284,11 @@ def bluetooth_enabled_check(ad):
return False
return True
-def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5):
+
+def wait_for_bluetooth_manager_state(droid,
+ state=None,
+ timeout=10,
+ threshold=5):
""" Waits for BlueTooth normalized state or normalized explicit state
args:
droid: droid device object
@@ -301,7 +307,8 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5)
# for any normalized state
if state is None:
if len(set(all_states[-threshold:])) == 1:
- log.info("State normalized {}".format(set(all_states[-threshold:])))
+ log.info("State normalized {}".format(
+ set(all_states[-threshold:])))
return True
else:
# explicit check against normalized state
@@ -310,9 +317,11 @@ def wait_for_bluetooth_manager_state(droid, state=None, timeout=10, threshold=5)
time.sleep(0.5)
log.error(
"Bluetooth state fails to normalize" if state is None else
- "Failed to match bluetooth state, current state {} expected state {}".format(get_state(), state))
+ "Failed to match bluetooth state, current state {} expected state {}".
+ format(get_state(), state))
return False
+
def factory_reset_bluetooth(android_devices):
"""Clears Bluetooth stack of input Android device list.
@@ -340,6 +349,7 @@ def factory_reset_bluetooth(android_devices):
return False
return True
+
def reset_bluetooth(android_devices):
"""Resets Bluetooth state of input Android device list.
@@ -576,6 +586,7 @@ def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
mac_address = event['data']['Result']['deviceInfo']['address']
return mac_address, advertise_callback, scan_callback
+
def enable_bluetooth(droid, ed):
if droid.bluetoothCheckState() is True:
return True
@@ -594,6 +605,7 @@ def enable_bluetooth(droid, ed):
return True
+
def disable_bluetooth(droid):
"""Disable Bluetooth on input Droid object.
@@ -1000,7 +1012,8 @@ def disconnect_pri_from_sec(pri_ad, sec_ad, profiles_list):
bluetooth_profile_connection_state_changed, bt_default_timeout)
pri_ad.log.info("Got event {}".format(profile_event))
except Exception as e:
- pri_ad.log.error("Did not disconnect from Profiles. Reason {}".format(e))
+ pri_ad.log.error(
+ "Did not disconnect from Profiles. Reason {}".format(e))
return False
profile = profile_event['data']['profile']
@@ -1387,3 +1400,78 @@ def is_a2dp_connected(sink, source):
if (device["address"] == source.droid.bluetoothGetLocalAddress()):
return True
return False
+
+
+def get_device_selector_dictionary(android_device_list):
+ """Create a dictionary of Bluetooth features vs Android devices.
+
+ Args:
+ android_device_list: The list of Android devices.
+ Returns:
+ A dictionary of profiles/features to Android devices.
+ """
+ selector_dict = {}
+ for ad in android_device_list:
+ uuids = ad.droid.bluetoothGetLocalUuids()
+
+ for profile, uuid_const in sig_uuid_constants.items():
+ uuid_check = sig_uuid_constants['BASE_UUID'].format(
+ uuid_const).lower()
+ if uuid_check in uuids:
+ if profile in selector_dict:
+ selector_dict[profile].append(ad)
+ else:
+ selector_dict[profile] = [ad]
+
+ # Various services may not be active during BT startup.
+ # If the device can be identified through adb shell pm list features
+ # then try to add them to the appropriate profiles / features.
+
+ # Android TV.
+ if "feature:com.google.android.tv.installed" in ad.features:
+ ad.log.info("Android TV device found.")
+ supported_profiles = ['AudioSink']
+ _add_android_device_to_dictionary(ad, supported_profiles,
+ selector_dict)
+
+ # Android Auto
+ elif "feature:android.hardware.type.automotive" in ad.features:
+ ad.log.info("Android Auto device found.")
+ # Add: AudioSink , A/V_RemoteControl,
+ supported_profiles = [
+ 'AudioSink', 'A/V_RemoteControl', 'Message Notification Server'
+ ]
+ _add_android_device_to_dictionary(ad, supported_profiles,
+ selector_dict)
+ # Android Wear
+ elif "feature:android.hardware.type.watch" in ad.features:
+ ad.log.info("Android Wear device found.")
+ supported_profiles = []
+ _add_android_device_to_dictionary(ad, supported_profiles,
+ selector_dict)
+ # Android Phone
+ elif "feature:android.hardware.telephony" in ad.features:
+ ad.log.info("Android Phone device found.")
+ # Add: AudioSink
+ supported_profiles = [
+ 'AudioSource', 'A/V_RemoteControlTarget',
+ 'Message Access Server'
+ ]
+ _add_android_device_to_dictionary(ad, supported_profiles,
+ selector_dict)
+ return selector_dict
+
+
+def _add_android_device_to_dictionary(android_device, profile_list,
+ selector_dict):
+ """Adds the AndroidDevice and supported features to the selector dictionary
+
+ Args:
+ android_device: The Android device.
+ profile_list: The list of profiles the Android device supports.
+ """
+ for profile in profile_list:
+ if profile in selector_dict and android_device not in selector_dict[profile]:
+ selector_dict[profile].append(android_device)
+ else:
+ selector_dict[profile] = [android_device]
diff --git a/acts/framework/acts/test_utils/net/arduino_test_utils.py b/acts/framework/acts/test_utils/net/arduino_test_utils.py
index af0b3da15f..058d1433a4 100644
--- a/acts/framework/acts/test_utils/net/arduino_test_utils.py
+++ b/acts/framework/acts/test_utils/net/arduino_test_utils.py
@@ -44,7 +44,7 @@ def connect_wifi(wd, network=None):
"""
wd.log.info("Flashing connect_wifi.ino onto dongle")
cmd = "locate %s" % CONNECT_WIFI
- file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").rstrip()
+ file_path = utils.exe_cmd(cmd).decode("utf-8", "ignore").split()[-1]
write_status = wd.write(ARDUINO, file_path, network)
asserts.assert_true(write_status, "Failed to flash connect wifi")
wd.log.info("Flashing complete")
diff --git a/acts/framework/acts/test_utils/net/connectivity_test_utils.py b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
new file mode 100644
index 0000000000..02167c966c
--- /dev/null
+++ b/acts/framework/acts/test_utils/net/connectivity_test_utils.py
@@ -0,0 +1,63 @@
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+
+def start_natt_keepalive(ad, src_ip, src_port, dst_ip, interval = 10):
+ """ Start NAT-T keep alive on dut """
+
+ ad.log.info("Starting NATT Keepalive")
+ status = None
+
+ key = ad.droid.connectivityStartNattKeepalive(
+ interval, src_ip, src_port, dst_ip)
+
+ ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Started")
+ try:
+ event = ad.ed.pop_event("PacketKeepaliveCallback")
+ status = event["data"]["packetKeepaliveEvent"]
+ except Empty:
+ msg = "Failed to receive confirmation of starting natt keepalive"
+ asserts.fail(msg)
+ finally:
+ ad.droid.connectivityNattKeepaliveStopListeningForEvent(
+ key, "Started")
+
+ if status != "Started":
+ ad.log.error("Received keepalive status: %s" % status)
+ ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
+ return None
+ return key
+
+def stop_natt_keepalive(ad, key):
+ """ Stop NAT-T keep alive on dut """
+
+ ad.log.info("Stopping NATT keepalive")
+ status = False
+ ad.droid.connectivityStopNattKeepalive(key)
+
+ ad.droid.connectivityNattKeepaliveStartListeningForEvent(key, "Stopped")
+ try:
+ event = ad.ed.pop_event("PacketKeepaliveCallback")
+ status = event["data"]["packetKeepaliveEvent"] == "Stopped"
+ except Empty:
+ msg = "Failed to receive confirmation of stopping natt keepalive"
+ asserts.fail(msg)
+ finally:
+ ad.droid.connectivityNattKeepaliveStopListeningForEvent(
+ key, "Stopped")
+
+ ad.droid.connectivityRemovePacketKeepaliveReceiverKey(key)
+ return status
diff --git a/acts/framework/acts/test_utils/net/net_test_utils.py b/acts/framework/acts/test_utils/net/net_test_utils.py
new file mode 100644
index 0000000000..867473caef
--- /dev/null
+++ b/acts/framework/acts/test_utils/net/net_test_utils.py
@@ -0,0 +1,274 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 Google, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts import utils
+from acts.logger import epoch_to_log_line_timestamp
+from acts.utils import get_current_epoch_time
+from acts.logger import normalize_log_line_timestamp
+from acts.utils import start_standing_subprocess
+from acts.utils import stop_standing_subprocess
+from acts.test_utils.net import connectivity_const as cconst
+from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
+from acts.test_utils.tel.tel_test_utils import verify_http_connection
+from acts.test_utils.wifi import wifi_test_utils as wutils
+
+import os
+import re
+import time
+import urllib.request
+
+VPN_CONST = cconst.VpnProfile
+VPN_TYPE = cconst.VpnProfileType
+VPN_PARAMS = cconst.VpnReqParams
+VPN_PING_ADDR = "10.10.10.1"
+TCPDUMP_PATH = "/data/local/tmp/tcpdump"
+
+def verify_lte_data_and_tethering_supported(ad):
+ """Verify if LTE data is enabled and tethering supported"""
+ wutils.wifi_toggle_state(ad, False)
+ ad.droid.telephonyToggleDataConnection(True)
+ wait_for_cell_data_connection(ad.log, ad, True)
+ asserts.assert_true(
+ verify_http_connection(ad.log, ad),
+ "HTTP verification failed on cell data connection")
+ asserts.assert_true(
+ ad.droid.connectivityIsTetheringSupported(),
+ "Tethering is not supported for the provider")
+ wutils.wifi_toggle_state(ad, True)
+
+def set_chrome_browser_permissions(ad):
+ """Set chrome browser start with no-first-run verification.
+ Give permission to read from and write to storage
+ """
+ commands = ["pm grant com.android.chrome "
+ "android.permission.READ_EXTERNAL_STORAGE",
+ "pm grant com.android.chrome "
+ "android.permission.WRITE_EXTERNAL_STORAGE",
+ "rm /data/local/chrome-command-line",
+ "am set-debug-app --persistent com.android.chrome",
+ 'echo "chrome --no-default-browser-check --no-first-run '
+ '--disable-fre" > /data/local/tmp/chrome-command-line']
+ for cmd in commands:
+ try:
+ ad.adb.shell(cmd)
+ except adb.AdbError:
+ self.log.warn("adb command %s failed on %s" % (cmd, ad.serial))
+
+def verify_ping_to_vpn_ip(ad):
+ """ Verify if IP behind VPN server is pingable.
+ Ping should pass, if VPN is connected.
+ Ping should fail, if VPN is disconnected.
+
+ Args:
+ ad: android device object
+ """
+ ping_result = None
+ pkt_loss = "100% packet loss"
+ try:
+ ping_result = ad.adb.shell("ping -c 3 -W 2 %s" % VPN_PING_ADDR)
+ except adb.AdbError:
+ pass
+ return ping_result and pkt_loss not in ping_result
+
+def legacy_vpn_connection_test_logic(ad, vpn_profile):
+ """ Test logic for each legacy VPN connection
+
+ Steps:
+ 1. Generate profile for the VPN type
+ 2. Establish connection to the server
+ 3. Verify that connection is established using LegacyVpnInfo
+ 4. Verify the connection by pinging the IP behind VPN
+ 5. Stop the VPN connection
+ 6. Check the connection status
+ 7. Verify that ping to IP behind VPN fails
+
+ Args:
+ 1. ad: Android device object
+ 2. VpnProfileType (1 of the 6 types supported by Android)
+ """
+ # Wait for sometime so that VPN server flushes all interfaces and
+ # connections after graceful termination
+ time.sleep(10)
+
+ ad.adb.shell("ip xfrm state flush")
+ ad.log.info("Connecting to: %s", vpn_profile)
+ ad.droid.vpnStartLegacyVpn(vpn_profile)
+ time.sleep(cconst.VPN_TIMEOUT)
+
+ connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
+ asserts.assert_equal(connected_vpn_info["state"],
+ cconst.VPN_STATE_CONNECTED,
+ "Unable to establish VPN connection for %s"
+ % vpn_profile)
+
+ ping_result = verify_ping_to_vpn_ip(ad)
+ ip_xfrm_state = ad.adb.shell("ip xfrm state")
+ match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
+ if match_obj:
+ ip_xfrm_state = format(match_obj.group(0)).split()
+ ad.log.info("HMAC for ESP is %s " % ip_xfrm_state[0])
+
+ ad.droid.vpnStopLegacyVpn()
+ asserts.assert_true(ping_result,
+ "Ping to the internal IP failed. "
+ "Expected to pass as VPN is connected")
+
+ connected_vpn_info = ad.droid.vpnGetLegacyVpnInfo()
+ asserts.assert_true(not connected_vpn_info,
+ "Unable to terminate VPN connection for %s"
+ % vpn_profile)
+
+def download_load_certs(ad, vpn_params, vpn_type, vpn_server_addr,
+ ipsec_server_type, log_path):
+ """ Download the certificates from VPN server and push to sdcard of DUT
+
+ Args:
+ ad: android device object
+ vpn_params: vpn params from config file
+ vpn_type: 1 of the 6 VPN types
+ vpn_server_addr: server addr to connect to
+ ipsec_server_type: ipsec version - strongswan or openswan
+ log_path: log path to download cert
+
+ Returns:
+ Client cert file name on DUT's sdcard
+ """
+ url = "http://%s%s%s" % (vpn_server_addr,
+ vpn_params['cert_path_vpnserver'],
+ vpn_params['client_pkcs_file_name'])
+ local_cert_name = "%s_%s_%s" % (vpn_type.name,
+ ipsec_server_type,
+ vpn_params['client_pkcs_file_name'])
+
+ local_file_path = os.path.join(log_path, local_cert_name)
+ try:
+ ret = urllib.request.urlopen(url)
+ with open(local_file_path, "wb") as f:
+ f.write(ret.read())
+ except:
+ asserts.fail("Unable to download certificate from the server")
+
+ f.close()
+ ad.adb.push("%s sdcard/" % local_file_path)
+ return local_cert_name
+
+def generate_legacy_vpn_profile(ad,
+ vpn_params,
+ vpn_type,
+ vpn_server_addr,
+ ipsec_server_type,
+ log_path):
+ """ Generate legacy VPN profile for a VPN
+
+ Args:
+ ad: android device object
+ vpn_params: vpn params from config file
+ vpn_type: 1 of the 6 VPN types
+ vpn_server_addr: server addr to connect to
+ ipsec_server_type: ipsec version - strongswan or openswan
+ log_path: log path to download cert
+
+ Returns:
+ Vpn profile
+ """
+ vpn_profile = {VPN_CONST.USER: vpn_params['vpn_username'],
+ VPN_CONST.PWD: vpn_params['vpn_password'],
+ VPN_CONST.TYPE: vpn_type.value,
+ VPN_CONST.SERVER: vpn_server_addr,}
+ vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,
+ ipsec_server_type)
+ if vpn_type.name == "PPTP":
+ vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
+
+ psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"])
+ rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"])
+
+ if vpn_type.name in psk_set:
+ vpn_profile[VPN_CONST.IPSEC_SECRET] = vpn_params['psk_secret']
+ elif vpn_type.name in rsa_set:
+ cert_name = download_load_certs(ad,
+ vpn_params,
+ vpn_type,
+ vpn_server_addr,
+ ipsec_server_type,
+ log_path)
+ vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
+ vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
+ ad.droid.installCertificate(vpn_profile, cert_name,
+ vpn_params['cert_password'])
+ else:
+ vpn_profile[VPN_CONST.MPPE] = "mppe"
+
+ return vpn_profile
+
+def start_tcpdump(ad, test_name):
+ """Start tcpdump on all interfaces
+
+ Args:
+ ad: android device object.
+ test_name: tcpdump file name will have this
+ """
+ ad.log.info("Starting tcpdump on all interfaces")
+ try:
+ ad.adb.shell("killall -9 tcpdump")
+ except AdbError:
+ ad.log.warn("Killing existing tcpdump processes failed")
+ out = ad.adb.shell("ls -l %s" % TCPDUMP_PATH)
+ if "No such file" in out or not out:
+ ad.adb.shell("mkdir %s" % TCPDUMP_PATH)
+ else:
+ ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
+
+ begin_time = epoch_to_log_line_timestamp(get_current_epoch_time())
+ begin_time = normalize_log_line_timestamp(begin_time)
+
+ file_name = "%s/tcpdump_%s_%s.pcap" % (TCPDUMP_PATH, ad.serial, test_name)
+ ad.log.info("tcpdump file is %s", file_name)
+ try:
+ cmd = "adb -s {} shell tcpdump -i any -s0 -w {}" . \
+ format(ad.serial, file_name)
+ return start_standing_subprocess(cmd, 5)
+ except e:
+ ad.log.error(e)
+
+ return None
+
+def stop_tcpdump(ad, proc, test_name):
+ """Stops tcpdump on any iface
+ Pulls the tcpdump file in the tcpdump dir
+
+ Args:
+ ad: android device object.
+ proc: need to know which pid to stop
+ test_name: test name to save the tcpdump file
+
+ Returns:
+ log_path of the tcpdump file
+ """
+ ad.log.info("Stopping and pulling tcpdump if any")
+ if proc == None:
+ return None
+ try:
+ stop_standing_subprocess(proc)
+ except Exception as e:
+ ad.log.warning(e)
+ log_path = os.path.join(ad.log_path, test_name)
+ utils.create_dir(log_path)
+ ad.adb.pull("%s/. %s" % (TCPDUMP_PATH, log_path))
+ ad.adb.shell("rm -rf %s/*" % TCPDUMP_PATH, ignore_status=True)
+ file_name = "tcpdump_%s_%s.pcap" % (ad.serial, test_name)
+ return "%s/%s" % (log_path, file_name)
diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py
index 0a45dd1954..78c5090599 100644
--- a/acts/framework/acts/test_utils/power/PowerBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py
@@ -125,7 +125,7 @@ class PowerBaseTest(base_test.BaseTestClass):
if hasattr(self, 'attenuators'):
self.num_atten = self.attenuators[0].instrument.num_atten
self.atten_level = self.unpack_custom_file(self.attenuation_file)
- self.set_attenuation(INITIAL_ATTEN)
+ self.set_attenuation(INITIAL_ATTEN)
self.threshold = self.unpack_custom_file(self.threshold_file)
self.mon_info = self.create_monsoon_info()
@@ -451,11 +451,11 @@ class PowerBaseTest(base_test.BaseTestClass):
self.brconfigs: dict for bridge interface configs
"""
wutils.wifi_toggle_state(self.dut, True)
- self.brconfigs = wputils.ap_setup(
- self.access_point, network, bandwidth=bandwidth)
+ if hasattr(self, 'access_points'):
+ self.brconfigs = wputils.ap_setup(
+ self.access_point, network, bandwidth=bandwidth)
if connect:
wutils.wifi_connect(self.dut, network)
- return self.brconfigs
def process_iperf_results(self):
"""Get the iperf results and process.
diff --git a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
index 635c8b54c8..19702f4540 100644
--- a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
@@ -34,6 +34,7 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest):
self.access_point_main = self.access_points[0]
if len(self.access_points) > 1:
self.access_point_aux = self.access_points[1]
+ if hasattr(self, 'network_file'):
self.networks = self.unpack_custom_file(self.network_file, False)
self.main_network = self.networks['main_network']
self.aux_network = self.networks['aux_network']
diff --git a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
index 3c5c92c3cc..4b6dc8c50e 100755
--- a/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
+++ b/acts/framework/acts/test_utils/wifi/WifiBaseTest.py
@@ -33,6 +33,9 @@ from acts.controllers.ap_lib import hostapd_bss_settings
from acts.controllers.ap_lib import hostapd_constants
from acts.controllers.ap_lib import hostapd_security
+AP_1 = 0
+AP_2 = 1
+MAX_AP_COUNT = 2
class WifiBaseTest(BaseTestClass):
def __init__(self, controllers):
@@ -43,8 +46,10 @@ class WifiBaseTest(BaseTestClass):
def get_wpa2_network(
self,
+ mirror_ap,
+ reference_networks,
hidden=False,
- ap_count=1,
+ same_ssid=False,
ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
passphrase_length_2g=hostapd_constants.AP_PASSPHRASE_LENGTH_2G,
@@ -53,13 +58,16 @@ class WifiBaseTest(BaseTestClass):
generator.
Args:
- ap_count: Determines if we want to use one or both the APs
- for configuration. If set to '2', then both APs will
- be configured with the same configuration.
- ssid_length_2g: Int, number of characters to use for 2G SSID.
+ mirror_ap: Boolean, determines if both APs use the same hostapd
+ config or different configs.
+ reference_networks: List of PSK networks.
+ same_ssid: Boolean, determines if both bands on AP use the same
+ SSID.
+ ssid_length_2gecond AP Int, number of characters to use for 2G SSID.
ssid_length_5g: Int, number of characters to use for 5G SSID.
passphrase_length_2g: Int, length of password for 2G network.
passphrase_length_5g: Int, length of password for 5G network.
+
Returns: A dict of 2G and 5G network lists for hostapd configuration.
"""
@@ -67,13 +75,20 @@ class WifiBaseTest(BaseTestClass):
network_dict_5g = {}
ref_5g_security = hostapd_constants.WPA2_STRING
ref_2g_security = hostapd_constants.WPA2_STRING
- self.user_params["reference_networks"] = []
- ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
- ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
+ if same_ssid:
+ ref_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
+ ref_5g_ssid = ref_2g_ssid
+
+ ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
+ ref_5g_passphrase = ref_2g_passphrase
- ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
- ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
+ else:
+ ref_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
+ ref_2g_passphrase = utils.rand_ascii_str(passphrase_length_2g)
+
+ ref_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
+ ref_5g_passphrase = utils.rand_ascii_str(passphrase_length_5g)
if hidden:
network_dict_2g = {
@@ -103,37 +118,47 @@ class WifiBaseTest(BaseTestClass):
}
ap = 0
- for ap in range(ap_count):
- self.user_params["reference_networks"].append({
- "2g":
- copy.copy(network_dict_2g),
- "5g":
- copy.copy(network_dict_5g)
+ for ap in range(MAX_AP_COUNT):
+ reference_networks.append({
+ "2g": copy.copy(network_dict_2g),
+ "5g": copy.copy(network_dict_5g)
})
- self.reference_networks = self.user_params["reference_networks"]
+ if not mirror_ap:
+ break
return {"2g": network_dict_2g, "5g": network_dict_5g}
def get_open_network(self,
+ mirror_ap,
+ open_network,
hidden=False,
- ap_count=1,
+ same_ssid=False,
ssid_length_2g=hostapd_constants.AP_SSID_LENGTH_2G,
ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G):
"""Generates SSIDs for a open network using a random generator.
Args:
- ap_count: Determines if we want to use one or both the APs for
- for configuration. If set to '2', then both APs will
- be configured with the same configuration.
+ mirror_ap: Boolean, determines if both APs use the same hostapd
+ config or different configs.
+ open_network: List of open networks.
+ same_ssid: Boolean, determines if both bands on AP use the same
+ SSID.
ssid_length_2g: Int, number of characters to use for 2G SSID.
ssid_length_5g: Int, number of characters to use for 5G SSID.
+
Returns: A dict of 2G and 5G network lists for hostapd configuration.
"""
network_dict_2g = {}
network_dict_5g = {}
- self.user_params["open_network"] = []
- open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
- open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
+
+ if same_ssid:
+ open_2g_ssid = 'xg_%s' % utils.rand_ascii_str(ssid_length_2g)
+ open_5g_ssid = open_2g_ssid
+
+ else:
+ open_2g_ssid = '2g_%s' % utils.rand_ascii_str(ssid_length_2g)
+ open_5g_ssid = '5g_%s' % utils.rand_ascii_str(ssid_length_5g)
+
if hidden:
network_dict_2g = {
"SSID": open_2g_ssid,
@@ -158,14 +183,35 @@ class WifiBaseTest(BaseTestClass):
}
ap = 0
- for ap in range(ap_count):
- self.user_params["open_network"].append({
- "2g": network_dict_2g,
- "5g": network_dict_5g
+ for ap in range(MAX_AP_COUNT):
+ open_network.append({
+ "2g": copy.copy(network_dict_2g),
+ "5g": copy.copy(network_dict_5g)
})
- self.open_network = self.user_params["open_network"]
+ if not mirror_ap:
+ break
return {"2g": network_dict_2g, "5g": network_dict_5g}
+ def update_bssid(self, ap_instance, ap, network, band):
+ """Get bssid and update network dictionary.
+
+ Args:
+ ap_instance: Accesspoint index that was configured.
+ ap: Accesspoint object corresponding to ap_instance.
+ network: Network dictionary.
+ band: Wifi networks' band.
+
+ """
+ bssid = ap.get_bssid_from_ssid(network["SSID"], band)
+
+ if network["security"] == hostapd_constants.WPA2_STRING:
+ # TODO:(bamahadev) Change all occurances of reference_networks
+ # to wpa_networks.
+ self.reference_networks[ap_instance][band]["bssid"] = bssid
+
+ if network["security"] == 'none':
+ self.open_network[ap_instance][band]["bssid"] = bssid
+
def populate_bssid(self, ap_instance, ap, networks_5g, networks_2g):
"""Get bssid for a given SSID and add it to the network dictionary.
@@ -180,20 +226,17 @@ class WifiBaseTest(BaseTestClass):
if not (networks_5g or networks_2g):
return
- for network in itertools.chain(networks_5g, networks_2g):
+ for network in networks_5g:
+ if 'channel' in network:
+ continue
+ self.update_bssid(ap_instance, ap, network,
+ hostapd_constants.BAND_5G)
+
+ for network in networks_2g:
if 'channel' in network:
continue
- bssid = ap.get_bssid_from_ssid(network["SSID"])
- if '2g' in network["SSID"]:
- band = hostapd_constants.BAND_2G
- else:
- band = hostapd_constants.BAND_5G
- if network["security"] == hostapd_constants.WPA2_STRING:
- # TODO:(bamahadev) Change all occurances of reference_networks
- # to wpa_networks.
- self.reference_networks[ap_instance][band]["bssid"] = bssid
- else:
- self.open_network[ap_instance][band]["bssid"] = bssid
+ self.update_bssid(ap_instance, ap, network,
+ hostapd_constants.BAND_2G)
def legacy_configure_ap_and_start(
self,
@@ -206,47 +249,86 @@ class WifiBaseTest(BaseTestClass):
ap_ssid_length_5g=hostapd_constants.AP_SSID_LENGTH_5G,
ap_passphrase_length_5g=hostapd_constants.AP_PASSPHRASE_LENGTH_5G,
hidden=False,
+ same_ssid=False,
+ mirror_ap=True,
ap_count=1):
asserts.assert_true(
len(self.user_params["AccessPoint"]) == 2,
"Exactly two access points must be specified. \
Each access point has 2 radios, one each for 2.4GHZ \
and 5GHz. A test can choose to use one or both APs.")
- network_list_2g = []
- network_list_5g = []
- network_list_2g.append({"channel": channel_2g})
- network_list_5g.append({"channel": channel_5g})
- if "reference_networks" in self.user_params:
- pass
- else:
- networks_dict = self.get_wpa2_network(hidden=hidden,
- ap_count=ap_count)
- network_list_2g.append(networks_dict["2g"])
- network_list_5g.append(networks_dict["5g"])
+ config_count = 1
+ count = 0
+
+ # For example, the NetworkSelector tests use 2 APs and require that
+ # both APs are not mirrored.
+ if not mirror_ap and ap_count == 1:
+ raise ValueError("ap_count cannot be 1 if mirror_ap is False.")
+
+ if not mirror_ap:
+ config_count = ap_count
+
+ self.user_params["reference_networks"] = []
+ self.user_params["open_network"] = []
+
+ for count in range(config_count):
+
+ network_list_2g = []
+ network_list_5g = []
+
+ orig_network_list_2g = []
+ orig_network_list_5g = []
+
+ network_list_2g.append({"channel": channel_2g})
+ network_list_5g.append({"channel": channel_5g})
+
+ networks_dict = self.get_wpa2_network(
+ mirror_ap,
+ self.user_params["reference_networks"],
+ hidden=hidden,
+ same_ssid=same_ssid)
+ self.reference_networks = self.user_params["reference_networks"]
- if "open_network" in self.user_params:
- pass
- else:
- networks_dict = self.get_open_network(hidden=hidden,
- ap_count=ap_count)
network_list_2g.append(networks_dict["2g"])
network_list_5g.append(networks_dict["5g"])
- orig_network_list_5g = copy.copy(network_list_5g)
- orig_network_list_2g = copy.copy(network_list_2g)
+ # When same_ssid is set, only configure one set of WPA networks.
+ # We cannot have more than one set because duplicate interface names
+ # are not allowed.
+ # TODO(bmahadev): Provide option to select the type of network,
+ # instead of defaulting to WPA.
+ if not same_ssid:
+ networks_dict = self.get_open_network(
+ mirror_ap,
+ self.user_params["open_network"],
+ hidden=hidden,
+ same_ssid=same_ssid)
+ self.open_network = self.user_params["open_network"]
- if len(network_list_5g) > 1:
- self.config_5g = self._generate_legacy_ap_config(network_list_5g)
- if len(network_list_2g) > 1:
- self.config_2g = self._generate_legacy_ap_config(network_list_2g)
- ap = 0
- for ap in range(ap_count):
- self.access_points[ap].start_ap(self.config_2g)
- self.access_points[ap].start_ap(self.config_5g)
- self.populate_bssid(ap, self.access_points[ap], orig_network_list_5g,
+ network_list_2g.append(networks_dict["2g"])
+ network_list_5g.append(networks_dict["5g"])
+
+ orig_network_list_5g = copy.copy(network_list_5g)
+ orig_network_list_2g = copy.copy(network_list_2g)
+
+ if len(network_list_5g) > 1:
+ self.config_5g = self._generate_legacy_ap_config(network_list_5g)
+ if len(network_list_2g) > 1:
+ self.config_2g = self._generate_legacy_ap_config(network_list_2g)
+
+ self.access_points[count].start_ap(self.config_2g)
+ self.access_points[count].start_ap(self.config_5g)
+ self.populate_bssid(count, self.access_points[count], orig_network_list_5g,
orig_network_list_2g)
+ # Repeat configuration on the second router.
+ if mirror_ap and ap_count == 2:
+ self.access_points[AP_2].start_ap(self.config_2g)
+ self.access_points[AP_2].start_ap(self.config_5g)
+ self.populate_bssid(AP_2, self.access_points[AP_2],
+ orig_network_list_5g, orig_network_list_2g)
+
def _generate_legacy_ap_config(self, network_list):
bss_settings = []
ap_settings = network_list.pop(0)
diff --git a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
index 47aa6f2cb5..0c96fed835 100755
--- a/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_test_utils.py
@@ -907,6 +907,32 @@ def start_wifi_tethering(ad, ssid, password, band=None, hidden=None):
finally:
ad.droid.wifiStopTrackingTetherStateChange()
+def save_wifi_soft_ap_config(ad, wifi_config, band=None, hidden=None):
+ """ Save a soft ap configuration """
+ if band:
+ wifi_config[WifiEnums.APBAND_KEY] = band
+ if hidden:
+ wifi_config[WifiEnums.HIDDEN_KEY] = hidden
+ asserts.assert_true(ad.droid.wifiSetWifiApConfiguration(wifi_config),
+ "Failed to set WifiAp Configuration")
+
+ wifi_ap = ad.droid.wifiGetApConfiguration()
+ asserts.assert_true(
+ wifi_ap[WifiEnums.SSID_KEY] == wifi_config[WifiEnums.SSID_KEY],
+ "Hotspot SSID doesn't match with expected SSID")
+
+def start_wifi_tethering_saved_config(ad):
+ """ Turn on wifi hotspot with a config that is already saved """
+ ad.droid.wifiStartTrackingTetherStateChange()
+ ad.droid.connectivityStartTethering(tel_defines.TETHERING_WIFI, False)
+ try:
+ ad.ed.pop_event("ConnectivityManagerOnTetheringStarted")
+ ad.ed.wait_for_event("TetherStateChanged",
+ lambda x: x["data"]["ACTIVE_TETHER"], 30)
+ except:
+ asserts.fail("Didn't receive wifi tethering starting confirmation")
+ finally:
+ ad.droid.wifiStopTrackingTetherStateChange()
def stop_wifi_tethering(ad):
"""Stops wifi tethering on an android_device.
diff --git a/acts/framework/tests/acts_android_device_test.py b/acts/framework/tests/acts_android_device_test.py
index b2cb0b5fae..6c664f5e10 100755
--- a/acts/framework/tests/acts_android_device_test.py
+++ b/acts/framework/tests/acts_android_device_test.py
@@ -28,9 +28,10 @@ from acts.controllers import android_device
MOCK_LOG_PATH = "/tmp/logs/MockTest/xx-xx-xx_xx-xx-xx/"
# Mock start and end time of the adb cat.
-MOCK_ADB_LOGCAT_BEGIN_TIME = "1970-01-02 21:03:20.123"
-MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000"
MOCK_ADB_EPOCH_BEGIN_TIME = 191000123
+MOCK_ADB_LOGCAT_BEGIN_TIME = logger.normalize_log_line_timestamp(
+ logger.epoch_to_log_line_timestamp(MOCK_ADB_EPOCH_BEGIN_TIME))
+MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000"
MOCK_SERIAL = 1
MOCK_RELEASE_BUILD_ID = "ABC1.123456.007"
diff --git a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
index 1972806c80..83b5ddc60e 100644
--- a/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
+++ b/acts/tests/google/ble/concurrency/ConcurrentBleScanningTest.py
@@ -264,13 +264,13 @@ class ConcurrentBleScanningTest(BluetoothBaseTest):
try:
self.scn_ad.ed.pop_event(
scan_failed.format(scan_callback), self.default_timeout)
- self.log.info(
- "Found scan event successfully. Iteration {} successful."
+ self.log.error(
+ "Unexpected scan event found. Iteration {} successful."
.format(i))
+ test_result = False
except Exception:
- self.log.info("Failed to find a onScanFailed event for callback {}"
+ self.log.info("No onScanFailed event for callback {} as expected."
.format(scan_callback))
- test_result = False
for callback in scan_callback_list:
self.scn_ad.droid.bleStopBleScan(callback)
return test_result
diff --git a/acts/tests/google/ble/filtering/UniqueFilteringTest.py b/acts/tests/google/ble/filtering/UniqueFilteringTest.py
index bb7b00f516..e4ad858197 100644
--- a/acts/tests/google/ble/filtering/UniqueFilteringTest.py
+++ b/acts/tests/google/ble/filtering/UniqueFilteringTest.py
@@ -58,8 +58,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
callbacktype = event['data']['CallbackType']
if callbacktype != expected_callbacktype:
self.log.debug(
- "Expected callback type: {}, Found callback type: {}"
- .format(expected_callbacktype, callbacktype))
+ "Expected callback type: {}, Found callback type: {}".format(
+ expected_callbacktype, callbacktype))
test_result = False
return test_result
@@ -109,6 +109,7 @@ class UniqueFilteringTest(BluetoothBaseTest):
Priority: 1
"""
test_result = True
+ self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
ble_advertise_settings_modes['low_latency'])
filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
@@ -118,6 +119,10 @@ class UniqueFilteringTest(BluetoothBaseTest):
generate_ble_advertise_objects(self.adv_ad.droid))
self.adv_ad.droid.bleStartBleAdvertising(
advertise_callback, advertise_data, advertise_settings)
+ self.scn_ad.droid.bleSetScanFilterDeviceName(
+ self.adv_ad.droid.bluetoothGetLocalName())
+ self.scn_ad.droid.bleBuildScanFilter(filter_list)
+
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
self.scn_ad.droid.bleFlushPendingScanResults(scan_callback)
@@ -128,10 +133,10 @@ class UniqueFilteringTest(BluetoothBaseTest):
self.log.debug(worker.result(self.default_timeout))
except Empty as error:
test_result = False
- self.log.debug("Test failed with Empty error: {}".format(error))
+ self.log.error("Test failed with Empty error: {}".format(error))
except concurrent.futures._base.TimeoutError as error:
test_result = False
- self.log.debug("Test failed with TimeoutError: {}".format(error))
+ self.log.error("Test failed with TimeoutError: {}".format(error))
self.scn_ad.droid.bleStopBleScan(scan_callback)
self.adv_ad.droid.bleStopBleAdvertising(advertise_callback)
return test_result
@@ -173,6 +178,9 @@ class UniqueFilteringTest(BluetoothBaseTest):
generate_ble_advertise_objects(self.adv_ad.droid))
self.adv_ad.droid.bleStartBleAdvertising(
advertise_callback, advertise_data, advertise_settings)
+ self.scn_ad.droid.bleSetScanFilterDeviceName(
+ self.adv_ad.droid.bluetoothGetLocalName())
+ self.scn_ad.droid.bleBuildScanFilter(filter_list)
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
system_time_nanos = self.scn_ad.droid.getSystemElapsedRealtimeNanos()
@@ -220,10 +228,14 @@ class UniqueFilteringTest(BluetoothBaseTest):
filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
self.scn_ad.droid)
expected_event_name = batch_scan_result.format(scan_callback)
+ self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
advertise_callback, advertise_data, advertise_settings = (
generate_ble_advertise_objects(self.adv_ad.droid))
self.adv_ad.droid.bleStartBleAdvertising(
advertise_callback, advertise_data, advertise_settings)
+ self.scn_ad.droid.bleSetScanFilterDeviceName(
+ self.adv_ad.droid.bluetoothGetLocalName())
+ self.scn_ad.droid.bleBuildScanFilter(filter_list)
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
worker = self.scn_ad.ed.handle_event(
@@ -232,8 +244,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
self.scn_ad.droid.bleFlushPendingScanResults(scan_callback)
try:
event_info = self.scn_ad.ed.pop_event(expected_event_name, 10)
- self.log.debug("Unexpectedly found an advertiser: {}".format(
- event_info))
+ self.log.debug(
+ "Unexpectedly found an advertiser: {}".format(event_info))
test_result = False
except Empty:
self.log.debug("No {} events were found as expected.".format(
@@ -285,8 +297,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
try:
event_info = self.scn_ad.ed.pop_event(expected_event_name,
self.default_timeout)
- self.log.error("Unexpectedly found an advertiser: {}".format(
- event_info))
+ self.log.error(
+ "Unexpectedly found an advertiser: {}".format(event_info))
test_result = False
except Empty:
self.log.debug("No events were found as expected.")
@@ -337,6 +349,9 @@ class UniqueFilteringTest(BluetoothBaseTest):
generate_ble_advertise_objects(self.adv_ad.droid))
self.adv_ad.droid.bleStartBleAdvertising(
advertise_callback, advertise_data, advertise_settings)
+ self.scn_ad.droid.bleSetScanFilterDeviceName(
+ self.adv_ad.droid.bluetoothGetLocalName())
+ self.scn_ad.droid.bleBuildScanFilter(filter_list)
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
worker = self.scn_ad.ed.handle_event(
@@ -345,8 +360,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
try:
event_info = self.scn_ad.ed.pop_event(expected_event_name,
self.default_timeout)
- self.log.error("Unexpectedly found an advertiser:".format(
- event_info))
+ self.log.error(
+ "Unexpectedly found an advertiser:".format(event_info))
test_result = False
except Empty as error:
self.log.debug("No events were found as expected.")
@@ -399,12 +414,12 @@ class UniqueFilteringTest(BluetoothBaseTest):
advertise_callback1, advertise_data1, advertise_settings1)
filter_list = self.scn_ad.droid.bleGenFilterList()
- self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[
- 'low_latency'])
+ self.scn_ad.droid.bleSetScanSettingsScanMode(
+ ble_scan_settings_modes['low_latency'])
scan_settings = self.scn_ad.droid.bleBuildScanSetting()
scan_callback = self.scn_ad.droid.bleGenScanCallback()
- self.scn_ad.droid.bleSetScanFilterManufacturerData(117, [1, 2, 3],
- [127, 127, 127])
+ self.scn_ad.droid.bleSetScanFilterManufacturerData(
+ 117, [1, 2, 3], [127, 127, 127])
self.scn_ad.droid.bleBuildScanFilter(filter_list)
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
@@ -439,27 +454,35 @@ class UniqueFilteringTest(BluetoothBaseTest):
Priority: 1
"""
test_result = True
- self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[
- 'low_latency'])
+ self.scn_ad.droid.bleSetScanSettingsScanMode(
+ ble_scan_settings_modes['low_latency'])
filter_list, scan_settings, scan_callback = generate_ble_scan_objects(
self.scn_ad.droid)
expected_event_name = scan_result.format(scan_callback)
self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
ble_advertise_settings_modes['low_latency'])
+ self.adv_ad.droid.bleSetAdvertiseDataIncludeDeviceName(True)
advertise_callback, advertise_data, advertise_settings = (
generate_ble_advertise_objects(self.adv_ad.droid))
self.adv_ad.droid.bleStartBleAdvertising(
advertise_callback, advertise_data, advertise_settings)
+ self.scn_ad.droid.bleSetScanFilterDeviceName(
+ self.adv_ad.droid.bluetoothGetLocalName())
+ self.scn_ad.droid.bleBuildScanFilter(filter_list)
self.scn_ad.droid.bleStartBleScan(filter_list, scan_settings,
scan_callback)
- event_info = self.scn_ad.ed.pop_event(expected_event_name,
- self.default_timeout)
+ try:
+ event_info = self.scn_ad.ed.pop_event(expected_event_name,
+ self.default_timeout)
+ except Empty as error:
+ self.log.error("Could not find initial advertisement.")
+ return False
mac_address = event_info['data']['Result']['deviceInfo']['address']
- self.log.info("Filter advertisement with address {}".format(
- mac_address))
+ self.log.info(
+ "Filter advertisement with address {}".format(mac_address))
self.scn_ad.droid.bleStopBleScan(scan_callback)
- self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[
- 'low_latency'])
+ self.scn_ad.droid.bleSetScanSettingsScanMode(
+ ble_scan_settings_modes['low_latency'])
self.scn_ad.droid.bleSetScanFilterDeviceAddress(mac_address)
filter_list2, scan_settings2, scan_callback2 = (
generate_ble_scan_objects(self.scn_ad.droid))
@@ -506,8 +529,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
Priority: 1
"""
manufacturer_id = 0x4c
- self.adv_ad.droid.bleAddAdvertiseDataManufacturerId(manufacturer_id,
- [0x01])
+ self.adv_ad.droid.bleAddAdvertiseDataManufacturerId(
+ manufacturer_id, [0x01])
self.adv_ad.droid.bleSetAdvertiseSettingsAdvertiseMode(
ble_advertise_settings_modes['low_latency'])
advertise_callback, advertise_data, advertise_settings = (
@@ -521,10 +544,10 @@ class UniqueFilteringTest(BluetoothBaseTest):
self.log.info("Failed to start advertisement.")
return False
- self.scn_ad.droid.bleSetScanSettingsScanMode(ble_scan_settings_modes[
- 'low_latency'])
- self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id,
- [0x01])
+ self.scn_ad.droid.bleSetScanSettingsScanMode(
+ ble_scan_settings_modes['low_latency'])
+ self.scn_ad.droid.bleSetScanFilterManufacturerData(
+ manufacturer_id, [0x01])
filter_list = self.scn_ad.droid.bleGenFilterList()
scan_settings = self.scn_ad.droid.bleBuildScanSetting()
scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list)
@@ -540,8 +563,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
try:
event = self.scn_ad.ed.pop_event(expected_event_name,
self.default_timeout)
- found_manufacturer_id = json.loads(event['data']['Result'][
- 'manufacturerIdList'])
+ found_manufacturer_id = json.loads(
+ event['data']['Result']['manufacturerIdList'])
if found_manufacturer_id[0] != manufacturer_id:
self.log.error(
"Manufacturer id mismatch. Found {}, Expected {}".
@@ -601,8 +624,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
self.scn_ad.droid.bleSetScanSettingsScanMode(
ble_scan_settings_modes['low_latency'])
- self.scn_ad.droid.bleSetScanFilterManufacturerData(manufacturer_id,
- [0x01])
+ self.scn_ad.droid.bleSetScanFilterManufacturerData(
+ manufacturer_id, [0x01])
filter_list = self.scn_ad.droid.bleGenFilterList()
scan_settings = self.scn_ad.droid.bleBuildScanSetting()
scan_filter = self.scn_ad.droid.bleBuildScanFilter(filter_list)
@@ -617,8 +640,8 @@ class UniqueFilteringTest(BluetoothBaseTest):
except Empty:
self.log.error("Unable to find beacon advertisement.")
return False
- found_manufacturer_id = json.loads(event['data']['Result'][
- 'manufacturerIdList'])
+ found_manufacturer_id = json.loads(
+ event['data']['Result']['manufacturerIdList'])
if found_manufacturer_id[0] != manufacturer_id:
self.log.error(
"Manufacturer id mismatch. Found {}, Expected {}".format(
diff --git a/acts/tests/google/ble/gatt/GattConnectTest.py b/acts/tests/google/ble/gatt/GattConnectTest.py
index 0b160a23cf..dd6857c941 100644
--- a/acts/tests/google/ble/gatt/GattConnectTest.py
+++ b/acts/tests/google/ble/gatt/GattConnectTest.py
@@ -308,7 +308,7 @@ class GattConnectTest(BluetoothBaseTest):
self.default_timeout)
except Empty:
self.log.error(
- gatt_cb_err['gatt_conn_change_err'].format(expected_event))
+ gatt_cb_err['gatt_conn_changed_err'].format(expected_event))
test_result = False
return self._orchestrate_gatt_disconnection(bluetooth_gatt,
gatt_callback)
diff --git a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
index e907cbb342..9803648000 100644
--- a/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
+++ b/acts/tests/google/ble/scan/BleOpportunisticScanTest.py
@@ -524,9 +524,9 @@ class BleOpportunisticScanTest(BluetoothBaseTest):
self):
"""Test opportunistic scan result from secondary scan filter.
- Tests opportunistic scan where the secondary scan instance does not find
- an advertisement but the scan instance with scan mode set to
- opportunistic scan will find an advertisement.
+ Tests opportunistic scan where the filtered scan instance does not find
+ an advertisement and the scan instance with scan mode set to
+ opportunistic scan will also not find an advertisement.
Steps:
1. Initialize advertiser and start advertisement on dut1 (make sure the
@@ -539,6 +539,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest):
6. Pop onScanResults from the second scanner
7. Expect no events
8. Pop onScanResults from the first scanner
+ 9. Expect no events
Expected Result:
Opportunistic scan instance finds an advertisement.
@@ -573,11 +574,7 @@ class BleOpportunisticScanTest(BluetoothBaseTest):
if not self._verify_no_events_found(
scan_result.format(scan_callback2)):
return False
- try:
- self.scn_ad.ed.pop_event(
- scan_result.format(scan_callback), self.default_timeout)
- except Empty:
- self.log.error("Opportunistic scan found no scan results.")
+ if not self._verify_no_events_found(scan_result.format(scan_callback)):
return False
return True
diff --git a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py
index ba8760aa88..1a32b25178 100644
--- a/acts/tests/google/bt/car_bt/BtCarMapMceTest.py
+++ b/acts/tests/google/bt/car_bt/BtCarMapMceTest.py
@@ -54,6 +54,13 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
time.sleep(4)
return True
+ def setup_test(self):
+ if not bt_test_utils.connect_pri_to_sec(
+ self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value])):
+ return False
+ # Grace time for connection to complete.
+ time.sleep(3)
+
def message_delivered(self, device):
try:
self.MCE.ed.pop_event(EventSmsDeliverSuccess, 15)
@@ -97,15 +104,11 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
@test_tracker_info(uuid='0858347a-e649-4f18-85b6-6990cc311dee')
@BluetoothBaseTest.bt_test_wrap
def test_send_message(self):
- bt_test_utils.connect_pri_to_sec(
- self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value]))
return self.send_message([self.REMOTE])
@test_tracker_info(uuid='b25caa53-3c7f-4cfa-a0ec-df9a8f925fe5')
@BluetoothBaseTest.bt_test_wrap
def test_receive_message(self):
- bt_test_utils.connect_pri_to_sec(
- self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value]))
self.MSE.log.info("Start Tracking SMS.")
self.MSE.droid.smsStartTrackingIncomingSmsMessage()
self.REMOTE.log.info("Ready to send")
@@ -130,6 +133,9 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
@test_tracker_info(uuid='19444142-1d07-47dc-860b-f435cba46fca')
@BluetoothBaseTest.bt_test_wrap
def test_send_message_failure_no_map_connection(self):
+ if not bt_test_utils.disconnect_pri_from_sec(
+ self.MCE, self.MSE, [BtEnum.BluetoothProfile.MAP_MCE.value]):
+ return False
return not self.send_message([self.REMOTE])
@test_tracker_info(uuid='c7e569c0-9f6c-49a4-8132-14bc544ccb53')
@@ -148,8 +154,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
@test_tracker_info(uuid='8cdb4a54-3f18-482f-be3d-acda9c4cbeed')
@BluetoothBaseTest.bt_test_wrap
def test_disconnect_failure_send_message(self):
- connected = bt_test_utils.connect_pri_to_sec(
- self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value]))
addr = self.MSE.droid.bluetoothGetLocalAddress()
if bt_test_utils.is_map_mce_device_connected(self.MCE, addr):
connected = True
@@ -167,8 +171,6 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
@test_tracker_info(uuid='2d79a896-b1c1-4fb7-9924-db8b5c698be5')
@BluetoothBaseTest.bt_test_wrap
def manual_test_send_message_to_contact(self):
- bt_test_utils.connect_pri_to_sec(
- self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value]))
contacts = self.MCE.droid.contactsGetContactIds()
self.log.info(contacts)
selected_contact = self.MCE.droid.contactsDisplayContactPickList()
@@ -181,6 +183,4 @@ class BtCarMapMceTest(BluetoothCarHfpBaseTest):
@test_tracker_info(uuid='8ce9a7dd-3b5e-4aee-a897-30740e2439c3')
@BluetoothBaseTest.bt_test_wrap
def test_send_message_to_multiple_phones(self):
- bt_test_utils.connect_pri_to_sec(
- self.MCE, self.MSE, set([BtEnum.BluetoothProfile.MAP_MCE.value]))
return self.send_message([self.REMOTE, self.REMOTE])
diff --git a/acts/tests/google/net/CoreNetworkingOTATest.py b/acts/tests/google/net/CoreNetworkingOTATest.py
new file mode 100755
index 0000000000..2444971a87
--- /dev/null
+++ b/acts/tests/google/net/CoreNetworkingOTATest.py
@@ -0,0 +1,102 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import re
+import time
+import urllib.request
+
+import acts.base_test
+import acts.signals as signals
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.net import connectivity_const as cconst
+
+import acts.test_utils.net.net_test_utils as nutils
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+VPN_CONST = cconst.VpnProfile
+VPN_TYPE = cconst.VpnProfileType
+VPN_PARAMS = cconst.VpnReqParams
+
+
+class CoreNetworkingOTATest(BaseTestClass):
+ """Core networking auto update tests"""
+
+ def setup_class(self):
+ super(CoreNetworkingOTATest, self).setup_class()
+ ota_updater.initialize(self.user_params, self.android_devices)
+
+ self.dut = self.android_devices[0]
+ req_params = dir(VPN_PARAMS)
+ req_params = [x for x in req_params if not x.startswith('__')]
+ req_params.extend(["download_file", "file_size"])
+ self.unpack_userparams(req_params)
+
+ vpn_params = {'vpn_username': self.vpn_username,
+ 'vpn_password': self.vpn_password,
+ 'psk_secret': self.psk_secret,
+ 'client_pkcs_file_name': self.client_pkcs_file_name,
+ 'cert_path_vpnserver': self.cert_path_vpnserver,
+ 'cert_password': self.cert_password}
+
+ # generate legacy vpn profiles
+ wutils.wifi_connect(self.dut, self.wifi_network)
+ self.xauth_rsa = nutils.generate_legacy_vpn_profile(
+ self.dut, vpn_params,
+ VPN_TYPE.IPSEC_XAUTH_RSA,
+ self.vpn_server_addresses[VPN_TYPE.IPSEC_XAUTH_RSA.name][0],
+ self.ipsec_server_type[0], self.log_path)
+ self.l2tp_rsa = nutils.generate_legacy_vpn_profile(
+ self.dut, vpn_params,
+ VPN_TYPE.L2TP_IPSEC_RSA,
+ self.vpn_server_addresses[VPN_TYPE.L2TP_IPSEC_RSA.name][0],
+ self.ipsec_server_type[0], self.log_path)
+ self.hybrid_rsa = nutils.generate_legacy_vpn_profile(
+ self.dut, vpn_params,
+ VPN_TYPE.IPSEC_HYBRID_RSA,
+ self.vpn_server_addresses[VPN_TYPE.IPSEC_HYBRID_RSA.name][0],
+ self.ipsec_server_type[0], self.log_path)
+ self.vpn_profiles = [self.l2tp_rsa, self.hybrid_rsa, self.xauth_rsa]
+
+ # verify legacy vpn
+ for prof in self.vpn_profiles:
+ nutils.legacy_vpn_connection_test_logic(self.dut, prof)
+
+ # Run OTA below, if ota fails then abort all tests.
+ try:
+ for ad in self.android_devices:
+ ota_updater.update(ad)
+ except Exception as err:
+ raise signals.TestSkipClass(
+ "Failed up apply OTA update. Aborting tests")
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+
+ """Tests"""
+
+ def test_legacy_vpn_ota_xauth_rsa(self):
+ nutils.legacy_vpn_connection_test_logic(self.dut, self.xauth_rsa)
+
+ def test_legacy_vpn_ota_l2tp_rsa(self):
+ nutils.legacy_vpn_connection_test_logic(self.dut, self.l2tp_rsa)
+
+ def test_legacy_vpn_ota_hybrid_rsa(self):
+ nutils.legacy_vpn_connection_test_logic(self.dut, self.hybrid_rsa)
diff --git a/acts/tests/google/net/CoreNetworkingTest.py b/acts/tests/google/net/CoreNetworkingTest.py
index f06be4a54f..308beda8e1 100644
--- a/acts/tests/google/net/CoreNetworkingTest.py
+++ b/acts/tests/google/net/CoreNetworkingTest.py
@@ -17,8 +17,7 @@ from acts import asserts
from acts import base_test
from acts.controllers import adb
from acts.test_decorators import test_tracker_info
-from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
-from acts.test_utils.tel.tel_test_utils import verify_http_connection
+from acts.test_utils.net import net_test_utils as nutils
from acts.test_utils.wifi import wifi_test_utils as wutils
dum_class = "com.android.tests.connectivity.uid.ConnectivityTestActivity"
@@ -30,17 +29,15 @@ class CoreNetworkingTest(base_test.BaseTestClass):
def setup_class(self):
""" Setup devices for tests and unpack params """
self.dut = self.android_devices[0]
- wutils.wifi_toggle_state(self.dut, False)
- self.dut.droid.telephonyToggleDataConnection(True)
- wait_for_cell_data_connection(self.log, self.dut, True)
- asserts.assert_true(
- verify_http_connection(self.log, self.dut),
- "HTTP verification failed on cell data connection")
+ nutils.verify_lte_data_and_tethering_supported(self.dut)
def teardown_class(self):
""" Reset devices """
wutils.wifi_toggle_state(self.dut, True)
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+
""" Test Cases """
@test_tracker_info(uuid="0c89d632-aafe-4bbd-a812-7b0eca6aafc7")
diff --git a/acts/tests/google/net/DataCostTest.py b/acts/tests/google/net/DataCostTest.py
index 4a829489e6..fb63c7a36b 100644
--- a/acts/tests/google/net/DataCostTest.py
+++ b/acts/tests/google/net/DataCostTest.py
@@ -25,8 +25,7 @@ from acts import base_test
from acts import test_runner
from acts.controllers import adb
from acts.test_decorators import test_tracker_info
-from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
-from acts.test_utils.tel.tel_test_utils import verify_http_connection
+from acts.test_utils.net import net_test_utils as nutils
from acts.test_utils.tel.tel_test_utils import _check_file_existance
from acts.test_utils.tel.tel_test_utils import _generate_file_directory_and_file_name
from acts.test_utils.wifi import wifi_test_utils as wutils
@@ -48,17 +47,19 @@ class DataCostTest(base_test.BaseTestClass):
self.unpack_userparams(req_params)
for ad in self.android_devices:
- wutils.reset_wifi(ad)
- ad.droid.telephonyToggleDataConnection(True)
- wait_for_cell_data_connection(self.log, ad, True)
- asserts.assert_true(
- verify_http_connection(self.log, ad),
- "HTTP verification failed on cell data connection")
+ nutils.verify_lte_data_and_tethering_supported(ad)
def teardown_class(self):
+ """ Reset settings to default """
for ad in self.android_devices:
+ sub_id = str(ad.droid.telephonyGetSubscriberId())
+ ad.droid.connectivityFactoryResetNetworkPolicies(sub_id)
+ ad.droid.connectivitySetDataWarningLimit(sub_id, -1)
wutils.reset_wifi(ad)
- ad.droid.telephonyToggleDataConnection(True)
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -155,10 +156,6 @@ class DataCostTest(base_test.BaseTestClass):
ad.droid.connectivitySetDataWarningLimit(
sub_id, int((total_pre + 5) * 1000.0 * 1000.0))
- # reset data limit
- ad.droid.connectivityFactoryResetNetworkPolicies(sub_id)
- ad.droid.connectivitySetDataWarningLimit(sub_id, -1)
-
# verify multipath preference values
self._verify_multipath_preferences(
ad, RELIABLE, NONE, wifi_network, cell_network)
diff --git a/acts/tests/google/net/DataUsageTest.py b/acts/tests/google/net/DataUsageTest.py
index 0e636ac35b..32ebd7f632 100644
--- a/acts/tests/google/net/DataUsageTest.py
+++ b/acts/tests/google/net/DataUsageTest.py
@@ -23,6 +23,7 @@ from acts.test_utils.net import connectivity_const as cconst
from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
from acts.test_utils.tel.tel_test_utils import http_file_download_by_chrome
from acts.test_utils.tel.tel_test_utils import verify_http_connection
+import acts.test_utils.net.net_test_utils as nutils
from acts.test_utils.tel import tel_test_utils as ttutils
from acts.test_utils.wifi import wifi_test_utils as wutils
@@ -56,12 +57,7 @@ class DataUsageTest(base_test.BaseTestClass):
""" Setup devices for tests and unpack params """
self.dut = self.android_devices[0]
self.tethered_devices = self.android_devices[1:]
- wutils.reset_wifi(self.dut)
- self.dut.droid.telephonyToggleDataConnection(True)
- wait_for_cell_data_connection(self.log, self.dut, True)
- asserts.assert_true(
- verify_http_connection(self.log, self.dut),
- "HTTP verification failed on cell data connection")
+ nutils.verify_lte_data_and_tethering_supported(self.dut)
# unpack user params
req_params = ("wifi_network", "download_file", "file_size", "network")
@@ -79,24 +75,16 @@ class DataUsageTest(base_test.BaseTestClass):
# Set chrome browser start with no-first-run verification
# Give permission to read from and write to storage
- commands = ["pm grant com.android.chrome "
- "android.permission.READ_EXTERNAL_STORAGE",
- "pm grant com.android.chrome "
- "android.permission.WRITE_EXTERNAL_STORAGE",
- "rm /data/local/chrome-command-line",
- "am set-debug-app --persistent com.android.chrome",
- 'echo "chrome --no-default-browser-check --no-first-run '
- '--disable-fre" > /data/local/tmp/chrome-command-line']
- for cmd in commands:
- for dut in self.android_devices:
- try:
- dut.adb.shell(cmd)
- except adb.AdbError:
- self.log.warn("adb command %s failed on %s" % (cmd, dut.serial))
+ nutils.set_chrome_browser_permissions(self.dut)
def teardown_class(self):
""" Reset devices """
- wutils.reset_wifi(self.dut)
+ for ad in self.android_devices:
+ wutils.reset_wifi(ad)
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -206,9 +194,6 @@ class DataUsageTest(base_test.BaseTestClass):
5. Verify that data usage of ConnUIDTest app increased by ~xMB
6. Verify that data usage of device also increased by ~xMB
"""
- # disable wifi
- wutils.wifi_toggle_state(self.dut, False)
-
# get pre mobile data usage
(aos_pre, app_pre, total_pre) = self._get_data_usage(self.dut,
cconst.TYPE_MOBILE)
@@ -264,6 +249,10 @@ class DataUsageTest(base_test.BaseTestClass):
self.log.info("Data usage of Android os increased by %s" % aos_diff)
self.log.info("Data usage of ConnUID app increased by %s" % app_diff)
self.log.info("Data usage on the device increased by %s" % total_diff)
+
+ # forget network
+ wutils.wifi_forget_network(self.dut, self.wifi_network['SSID'])
+
return (aos_diff < DATA_ERR) and \
(self.file_size < app_diff < self.file_size + DATA_USG_ERR) and \
(self.file_size < total_diff < self.file_size + DATA_USG_ERR)
diff --git a/acts/tests/google/net/DhcpServerTest.py b/acts/tests/google/net/DhcpServerTest.py
new file mode 100644
index 0000000000..1f33d63422
--- /dev/null
+++ b/acts/tests/google/net/DhcpServerTest.py
@@ -0,0 +1,1055 @@
+from acts import asserts
+from acts import base_test
+from acts.controllers import android_device
+from acts.test_decorators import test_tracker_info
+
+from scapy.all import *
+from threading import Event
+from threading import Thread
+import random
+import time
+import warnings
+
+
+CLIENT_PORT = 68
+SERVER_PORT = 67
+BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff'
+INET4_ANY = '0.0.0.0'
+NETADDR_PREFIX = '192.168.42.'
+OTHER_NETADDR_PREFIX = '192.168.43.'
+NETADDR_BROADCAST = '255.255.255.255'
+SUBNET_BROADCAST = NETADDR_PREFIX + '255'
+
+
+OFFER = 2
+REQUEST = 3
+ACK = 5
+NAK = 6
+
+
+pmc_base_cmd = (
+ "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ")
+start_pmc_cmd = (
+ "am start -S -n com.android.pmc/com.android.pmc.PMCMainActivity")
+pmc_start_usb_tethering_cmd = "%sStartUSBTethering" % pmc_base_cmd
+pmc_stop_usb_tethering_cmd = "%sStopUSBTethering" % pmc_base_cmd
+
+
+class DhcpServerTest(base_test.BaseTestClass):
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ self.USB_TETHERED = False
+ self.next_hwaddr_index = 0
+ self.stop_arp = Event()
+
+ conf.checkIPaddr = 0
+ conf.checkIPsrc = 0
+ # Allow using non-67 server ports as long as client uses 68
+ bind_layers(UDP, BOOTP, dport=CLIENT_PORT)
+
+ self.dut.adb.shell(start_pmc_cmd)
+ self.dut.adb.shell("setprop log.tag.PMC VERBOSE")
+ iflist_before = get_if_list()
+ self._start_usb_tethering(self.dut)
+ self.iface = self._wait_for_new_iface(iflist_before)
+ self.real_hwaddr = get_if_raw_hwaddr(self.iface)
+
+ # Start a thread to answer to all ARP "who-has"
+ thread = Thread(target=self._sniff_arp, args=(self.stop_arp,))
+ thread.start()
+
+ # Discover server IP and device hwaddr
+ hwaddr = self._next_hwaddr()
+ resp = self._get_response(self._make_discover(hwaddr))
+ asserts.assert_false(None == resp,
+ "Device did not reply to first DHCP discover")
+ self.server_addr = getopt(resp, 'server_id')
+ self.dut_hwaddr = resp.getlayer(Ether).src
+ asserts.assert_false(None == self.server_addr,
+ "DHCP server did not specify server identifier")
+ # Ensure that we don't depend on assigned route/gateway on the host
+ conf.route.add(host=self.server_addr, dev=self.iface, gw="0.0.0.0")
+
+ def setup_test(self):
+ # Some versions of scapy do not close the receive file properly
+ warnings.filterwarnings("ignore", category=ResourceWarning)
+
+ bind_layers(UDP, BOOTP, dport=68)
+ self.hwaddr = self._next_hwaddr()
+ self.other_hwaddr = self._next_hwaddr()
+ self.cleanup_releases = []
+
+ def teardown_test(self):
+ for packet in self.cleanup_releases:
+ self._send(packet)
+
+ def teardown_class(self):
+ self.stop_arp.set()
+ self._stop_usb_tethering(self.dut)
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+
+ def _start_usb_tethering(self, dut):
+ """ Start USB tethering
+
+ Args:
+ 1. dut - ad object
+ """
+ self.log.info("Starting USB Tethering")
+ dut.stop_services()
+ dut.adb.shell(pmc_start_usb_tethering_cmd)
+ self._wait_for_device(self.dut)
+ self.USB_TETHERED = True
+
+ def _stop_usb_tethering(self, dut):
+ """ Stop USB tethering
+
+ Args:
+ 1. dut - ad object
+ """
+ self.log.info("Stopping USB Tethering")
+ dut.adb.shell(pmc_stop_usb_tethering_cmd)
+ self._wait_for_device(self.dut)
+ dut.start_services(skip_sl4a=getattr(dut, "skip_sl4a", False))
+ self.USB_TETHERED = False
+
+ def _wait_for_device(self, dut):
+ """ Wait for device to come back online
+
+ Args:
+ 1. dut - ad object
+ """
+ while dut.serial not in android_device.list_adb_devices():
+ pass
+ dut.adb.wait_for_device()
+
+ def _wait_for_new_iface(self, old_ifaces):
+ old_set = set(old_ifaces)
+ # Try 10 times to find a new interface with a 1s sleep every time
+ # (equivalent to a 9s timeout)
+ for i in range(0, 10):
+ new_ifaces = set(get_if_list()) - old_set
+ asserts.assert_true(len(new_ifaces) < 2,
+ "Too many new interfaces after turning on tethering")
+ if len(new_ifaces) == 1:
+ return new_ifaces.pop()
+ time.sleep(1)
+ asserts.fail("Timeout waiting for tethering interface on host")
+
+ def _sniff_arp(self, stop_arp):
+ try:
+ sniff(iface=self.iface, filter='arp', prn=self._handle_arp, store=0,
+ stop_filter=lambda p: stop_arp.is_set())
+ except:
+ # sniff may raise when tethering is disconnected. Ignore
+ # exceptions if stop was requested.
+ if not stop_arp.is_set():
+ raise
+
+ def _handle_arp(self, packet):
+ # Reply to all arp "who-has": say we have everything
+ if packet[ARP].op == ARP.who_has:
+ reply = ARP(op=ARP.is_at, hwsrc=self.real_hwaddr, psrc=packet.pdst,
+ hwdst=BROADCAST_MAC, pdst=SUBNET_BROADCAST)
+ sendp(Ether(dst=BROADCAST_MAC, src=self.real_hwaddr) / reply,
+ iface=self.iface, verbose=False)
+
+ @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c")
+ def test_config_assumptions(self):
+ resp = self._get_response(self._make_discover(self.hwaddr))
+ asserts.assert_false(None == resp, "Device did not reply to discover")
+ asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX),
+ "Server does not use expected prefix")
+
+ @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568")
+ def test_discover_broadcastbit(self):
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, bcastbit=True))
+ self._assert_offer(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191")
+ def test_discover_bootpfields(self):
+ discover = self._make_discover(self.hwaddr)
+ resp = self._get_response(discover)
+ self._assert_offer(resp)
+ self._assert_unicast(resp)
+ bootp = assert_bootp_response(resp, discover)
+ asserts.assert_equal(INET4_ANY, bootp.ciaddr)
+ asserts.assert_equal(self.server_addr, bootp.siaddr)
+ asserts.assert_equal(INET4_ANY, bootp.giaddr)
+ asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
+
+ @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182")
+ def test_discover_relayed_broadcastbit(self):
+ giaddr = NETADDR_PREFIX + '123'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True))
+ self._assert_offer(resp)
+ self._assert_unicast(resp, giaddr)
+ self._assert_broadcastbit(resp)
+
+ def _run_discover_paramrequestlist(self, params, unwanted_params):
+ params_opt = make_paramrequestlist_opt(params)
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, options=[params_opt]))
+
+ self._assert_offer(resp)
+ # List of requested params in response order
+ resp_opts = get_opt_labels(resp)
+ resp_requested_opts = [opt for opt in resp_opts if opt in params]
+ # All above params should be supported, order should be conserved
+ asserts.assert_equal(params, resp_requested_opts)
+ asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params)))
+ return resp
+
+ @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8")
+ def test_discover_paramrequestlist(self):
+ resp = self._run_discover_paramrequestlist(
+ ['subnet_mask', 'broadcast_address', 'router', 'name_server'],
+ unwanted_params=[])
+ for opt in ['broadcast_address', 'router', 'name_server']:
+ asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX),
+ opt + ' does not start with ' + NETADDR_PREFIX)
+
+ subnet_mask = getopt(resp, 'subnet_mask')
+ asserts.assert_true(subnet_mask.startswith('255.255.'),
+ 'Unexpected subnet mask for /16+: ' + subnet_mask)
+
+ @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46")
+ def test_discover_paramrequestlist_rev(self):
+ # RFC2132 #9.8: "The DHCP server is not required to return the options
+ # in the requested order, but MUST try to insert the requested options
+ # in the order requested"
+ asserts.skip('legacy behavior not compliant: fixed order used')
+ self._run_discover_paramrequestlist(
+ ['name_server', 'router', 'broadcast_address', 'subnet_mask'],
+ unwanted_params=[])
+
+ @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b")
+ def test_discover_paramrequestlist_unwanted(self):
+ asserts.skip('legacy behavior always sends all parameters')
+ self._run_discover_paramrequestlist(['router', 'name_server'],
+ unwanted_params=['broadcast_address', 'subnet_mask'])
+
+ def _assert_renews(self, request, addr, exp_time, resp_type=ACK):
+ # Sleep to test lease time renewal
+ time.sleep(3)
+ resp = self._get_response(request)
+ self._assert_type(resp, resp_type)
+ asserts.assert_equal(addr, get_yiaddr(resp))
+ remaining_lease = getopt(resp, 'lease_time')
+ # Lease renewed: waited for 3s, lease time not decreased by more than 2
+ asserts.assert_true(remaining_lease >= exp_time - 2,
+ 'Lease not renewed')
+ # Lease times should be consistent across offers/renewals
+ asserts.assert_true(remaining_lease <= exp_time + 2,
+ 'Lease time inconsistent')
+ return resp
+
+ @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade")
+ def test_discover_assigned_ownaddress(self):
+ addr, siaddr, resp = self._request_address(self.hwaddr)
+
+ lease_time = getopt(resp, 'lease_time')
+ server_id = getopt(resp, 'server_id')
+ asserts.assert_true(lease_time >= 60, "Lease time is too short")
+ asserts.assert_false(addr == INET4_ANY, "Assigned address is empty")
+ # Wait to test lease expiration time change
+ time.sleep(3)
+
+ # New discover, same address
+ resp = self._assert_renews(self._make_discover(self.hwaddr),
+ addr, lease_time, resp_type=OFFER)
+ self._assert_unicast(resp, get_yiaddr(resp))
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587")
+ def test_discover_assigned_otherhost(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ # New discover, same address, different client
+ resp = self._get_response(self._make_discover(self.other_hwaddr,
+ [('requested_addr', addr)]))
+
+ self._assert_offer(resp)
+ asserts.assert_false(get_yiaddr(resp) == addr,
+ "Already assigned address offered")
+ self._assert_unicast(resp, get_yiaddr(resp))
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14")
+ def test_discover_requestaddress(self):
+ addr = NETADDR_PREFIX + '200'
+ resp = self._get_response(self._make_discover(self.hwaddr,
+ [('requested_addr', addr)]))
+ self._assert_offer(resp)
+ asserts.assert_equal(get_yiaddr(resp), addr)
+
+ # Lease not committed: can request again
+ resp = self._get_response(self._make_discover(self.other_hwaddr,
+ [('requested_addr', addr)]))
+ self._assert_offer(resp)
+ asserts.assert_equal(get_yiaddr(resp), addr)
+
+ @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd")
+ def test_discover_requestaddress_wrongsubnet(self):
+ addr = OTHER_NETADDR_PREFIX + '200'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, [('requested_addr', addr)]))
+ self._assert_offer(resp)
+ self._assert_unicast(resp)
+ asserts.assert_false(get_yiaddr(resp) == addr,
+ 'Server offered invalid address')
+
+ @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf")
+ def test_discover_giaddr_outside_subnet(self):
+ giaddr = OTHER_NETADDR_PREFIX + '201'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, giaddr=giaddr))
+ asserts.assert_equal(resp, None)
+
+ @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1")
+ def test_discover_srcaddr_outside_subnet(self):
+ srcaddr = OTHER_NETADDR_PREFIX + '200'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, ip_src=srcaddr))
+ self._assert_offer(resp)
+ asserts.assert_false(srcaddr == get_yiaddr(resp),
+ 'Server offered invalid address')
+
+ @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e")
+ def test_discover_requestaddress_giaddr_outside_subnet(self):
+ addr = NETADDR_PREFIX + '200'
+ giaddr = OTHER_NETADDR_PREFIX + '201'
+ req = self._make_discover(self.hwaddr, [('requested_addr', addr)],
+ ip_src=giaddr, giaddr=giaddr)
+ resp = self._get_response(req)
+ asserts.assert_equal(resp, None)
+
+ @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db")
+ def test_discover_knownaddress_giaddr_outside_subnet(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ # New discover, same client, through relay in invalid subnet
+ giaddr = OTHER_NETADDR_PREFIX + '200'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, giaddr=giaddr))
+ asserts.assert_equal(resp, None)
+
+ @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557")
+ def test_discover_knownaddress_giaddr_valid_subnet(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ # New discover, same client, through relay in valid subnet
+ giaddr = NETADDR_PREFIX + '200'
+ resp = self._get_response(
+ self._make_discover(self.hwaddr, giaddr=giaddr))
+ self._assert_offer(resp)
+ self._assert_unicast(resp, giaddr)
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7")
+ def test_request_unicast(self):
+ addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False)
+ self._assert_unicast(resp, addr)
+
+ @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3")
+ def test_request_bootpfields(self):
+ req_addr = NETADDR_PREFIX + '200'
+ req = self._make_request(self.hwaddr, req_addr, self.server_addr)
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ bootp = assert_bootp_response(resp, req)
+ asserts.assert_equal(INET4_ANY, bootp.ciaddr)
+ asserts.assert_equal(self.server_addr, bootp.siaddr)
+ asserts.assert_equal(INET4_ANY, bootp.giaddr)
+ asserts.assert_equal(self.hwaddr, get_chaddr(bootp))
+
+ @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18")
+ def test_request_selecting_inuse(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+ new_req = self._make_request(self.other_hwaddr, addr, siaddr)
+ resp = self._get_response(new_req)
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+ bootp = assert_bootp_response(resp, new_req)
+ asserts.assert_equal(INET4_ANY, bootp.ciaddr)
+ asserts.assert_equal(INET4_ANY, bootp.yiaddr)
+ asserts.assert_equal(INET4_ANY, bootp.siaddr)
+ asserts.assert_equal(INET4_ANY, bootp.giaddr)
+ asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp))
+ asserts.assert_equal(
+ ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt
+ get_opt_labels(bootp))
+ asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id'))
+
+ @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c")
+ def test_request_selecting_wrongsiaddr(self):
+ addr = NETADDR_PREFIX + '200'
+ wrong_siaddr = NETADDR_PREFIX + '201'
+ asserts.assert_false(wrong_siaddr == self.server_addr,
+ 'Test assumption not met: server addr is ' + wrong_siaddr)
+ resp = self._get_response(
+ self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr))
+ asserts.assert_true(resp == None,
+ 'Received response for request with incorrect siaddr')
+
+ @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f")
+ def test_request_selecting_giaddr_outside_subnet(self):
+ addr = NETADDR_PREFIX + '200'
+ giaddr = OTHER_NETADDR_PREFIX + '201'
+ resp = self._get_response(
+ self._make_request(self.hwaddr, addr, siaddr=self.server_addr,
+ giaddr=giaddr))
+ asserts.assert_equal(resp, None)
+
+ @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f")
+ def test_request_selecting_hostnameupdate(self):
+ addr = NETADDR_PREFIX + '123'
+ hostname1 = b'testhostname1'
+ hostname2 = b'testhostname2'
+ req = self._make_request(self.hwaddr, None, None,
+ options=[
+ ('requested_addr', addr),
+ ('server_id', self.server_addr),
+ ('hostname', hostname1)])
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_unicast(resp, addr)
+ asserts.assert_equal(hostname1, getopt(req, 'hostname'))
+
+ # Re-request with different hostname
+ setopt(req, 'hostname', hostname2)
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_unicast(resp, addr)
+ asserts.assert_equal(hostname2, getopt(req, 'hostname'))
+
+ def _run_initreboot(self, bcastbit):
+ addr, siaddr, resp = self._request_address(self.hwaddr)
+ exp = getopt(resp, 'lease_time')
+
+ # init-reboot: siaddr is None
+ return self._assert_renews(self._make_request(
+ self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp)
+
+ @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528")
+ def test_request_initreboot(self):
+ resp = self._run_initreboot(bcastbit=False)
+ self._assert_unicast(resp)
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51")
+ def test_request_initreboot_broadcastbit(self):
+ resp = self._run_initreboot(bcastbit=True)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c")
+ def test_request_initreboot_nolease(self):
+ # RFC2131 #4.3.2
+ asserts.skip("legacy behavior not compliant")
+ addr = NETADDR_PREFIX + '123'
+ resp = self._get_response(self._make_request(self.hwaddr, addr, None))
+ asserts.assert_equal(resp, None)
+
+ @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5")
+ def test_request_initreboot_incorrectlease(self):
+ otheraddr = NETADDR_PREFIX + '123'
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+ asserts.assert_false(addr == otheraddr,
+ "Test assumption not met: server assigned " + otheraddr)
+
+ resp = self._get_response(
+ self._make_request(self.hwaddr, otheraddr, siaddr=None))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120")
+ def test_request_initreboot_wrongnet(self):
+ resp = self._get_response(self._make_request(self.hwaddr,
+ OTHER_NETADDR_PREFIX + '1', siaddr=None))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ def _run_rebinding(self, bcastbit, giaddr=INET4_ANY):
+ addr, siaddr, resp = self._request_address(self.hwaddr)
+ exp = getopt(resp, 'lease_time')
+
+ # Rebinding: no siaddr or reqaddr
+ resp = self._assert_renews(
+ self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
+ ciaddr=addr, giaddr=giaddr, ip_src=addr,
+ ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit),
+ addr, exp)
+ return resp, addr
+
+ @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca")
+ def test_request_rebinding(self):
+ resp, addr = self._run_rebinding(bcastbit=False)
+ self._assert_unicast(resp, addr)
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2")
+ def test_request_rebinding_relayed(self):
+ giaddr = NETADDR_PREFIX + '123'
+ resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr)
+ self._assert_unicast(resp, giaddr)
+ self._assert_broadcastbit(resp, isset=False)
+
+ @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1")
+ def test_request_rebinding_inuse(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ resp = self._get_response(self._make_request(
+ self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc")
+ def test_request_rebinding_wrongaddr(self):
+ otheraddr = NETADDR_PREFIX + '123'
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+ asserts.assert_false(addr == otheraddr,
+ "Test assumption not met: server assigned " + otheraddr)
+
+ resp = self._get_response(self._make_request(
+ self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="421a86b3-8779-4910-8050-7806536efabb")
+ def test_request_rebinding_wrongaddr_relayed(self):
+ otheraddr = NETADDR_PREFIX + '123'
+ relayaddr = NETADDR_PREFIX + '124'
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+ asserts.assert_false(addr == otheraddr,
+ "Test assumption not met: server assigned " + otheraddr)
+ asserts.assert_false(addr == relayaddr,
+ "Test assumption not met: server assigned " + relayaddr)
+
+ req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None,
+ ciaddr=otheraddr, giaddr=relayaddr)
+
+ resp = self._get_response(req)
+ self._assert_nak(resp)
+ self._assert_unicast(resp, relayaddr)
+ self._assert_broadcastbit(resp)
+
+ @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da")
+ def test_release(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+ # Re-requesting fails
+ resp = self._get_response(
+ self._make_request(self.other_hwaddr, addr, siaddr))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ # Succeeds after release
+ self._send(self._make_release(self.hwaddr, addr, siaddr))
+ resp = self._get_response(
+ self._make_request(self.other_hwaddr, addr, siaddr))
+ self._assert_ack(resp)
+
+ @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593")
+ def test_release_noserverid(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ # Release without server_id opt is ignored
+ release = self._make_release(self.hwaddr, addr, siaddr)
+ removeopt(release, 'server_id')
+ self._send(release)
+
+ # Not released: request fails
+ resp = self._get_response(
+ self._make_request(self.other_hwaddr, addr, siaddr))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1")
+ def test_release_wrongserverid(self):
+ addr, siaddr, _ = self._request_address(self.hwaddr)
+
+ # Release with wrong server id
+ release = self._make_release(self.hwaddr, addr, siaddr)
+ setopt(release, 'server_id', addr)
+ self._send(release)
+
+ # Not released: request fails
+ resp = self._get_response(
+ self._make_request(self.other_hwaddr, addr, siaddr))
+ self._assert_nak(resp)
+ self._assert_broadcast(resp)
+
+ @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce")
+ def test_unicast_l2l3(self):
+ reqAddr = NETADDR_PREFIX + '124'
+ resp = self._get_response(self._make_request(
+ self.hwaddr, reqAddr, siaddr=None))
+ self._assert_unicast(resp)
+ str_hwaddr = format_hwaddr(self.hwaddr)
+ asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst)
+ asserts.assert_equal(reqAddr, resp.getlayer(IP).dst)
+ asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport)
+
+ @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
+ def test_macos_10_13_3_discover(self):
+ params_opt = make_paramrequestlist_opt([
+ 'subnet_mask',
+ 121, # Classless Static Route
+ 'router',
+ 'name_server',
+ 'domain',
+ 119, # Domain Search
+ 252, # Private/Proxy autodiscovery
+ 95, # LDAP
+ 'NetBIOS_server',
+ 46, # NetBIOS over TCP/IP Node Type
+ ])
+ req = self._make_discover(self.hwaddr,
+ options=[
+ params_opt,
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('lease_time', 7776000),
+ ('hostname', b'test12-macbookpro'),
+ ], opts_padding=bytes(6))
+ req.getlayer(BOOTP).secs = 2
+ resp = self._get_response(req)
+ self._assert_standard_offer(resp)
+
+ def _make_macos_10_13_3_paramrequestlist(self):
+ return make_paramrequestlist_opt([
+ 'subnet_mask',
+ 121, # Classless Static Route
+ 'router',
+ 'name_server',
+ 'domain',
+ 119, # Domain Search
+ 252, # Private/Proxy autodiscovery
+ 95, # LDAP
+ 44, # NetBIOS over TCP/IP Name Server
+ 46, # NetBIOS over TCP/IP Node Type
+ ])
+
+ @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798")
+ def test_macos_10_13_3_discover(self):
+ req = self._make_discover(self.hwaddr,
+ options=[
+ self._make_macos_10_13_3_paramrequestlist(),
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('lease_time', 7776000),
+ ('hostname', b'test12-macbookpro'),
+ ], opts_padding=bytes(6))
+ req.getlayer(BOOTP).secs = 2
+ resp = self._get_response(req)
+ self._assert_offer(resp)
+ self._assert_standard_offer_or_ack(resp)
+
+ @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86")
+ def test_macos_10_13_3_request_selecting(self):
+ req = self._make_request(self.hwaddr, None, None,
+ options=[
+ self._make_macos_10_13_3_paramrequestlist(),
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('requested_addr', NETADDR_PREFIX + '109'),
+ ('server_id', self.server_addr),
+ ('hostname', b'test12-macbookpro'),
+ ])
+ req.getlayer(BOOTP).secs = 5
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp)
+
+ # Note: macOS does not seem to do any rebinding (straight to discover)
+ @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b")
+ def test_macos_10_13_3_request_renewing(self):
+ req_ip = NETADDR_PREFIX + '109'
+ req = self._make_request(self.hwaddr, None, None,
+ ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[
+ self._make_macos_10_13_3_paramrequestlist(),
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('lease_time', 7776000),
+ ('hostname', b'test12-macbookpro'),
+ ], opts_padding=bytes(6))
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp, renewing=True)
+
+ def _make_win10_paramrequestlist(self):
+ return make_paramrequestlist_opt([
+ 'subnet_mask',
+ 'router',
+ 'name_server',
+ 'domain',
+ 31, # Perform Router Discover
+ 33, # Static Route
+ 'vendor_specific',
+ 44, # NetBIOS over TCP/IP Name Server
+ 46, # NetBIOS over TCP/IP Node Type
+ 47, # NetBIOS over TCP/IP Scope
+ 121, # Classless Static Route
+ 249, # Private/Classless Static Route (MS)
+ 252, # Private/Proxy autodiscovery
+ ])
+
+ @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76")
+ def test_win10_discover(self):
+ req = self._make_discover(self.hwaddr, bcastbit=True,
+ options=[
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('hostname', b'test120-w'),
+ ('vendor_class_id', b'MSFT 5.0'),
+ self._make_win10_paramrequestlist(),
+ ], opts_padding=bytes(11))
+ req.getlayer(BOOTP).secs = 2
+ resp = self._get_response(req)
+ self._assert_offer(resp)
+ self._assert_standard_offer_or_ack(resp, bcast=True)
+
+ @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410")
+ def test_win10_request_selecting(self):
+ req = self._make_request(self.hwaddr, None, None, bcastbit=True,
+ options=[
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('requested_addr', NETADDR_PREFIX + '109'),
+ ('server_id', self.server_addr),
+ ('hostname', b'test120-w'),
+ # Client Fully Qualified Domain Name
+ (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
+ ('vendor_class_id', b'MSFT 5.0'),
+ self._make_win10_paramrequestlist(),
+ ])
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp, bcast=True)
+
+ def _run_win10_request_renewing(self, bcast):
+ req_ip = NETADDR_PREFIX + '109'
+ req = self._make_request(self.hwaddr, None, None, bcastbit=bcast,
+ ciaddr=req_ip, ip_src=req_ip,
+ ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
+ options=[
+ ('max_dhcp_size', 1500),
+ # HW type Ethernet (0x01)
+ ('client_id', b'\x01' + self.hwaddr),
+ ('hostname', b'test120-w'),
+ # Client Fully Qualified Domain Name
+ (81, b'\x00\x00\x00test120-w.ad.tst.example.com'),
+ ('vendor_class_id', b'MSFT 5.0'),
+ self._make_win10_paramrequestlist(),
+ ])
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast)
+
+ @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9")
+ def test_win10_request_renewing(self):
+ self._run_win10_request_renewing(bcast=False)
+
+ @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751")
+ def test_win10_request_rebinding(self):
+ self._run_win10_request_renewing(bcast=True)
+
+ def _make_debian_paramrequestlist(self):
+ return make_paramrequestlist_opt([
+ 'subnet_mask',
+ 'broadcast_address',
+ 'router',
+ 'name_server',
+ 119, # Domain Search
+ 'hostname',
+ 101, # TCode
+ 'domain', # NetBIOS over TCP/IP Name Server
+ 'vendor_specific', # NetBIOS over TCP/IP Node Type
+ 121, # Classless Static Route
+ 249, # Private/Classless Static Route (MS)
+ 33, # Static Route
+ 252, # Private/Proxy autodiscovery
+ 'NTP_server',
+ ])
+
+ @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5")
+ def test_debian_dhclient_4_3_5_discover(self):
+ req_ip = NETADDR_PREFIX + '109'
+ req = self._make_discover(self.hwaddr,
+ options=[
+ ('requested_addr', req_ip),
+ ('hostname', b'test12'),
+ self._make_debian_paramrequestlist(),
+ ], opts_padding=bytes(26))
+ resp = self._get_response(req)
+ self._assert_offer(resp)
+ self._assert_standard_offer_or_ack(resp)
+ asserts.assert_equal(req_ip, get_yiaddr(resp))
+
+ @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac")
+ def test_debian_dhclient_4_3_5_request_selecting(self):
+ req = self._make_request(self.hwaddr, None, None,
+ options=[
+ ('server_id', self.server_addr),
+ ('requested_addr', NETADDR_PREFIX + '109'),
+ ('hostname', b'test12'),
+ self._make_debian_paramrequestlist(),
+ ], opts_padding=bytes(20))
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp, with_hostname=True)
+
+ def _run_debian_renewing(self, bcast):
+ req_ip = NETADDR_PREFIX + '109'
+ req = self._make_request(self.hwaddr, None, None,
+ ciaddr=req_ip, ip_src=req_ip,
+ ip_dst=NETADDR_BROADCAST if bcast else self.server_addr,
+ options=[
+ ('hostname', b'test12'),
+ self._make_debian_paramrequestlist(),
+ ],
+ opts_padding=bytes(32))
+ resp = self._get_response(req)
+ self._assert_ack(resp)
+ self._assert_standard_offer_or_ack(resp, renewing=True,
+ with_hostname=True)
+
+ @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc")
+ def test_debian_dhclient_4_3_5_request_renewing(self):
+ self._run_debian_renewing(bcast=False)
+
+ @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db")
+ def test_debian_dhclient_4_3_5_request_rebinding(self):
+ self._run_debian_renewing(bcast=True)
+
+ def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False,
+ with_hostname=False):
+ # Responses to renew/rebind are always unicast to ciaddr even with
+ # broadcast flag set (RFC does not define this behavior, but this is
+ # more efficient and matches previous behavior)
+ if bcast and not renewing:
+ self._assert_broadcast(resp)
+ else:
+ self._assert_unicast(resp)
+ self._assert_broadcastbit(resp, isset=bcast)
+
+ bootp_resp = resp.getlayer(BOOTP)
+ asserts.assert_equal(0, bootp_resp.hops)
+ if renewing:
+ asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX),
+ 'ciaddr does not start with expected prefix')
+ else:
+ asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr)
+ asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX),
+ 'yiaddr does not start with expected prefix')
+ asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX),
+ 'siaddr does not start with expected prefix')
+ asserts.assert_equal(INET4_ANY, bootp_resp.giaddr)
+
+ opt_labels = get_opt_labels(bootp_resp)
+ # FQDN option 81 is not supported in new behavior
+ opt_labels = [opt for opt in opt_labels if opt != 81]
+
+ # Expect exactly these options in this order
+ expected_opts = [
+ 'message-type', 'server_id', 'lease_time', 'renewal_time',
+ 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router',
+ 'name_server']
+ if with_hostname:
+ expected_opts.append('hostname')
+ expected_opts.extend(['vendor_specific', 'end'])
+ asserts.assert_equal(expected_opts, opt_labels)
+
+ def _request_address(self, hwaddr, bcast=True):
+ resp = self._get_response(self._make_discover(hwaddr))
+ self._assert_offer(resp)
+ addr = get_yiaddr(resp)
+ siaddr = getopt(resp, 'server_id')
+ resp = self._get_response(self._make_request(hwaddr, addr, siaddr,
+ ip_dst=(INET4_ANY if bcast else siaddr)))
+ self._assert_ack(resp)
+ return addr, siaddr, resp
+
+ def _get_response(self, packet):
+ resp = srp1(packet, iface=self.iface, timeout=10, verbose=False)
+ bootp_resp = (resp or None) and resp.getlayer(BOOTP)
+ if bootp_resp != None and get_mess_type(bootp_resp) == ACK:
+ # Note down corresponding release for this request
+ release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr,
+ getopt(bootp_resp, 'server_id'))
+ self.cleanup_releases.append(release)
+ return resp
+
+ def _send(self, packet):
+ sendp(packet, iface=self.iface, verbose=False)
+
+ def _assert_type(self, packet, tp):
+ asserts.assert_false(None == packet, "No packet")
+ asserts.assert_equal(tp, get_mess_type(packet))
+
+ def _assert_ack(self, packet):
+ self._assert_type(packet, ACK)
+
+ def _assert_nak(self, packet):
+ self._assert_type(packet, NAK)
+
+ def _assert_offer(self, packet):
+ self._assert_type(packet, OFFER)
+
+ def _assert_broadcast(self, packet):
+ asserts.assert_false(None == packet, "No packet")
+ asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC)
+ asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST)
+ self._assert_broadcastbit(packet)
+
+ def _assert_broadcastbit(self, packet, isset=True):
+ mask = 0x8000
+ flag = packet.getlayer(BOOTP).flags
+ asserts.assert_equal(flag & mask, mask if isset else 0)
+
+ def _assert_unicast(self, packet, ipAddr=None):
+ asserts.assert_false(None == packet, "No packet")
+ asserts.assert_false(packet.getlayer(Ether).dst == BROADCAST_MAC,
+ "Layer 2 packet destination address was broadcast")
+ if ipAddr:
+ asserts.assert_equal(packet.getlayer(IP).dst, ipAddr)
+
+ def _next_hwaddr(self):
+ addr = make_hwaddr(self.next_hwaddr_index)
+ self.next_hwaddr_index = self.next_hwaddr_index + 1
+ return addr
+
+ def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY,
+ ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY,
+ bcastbit=False):
+ broadcast = (ip_dst == NETADDR_BROADCAST)
+ ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr))
+ ip = IP(src=ip_src, dst=ip_dst)
+ udp = UDP(sport=68, dport=SERVER_PORT)
+ bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr,
+ flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32))
+ dhcp = DHCP(options=options)
+ return ethernet / ip / udp / bootp / dhcp
+
+ def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY,
+ bcastbit=False, opts_padding=None, ip_src=INET4_ANY):
+ opts = [('message-type','discover')]
+ opts.extend(options)
+ opts.append('end')
+ if (opts_padding):
+ opts.append(opts_padding)
+ return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr,
+ ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src)
+
+ def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY,
+ ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False,
+ options=[], opts_padding=None):
+ if not ip_dst:
+ ip_dst = siaddr or INET4_ANY
+
+ if not ip_src and ip_dst == INET4_ANY:
+ ip_src = INET4_ANY
+ elif not ip_src:
+ ip_src = (giaddr if not isempty(giaddr)
+ else ciaddr if not isempty(ciaddr)
+ else reqaddr)
+ # Kernel will not receive unicast UDP packets with empty ip_src
+ asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src),
+ "Unicast ip_src cannot be zero")
+ opts = [('message-type', 'request')]
+ if options:
+ opts.extend(options)
+ else:
+ if siaddr:
+ opts.append(('server_id', siaddr))
+ if reqaddr:
+ opts.append(('requested_addr', reqaddr))
+ opts.append('end')
+ if opts_padding:
+ opts.append(opts_padding)
+ return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr,
+ ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit)
+
+ def _make_release(self, src_hwaddr, addr, server_id):
+ opts = [('message-type', 'release'), ('server_id', server_id), 'end']
+ return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr,
+ ip_dst=server_id)
+
+def assert_bootp_response(resp, req):
+ bootp = resp.getlayer(BOOTP)
+ asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op')
+ asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype')
+ asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen')
+ asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops')
+ asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID')
+ return bootp
+
+
+def make_paramrequestlist_opt(params):
+ param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt
+ for opt in params]
+ return tuple(['param_req_list'] + [
+ opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt
+ for opt in param_indexes])
+
+
+def isempty(addr):
+ return not addr or addr == INET4_ANY
+
+
+def setopt(packet, optname, val):
+ dhcp = packet.getlayer(DHCP)
+ if optname in get_opt_labels(dhcp):
+ dhcp.options = [(optname, val) if opt[0] == optname else opt
+ for opt in dhcp.options]
+ else:
+ # Add before the last option (last option is "end")
+ dhcp.options.insert(len(dhcp.options) - 1, (optname, val))
+
+
+def getopt(packet, key):
+ opts = [opt[1] for opt in packet.getlayer(DHCP).options if opt[0] == key]
+ return opts[0] if opts else None
+
+
+def removeopt(packet, key):
+ dhcp = packet.getlayer(DHCP)
+ dhcp.options = [opt for opt in dhcp.options if opt[0] != key]
+
+
+def get_opt_labels(packet):
+ dhcp_resp = packet.getlayer(DHCP)
+ # end option is a single string, not a tuple.
+ return [opt if isinstance(opt, str) else opt[0]
+ for opt in dhcp_resp.options if opt != 'pad']
+
+
+def get_yiaddr(packet):
+ return packet.getlayer(BOOTP).yiaddr
+
+
+def get_chaddr(packet):
+ # We use Ethernet addresses. Ignore address padding
+ return packet.getlayer(BOOTP).chaddr[:6]
+
+
+def get_mess_type(packet):
+ return getopt(packet, 'message-type')
+
+
+def make_hwaddr(index):
+ if index > 0xffff:
+ raise ValueError("Address index out of range")
+ return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff])
+
+
+def format_hwaddr(addr):
+ return ':'.join(['%02x' % c for c in addr])
diff --git a/acts/tests/google/net/DnsOverTlsTest.py b/acts/tests/google/net/DnsOverTlsTest.py
index 46a6c01c6f..8d9fbed7ce 100644
--- a/acts/tests/google/net/DnsOverTlsTest.py
+++ b/acts/tests/google/net/DnsOverTlsTest.py
@@ -25,21 +25,22 @@ from acts import base_test
from acts import test_runner
from acts.controllers import adb
from acts.test_decorators import test_tracker_info
-from acts.test_utils.tel.tel_data_utils import wait_for_cell_data_connection
-from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump
-from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump
-from acts.test_utils.tel.tel_test_utils import verify_http_connection
+from acts.test_utils.net import net_test_utils as nutils
+from acts.test_utils.net.net_test_utils import start_tcpdump
+from acts.test_utils.net.net_test_utils import stop_tcpdump
from acts.test_utils.wifi import wifi_test_utils as wutils
from scapy.all import TCP
from scapy.all import UDP
from scapy.all import rdpcap
+from scapy.all import Scapy_Exception
DNS_QUAD9 = "dns.quad9.net"
PRIVATE_DNS_MODE_OFF = "off"
PRIVATE_DNS_MODE_OPPORTUNISTIC = "opportunistic"
PRIVATE_DNS_MODE_STRICT = "hostname"
RST = 0x04
+WLAN = "wlan0"
class DnsOverTlsTest(base_test.BaseTestClass):
""" Tests for Wifi Tethering """
@@ -48,17 +49,14 @@ class DnsOverTlsTest(base_test.BaseTestClass):
""" Setup devices for tethering and unpack params """
self.dut = self.android_devices[0]
- wutils.reset_wifi(self.dut)
- self.dut.droid.telephonyToggleDataConnection(True)
- wait_for_cell_data_connection(self.log, self.dut, True)
- asserts.assert_true(
- verify_http_connection(self.log, self.dut),
- "HTTP verification failed on cell data connection")
+ nutils.verify_lte_data_and_tethering_supported(self.dut)
req_params = ("wifi_network_with_dns_tls", "wifi_network_no_dns_tls",
"ping_hosts")
self.unpack_userparams(req_params)
self.tcpdump_pid = None
- self.tcpdump_file = None
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -68,9 +66,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
Args:
1. ad: dut to run tcpdump on
"""
- if self.tcpdump_pid:
- stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=False)
- self.tcpdump_pid = start_adb_tcpdump(ad, self.test_name, mask='all')
+ self.tcpdump_pid = start_tcpdump(ad, self.test_name)
def _stop_tcp_dump(self, ad):
""" Stop tcpdump and pull it to the test run logs
@@ -78,13 +74,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
Args:
1. ad: dut to pull tcpdump from
"""
- file_name = ad.adb.shell("ls /sdcard/tcpdump")
- file_name = os.path.join(ad.log_path, "TCPDUMP_%s" % ad.serial,
- file_name.split('/')[-1])
- if self.tcpdump_pid:
- stop_adb_tcpdump(ad, self.tcpdump_pid, pull_tcpdump=True)
- self.tcpdump_pid = None
- return os.path.join(ad.log_path, file_name)
+ return stop_tcpdump(ad, self.tcpdump_pid, self.test_name)
def _verify_dns_queries_over_tls(self, pcap_file, tls=True):
""" Verify if DNS queries were over TLS or not
@@ -136,6 +126,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
mode = self.dut.droid.getPrivateDnsMode()
asserts.assert_true(mode == dns_mode,
"Failed to set private DNS mode to %s" % dns_mode)
+ time.sleep(2)
# ping hosts should pass
for host in self.ping_hosts:
@@ -296,8 +287,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
# DNS server in link properties for wifi network
link_prop = self.dut.droid.connectivityGetActiveLinkProperties()
- dns_servers = link_prop['DnsServers']
- wifi_dns_servers = [each for lst in dns_servers for each in lst]
+ wifi_dns_servers = link_prop['PrivateDnsServerName']
self.log.info("Link prop: %s" % wifi_dns_servers)
# DUT is on LTE data
@@ -308,8 +298,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
# DNS server in link properties for cell network
link_prop = self.dut.droid.connectivityGetActiveLinkProperties()
- dns_servers = link_prop['DnsServers']
- lte_dns_servers = [each for lst in dns_servers for each in lst]
+ lte_dns_servers = link_prop['PrivateDnsServerName']
self.log.info("Link prop: %s" % lte_dns_servers)
# stop tcpdump on device
@@ -323,7 +312,7 @@ class DnsOverTlsTest(base_test.BaseTestClass):
"Hostname not in link properites - cell network")
@test_tracker_info(uuid="525a6f2d-9751-474e-a004-52441091e427")
- def test_dns_over_tls_no_reset_packets(self):
+ def dns_over_tls_no_reset_packets(self):
""" Verify there are no TCP packets with RST flags
Steps:
diff --git a/acts/tests/google/net/IpSecTest.py b/acts/tests/google/net/IpSecTest.py
index dc7fd42c89..41ae0bc978 100644
--- a/acts/tests/google/net/IpSecTest.py
+++ b/acts/tests/google/net/IpSecTest.py
@@ -20,8 +20,8 @@ from acts.test_decorators import test_tracker_info
from acts.test_utils.net import connectivity_const as cconst
from acts.test_utils.net import ipsec_test_utils as iutils
from acts.test_utils.net import socket_test_utils as sutils
-from acts.test_utils.tel.tel_test_utils import start_adb_tcpdump
-from acts.test_utils.tel.tel_test_utils import stop_adb_tcpdump
+from acts.test_utils.net.net_test_utils import start_tcpdump
+from acts.test_utils.net.net_test_utils import stop_tcpdump
from acts.test_utils.wifi import wifi_test_utils as wutils
import random
@@ -47,32 +47,22 @@ class IpSecTest(base_test.BaseTestClass):
self.ipv6_dut_a = self.dut_a.droid.connectivityGetIPv6Addresses(WLAN)[0]
self.ipv6_dut_b = self.dut_b.droid.connectivityGetIPv6Addresses(WLAN)[0]
+ self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo()
+
self.tcpdump_pid_a = None
- self.tcpdump_file_a = None
self.tcpdump_pid_b = None
- self.tcpdump_file_b = None
-
- self.crypt_auth_combos = iutils.generate_random_crypt_auth_combo()
def teardown_class(self):
for ad in self.android_devices:
wutils.reset_wifi(ad)
def setup_test(self):
- self.tcpdump_pid_a = start_adb_tcpdump(
- self.dut_a, self.test_name, mask='all')
- self.tcpdump_pid_b = start_adb_tcpdump(
- self.dut_b, self.test_name, mask='all')
+ self.tcpdump_pid_a = start_tcpdump(self.dut_a, self.test_name)
+ self.tcpdump_pid_b = start_tcpdump(self.dut_b, self.test_name)
def teardown_test(self):
- if self.tcpdump_pid_a:
- stop_adb_tcpdump(
- self.dut_a, self.tcpdump_pid_a, pull_tcpdump=True)
- if self.tcpdump_pid_b:
- stop_adb_tcpdump(
- self.dut_b, self.tcpdump_pid_b, pull_tcpdump=True)
- self.tcpdump_pid_a = None
- self.tcpdump_pid_b = None
+ stop_tcpdump(self.dut_a, self.tcpdump_pid_a, self.test_name)
+ stop_tcpdump(self.dut_b, self.tcpdump_pid_b, self.test_name)
def on_fail(self, test_name, begin_time):
self.dut_a.take_bug_report(test_name, begin_time)
diff --git a/acts/tests/google/net/LegacyVpnTest.py b/acts/tests/google/net/LegacyVpnTest.py
index 786a40a822..d1dd385cce 100644
--- a/acts/tests/google/net/LegacyVpnTest.py
+++ b/acts/tests/google/net/LegacyVpnTest.py
@@ -25,7 +25,8 @@ from acts import base_test
from acts import test_runner
from acts.controllers import adb
from acts.test_decorators import test_tracker_info
-from acts.test_utils.wifi import wifi_test_utils
+from acts.test_utils.net import net_test_utils as nutils
+from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.net import connectivity_const
VPN_CONST = connectivity_const.VpnProfile
@@ -48,140 +49,36 @@ class LegacyVpnTest(base_test.BaseTestClass):
required_params = dir(VPN_PARAMS)
required_params = [x for x in required_params if not x.startswith('__')]
self.unpack_userparams(required_params)
- wifi_test_utils.wifi_test_device_init(self.dut)
- wifi_test_utils.wifi_connect(self.dut, self.wifi_network)
+ wutils.wifi_test_device_init(self.dut)
+ wutils.wifi_connect(self.dut, self.wifi_network)
time.sleep(3)
+ self.vpn_params = {'vpn_username': self.vpn_username,
+ 'vpn_password': self.vpn_password,
+ 'psk_secret': self.psk_secret,
+ 'client_pkcs_file_name': self.client_pkcs_file_name,
+ 'cert_path_vpnserver': self.cert_path_vpnserver,
+ 'cert_password': self.cert_password}
+
def teardown_class(self):
""" Reset wifi to make sure VPN tears down cleanly
"""
- wifi_test_utils.reset_wifi(self.dut)
+ wutils.reset_wifi(self.dut)
def on_fail(self, test_name, begin_time):
self.dut.take_bug_report(test_name, begin_time)
- def download_load_certs(self, vpn_type, vpn_server_addr, ipsec_server_type):
- """ Download the certificates from VPN server and push to sdcard of DUT
-
- Args:
- VPN type name
-
- Returns:
- Client cert file name on DUT's sdcard
- """
- url = "http://%s%s%s" % (vpn_server_addr,
- self.cert_path_vpnserver,
- self.client_pkcs_file_name)
- local_cert_name = "%s_%s_%s" % (vpn_type.name,
- ipsec_server_type,
- self.client_pkcs_file_name)
- local_file_path = os.path.join(self.log_path, local_cert_name)
- try:
- ret = urllib.request.urlopen(url)
- with open(local_file_path, "wb") as f:
- f.write(ret.read())
- except:
- asserts.fail("Unable to download certificate from the server")
- f.close()
- self.dut.adb.push("%s sdcard/" % local_file_path)
- return local_cert_name
-
- def generate_legacy_vpn_profile(self, vpn_type, vpn_server_addr, ipsec_server_type):
- """ Generate legacy VPN profile for a VPN
-
- Args:
- VpnProfileType
- """
- vpn_profile = {VPN_CONST.USER: self.vpn_username,
- VPN_CONST.PWD: self.vpn_password,
- VPN_CONST.TYPE: vpn_type.value,
- VPN_CONST.SERVER: vpn_server_addr,}
- vpn_profile[VPN_CONST.NAME] = "test_%s_%s" % (vpn_type.name,ipsec_server_type)
- if vpn_type.name == "PPTP":
- vpn_profile[VPN_CONST.NAME] = "test_%s" % vpn_type.name
- psk_set = set(["L2TP_IPSEC_PSK", "IPSEC_XAUTH_PSK"])
- rsa_set = set(["L2TP_IPSEC_RSA", "IPSEC_XAUTH_RSA", "IPSEC_HYBRID_RSA"])
- if vpn_type.name in psk_set:
- vpn_profile[VPN_CONST.IPSEC_SECRET] = self.psk_secret
- elif vpn_type.name in rsa_set:
- cert_name = self.download_load_certs(vpn_type,
- vpn_server_addr,
- ipsec_server_type)
- vpn_profile[VPN_CONST.IPSEC_USER_CERT] = cert_name.split('.')[0]
- vpn_profile[VPN_CONST.IPSEC_CA_CERT] = cert_name.split('.')[0]
- self.dut.droid.installCertificate(vpn_profile,cert_name,self.cert_password)
- else:
- vpn_profile[VPN_CONST.MPPE] = self.pptp_mppe
- return vpn_profile
-
- def verify_ping_to_vpn_ip(self, connected_vpn_info):
- """ Verify if IP behind VPN server is pingable
- Ping should pass, if VPN is connected
- Ping should fail, if VPN is disconnected
-
- Args:
- connected_vpn_info which specifies the VPN connection status
- """
- ping_result = None
- pkt_loss = "100% packet loss"
- try:
- ping_result = self.dut.adb.shell("ping -c 3 -W 2 %s"
- % self.vpn_verify_address)
- except adb.AdbError:
- pass
- return ping_result and pkt_loss not in ping_result
-
- def legacy_vpn_connection_test_logic(self, vpn_profile):
- """ Test logic for each legacy VPN connection
-
- Steps:
- 1. Generate profile for the VPN type
- 2. Establish connection to the server
- 3. Verify that connection is established using LegacyVpnInfo
- 4. Verify the connection by pinging the IP behind VPN
- 5. Stop the VPN connection
- 6. Check the connection status
- 7. Verify that ping to IP behind VPN fails
-
- Args:
- VpnProfileType (1 of the 6 types supported by Android)
- """
- # Wait for sometime so that VPN server flushes all interfaces and
- # connections after graceful termination
- time.sleep(10)
- self.dut.adb.shell("ip xfrm state flush")
- logging.info("Connecting to: %s", vpn_profile)
- self.dut.droid.vpnStartLegacyVpn(vpn_profile)
- time.sleep(connectivity_const.VPN_TIMEOUT)
- connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo()
- asserts.assert_equal(connected_vpn_info["state"],
- connectivity_const.VPN_STATE_CONNECTED,
- "Unable to establish VPN connection for %s"
- % vpn_profile)
- ping_result = self.verify_ping_to_vpn_ip(connected_vpn_info)
- ip_xfrm_state = self.dut.adb.shell("ip xfrm state")
- match_obj = re.search(r'hmac(.*)', "%s" % ip_xfrm_state)
- if match_obj:
- ip_xfrm_state = format(match_obj.group(0)).split()
- self.log.info("HMAC for ESP is %s " % ip_xfrm_state[0])
- self.dut.droid.vpnStopLegacyVpn()
- asserts.assert_true(ping_result,
- "Ping to the internal IP failed. "
- "Expected to pass as VPN is connected")
- connected_vpn_info = self.dut.droid.vpnGetLegacyVpnInfo()
- asserts.assert_true(not connected_vpn_info,
- "Unable to terminate VPN connection for %s"
- % vpn_profile)
-
""" Test Cases """
@test_tracker_info(uuid="d2ac5a65-41fb-48de-a0a9-37e589b5456b")
def test_legacy_vpn_pptp(self):
""" Verify PPTP VPN connection """
vpn = VPN_TYPE.PPTP
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[2])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[2],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="99af78dd-40b8-483a-8344-cd8f67594971")
def legacy_vpn_l2tp_ipsec_psk_libreswan(self):
@@ -189,10 +86,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
libreSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][2],
- self.ipsec_server_type[2])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[2],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="e67d8c38-92c3-4167-8b6c-a49ef939adce")
def legacy_vpn_l2tp_ipsec_rsa_libreswan(self):
@@ -200,10 +99,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
libreSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][2],
- self.ipsec_server_type[2])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[2],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="8b3517dc-6a3b-44c2-a85d-bd7b969df3cf")
def legacy_vpn_ipsec_xauth_psk_libreswan(self):
@@ -211,10 +112,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
libreSwan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][2],
- self.ipsec_server_type[2])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[2],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="abac663d-1d91-4b87-8e94-11c6e44fb07b")
def legacy_vpn_ipsec_xauth_rsa_libreswan(self):
@@ -222,10 +125,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
libreSwan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][2],
- self.ipsec_server_type[2])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[2],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="84140d24-53c0-4f6c-866f-9d66e04442cc")
def test_legacy_vpn_l2tp_ipsec_psk_openswan(self):
@@ -233,10 +138,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
openSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][1],
- self.ipsec_server_type[1])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[1],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="f7087592-7eed-465d-bfe3-ed7b6d9d5f9a")
def test_legacy_vpn_l2tp_ipsec_rsa_openswan(self):
@@ -244,10 +151,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
openSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][1],
- self.ipsec_server_type[1])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[1],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="ed78973b-13ee-4dd4-b998-693ab741c6f8")
def test_legacy_vpn_ipsec_xauth_psk_openswan(self):
@@ -255,10 +164,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
openSwan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][1],
- self.ipsec_server_type[1])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[1],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="cfd125c4-b64c-4c49-b8e4-fbf05a9be8ec")
def test_legacy_vpn_ipsec_xauth_rsa_openswan(self):
@@ -266,10 +177,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
openSwan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][1],
- self.ipsec_server_type[1])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[1],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="419370de-0aa1-4a56-8c22-21567fa1cbb7")
def test_legacy_vpn_l2tp_ipsec_psk_strongswan(self):
@@ -277,10 +190,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
strongSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[0])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[0],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="f7694081-8bd6-4e31-86ec-d538c4ff1f2e")
def test_legacy_vpn_l2tp_ipsec_rsa_strongswan(self):
@@ -288,10 +203,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
strongSwan server
"""
vpn = VPN_TYPE.L2TP_IPSEC_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[0])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[0],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="2f86eb98-1e05-42cb-b6a6-fd90789b6cde")
def test_legacy_vpn_ipsec_xauth_psk_strongswan(self):
@@ -299,10 +216,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
strongSwan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_PSK
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[0])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[0],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="af0cd7b1-e86c-4327-91b4-e9062758f2cf")
def test_legacy_vpn_ipsec_xauth_rsa_strongswan(self):
@@ -310,10 +229,12 @@ class LegacyVpnTest(base_test.BaseTestClass):
strongswan server
"""
vpn = VPN_TYPE.IPSEC_XAUTH_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[0])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[0],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
@test_tracker_info(uuid="7b970d0a-1c7d-4a5a-b406-4815e190ef26")
def test_legacy_vpn_ipsec_hybrid_rsa_strongswan(self):
@@ -321,7 +242,9 @@ class LegacyVpnTest(base_test.BaseTestClass):
strongswan server
"""
vpn = VPN_TYPE.IPSEC_HYBRID_RSA
- vpn_profile = self.generate_legacy_vpn_profile(
+ vpn_profile = nutils.generate_legacy_vpn_profile(
+ self.dut, self.vpn_params,
vpn, self.vpn_server_addresses[vpn.name][0],
- self.ipsec_server_type[0])
- self.legacy_vpn_connection_test_logic(vpn_profile)
+ self.ipsec_server_type[0],
+ self.log_path)
+ nutils.legacy_vpn_connection_test_logic(self.dut, vpn_profile)
diff --git a/acts/tests/google/net/NattKeepAliveTest.py b/acts/tests/google/net/NattKeepAliveTest.py
new file mode 100644
index 0000000000..aaf1b1b141
--- /dev/null
+++ b/acts/tests/google/net/NattKeepAliveTest.py
@@ -0,0 +1,151 @@
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts import base_test
+from acts import utils
+from acts.controllers import adb
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.net.net_test_utils import start_tcpdump
+from acts.test_utils.net.net_test_utils import stop_tcpdump
+from acts.test_utils.net import connectivity_test_utils as cutils
+from acts.test_utils.wifi import wifi_test_utils as wutils
+
+import random
+import time
+
+WLAN = "wlan0"
+PKTS = 5
+SERVER_UDP_KEEPALIVE = "python /root/udp_nat_keepalive.py"
+KEEPALIVE_DATA = "ff"
+
+
+class NattKeepAliveTest(base_test.BaseTestClass):
+ """ Tests for NATT keepalive """
+
+ def setup_class(self):
+ """ Setup devices for tests and unpack params """
+
+ self.dut = self.android_devices[0]
+ req_params = ("wifi_network", "remote_server", "server_ssh_config")
+ self.unpack_userparams(req_params)
+
+ wutils.wifi_connect(self.dut, self.wifi_network)
+
+ self.ip_a = self.dut.droid.connectivityGetIPv4Addresses(WLAN)[0]
+ self.ip_b = self.remote_server
+ self.log.info("DUT IP addr: %s" % self.ip_a)
+ self.log.info("Remote server IP addr: %s" % self.ip_b)
+ self.tcpdump_pid = None
+
+ def teardown_class(self):
+ wutils.reset_wifi(self.dut)
+
+ def setup_test(self):
+ self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
+
+ def teardown_test(self):
+ stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+
+ """ Helper functions """
+
+ def _verify_time_interval(self, time_interval, cmd_out):
+ """ Verify time diff between packets is equal to time interval """
+
+ self.log.info("Packet Info: \n%s\n" % cmd_out)
+ pkts = cmd_out.rstrip().decode("utf-8", "ignore").split("\n")
+
+ prev = 0
+ for i in range(len(pkts)):
+ interval, data = pkts[i][1:-1].split(',')
+
+ # verify data
+ if data.lstrip().rstrip()[1:-1] != KEEPALIVE_DATA:
+ self.log.error("Server received invalid data %s" % data)
+ return False
+
+ # verify time interval
+ curr = float(interval)
+ if i == 0:
+ prev = curr
+ continue
+ diff = int(round(curr-prev))
+ if diff != time_interval:
+ self.log.error("Keepalive time interval is %s, expected %s"
+ % (diff, time_interval))
+ return False
+ prev = curr
+
+ return True
+
+ """ Tests begin """
+
+ @test_tracker_info(uuid="c9012da2-656f-44ef-bad6-26892335d4bd")
+ def test_natt_keepalive_ipv4(self):
+ """ Test natt keepalive over wifi
+
+ Steps:
+ 1. Open a UDP port 4500 on linux host
+ 2. Start NATT keepalive packets on DUT and send 5 packets
+ 3. Verify that 5 keepalive packets reached host with data '0xff'
+ """
+
+ # set a time interval
+ result = True
+ time_interval = random.randint(10, 60)
+ port = random.randint(8000, 9000)
+ self.log.info("NATT keepalive time interval is %s" % time_interval)
+ self.log.info("Source port is %s" % port)
+ time_out = time_interval * PKTS + 6
+
+ # start NATT keep alive
+ nka_key = cutils.start_natt_keepalive(
+ self.dut, self.ip_a, port, self.ip_b, time_interval)
+ asserts.assert_true(nka_key, "Failed to start NATT keepalive")
+
+ # capture packets on server
+ self.log.info("Capturing keepalive packets on %s" % self.ip_b)
+ cmd_out = utils.exe_cmd("timeout %s %s" %
+ (time_out, SERVER_UDP_KEEPALIVE))
+
+ # verify packets received
+ result = self._verify_time_interval(time_interval, cmd_out)
+
+ # stop NATT keep alive
+ status = cutils.stop_natt_keepalive(self.dut, nka_key)
+ asserts.assert_true(status, "Failed to stop NATT keepalive")
+
+ return result
+
+ @test_tracker_info(uuid="8ab20733-4a9e-4e4d-a46f-4d32a9f221c5")
+ def test_natt_keepalive_ipv4_invalid_interval(self):
+ """ Test invalid natt keepalive time interval
+
+ Steps:
+ 1. Start NATT keepalive with time interval less than 10 seconds
+ 2. API should return invalid interval
+ """
+
+ # start NATT keep alive
+ port = random.randint(8000, 9000)
+ nka_key = cutils.start_natt_keepalive(
+ self.dut, self.ip_a, port, self.ip_b, 2)
+ asserts.assert_true(not nka_key,
+ "Started NATT keepalive with invalid interval")
+
+ """ Tests end """
diff --git a/acts/tests/google/net/ProxyTest.py b/acts/tests/google/net/ProxyTest.py
new file mode 100644
index 0000000000..66aa345118
--- /dev/null
+++ b/acts/tests/google/net/ProxyTest.py
@@ -0,0 +1,178 @@
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from acts import asserts
+from acts import base_test
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.net.net_test_utils import start_tcpdump
+from acts.test_utils.net.net_test_utils import stop_tcpdump
+from acts.test_utils.wifi import wifi_test_utils as wutils
+
+from scapy.all import IP
+from scapy.all import TCP
+from scapy.all import UDP
+from scapy.all import Raw
+from scapy.all import rdpcap
+from scapy.all import Scapy_Exception
+
+
+class ProxyTest(base_test.BaseTestClass):
+ """ Network proxy tests """
+
+ def setup_class(self):
+ """ Setup devices for tests and unpack params """
+ self.dut = self.android_devices[0]
+ req_params = ("wifi_network_no_dns_tls", "proxy_pac", "proxy_server",
+ "proxy_port", "bypass_host", "non_bypass_host")
+ self.unpack_userparams(req_params)
+ wutils.wifi_connect(self.dut, self.wifi_network_no_dns_tls)
+ self.tcpdump_pid = None
+ self.proxy_port = int(self.proxy_port)
+
+ def teardown_test(self):
+ self.dut.droid.connectivityResetGlobalProxy()
+ global_proxy = self.dut.droid.connectivityGetGlobalProxy()
+ if global_proxy:
+ self.log.error("Failed to reset global proxy settings")
+
+ def teardown_class(self):
+ wutils.reset_wifi(self.dut)
+
+ """ Helper methods """
+
+ def _verify_http_request(self, ad):
+ """ Send http requests to hosts
+
+ Steps:
+ 1. Send http requests to hosts
+ a. Host that is bypassed by proxy server
+ b. Host that goes through proxy server
+ 2. Verify that both return valid responses
+
+ Args:
+ 1. ad: dut to run http requests
+ """
+ for host in [self.bypass_host, self.non_bypass_host]:
+ host = "https://%s" % host
+ result = ad.droid.httpRequestString(host)
+ asserts.assert_true(result, "Http request failed for %s" % host)
+
+ def _verify_proxy_server(self, pcap_file, bypass_host, hostname):
+ """ Verify that http requests are going through proxy server
+
+ Args:
+ 1. tcpdump: pcap file
+ 2. bypass_host: boolean value if the request goes through proxy
+ 3. hostname: hostname requested
+
+ Returns:
+ True/False if the bypass condition met
+ """
+ self.log.info("Checking proxy server for query to: %s" % hostname)
+ try:
+ packets = rdpcap(pcap_file)
+ except Scapy_Exception:
+ asserts.fail("Not a valid pcap file")
+
+ dns_query = False
+ http_query = False
+ for pkt in packets:
+ summary = "%s" % pkt.summary()
+ if UDP in pkt and pkt[UDP].dport == 53 and hostname in summary:
+ dns_query = True
+ break
+ if TCP in pkt and pkt[TCP].dport == self.proxy_port and Raw in pkt\
+ and hostname in str(pkt[Raw]):
+ http_query = True
+
+ self.log.info("Bypass hostname set to: %s" % bypass_host)
+ self.log.info("Found DNS query for host: %s" % dns_query)
+ self.log.info("Found HTTP query for host: %s" % http_query)
+ if bypass_host and http_query and not dns_query or \
+ not bypass_host and not http_query and dns_query:
+ return False
+ return True
+
+ def _test_proxy(self):
+ """ Test pac piroxy and manual proxy settings
+
+ Steps:
+ 1. Start tcpdump
+ 2. Run http requests
+ 3. Stop tcpdump
+ 4. Verify the packets from tcpdump have valid queries
+ """
+
+ # start tcpdump on the device
+ self.tcpdump_pid = start_tcpdump(self.dut, self.test_name)
+
+ # verify http requests
+ self._verify_http_request(self.dut)
+
+ # stop tcpdump on the device
+ pcap_file = stop_tcpdump(self.dut, self.tcpdump_pid, self.test_name)
+
+ # verify proxy server
+ result = self._verify_proxy_server(pcap_file, True, self.bypass_host)
+ asserts.assert_true(result, "Proxy failed for %s" % self.bypass_host)
+ result = self._verify_proxy_server(pcap_file, False, self.non_bypass_host)
+ asserts.assert_true(result, "Proxy failed for %s" % self.non_bypass_host)
+
+ """ Test Cases """
+
+ @test_tracker_info(uuid="16881315-1a50-48ce-bd36-7b0d2f21b734")
+ def test_pac_proxy_over_wifi(self):
+ """ Test proxy with auto config over wifi
+
+ Steps:
+ 1. Connect to a wifi network
+ 2. Set a global proxy with auto config
+ 3. Do a http request on the hostnames
+ 4. Verify that no DNS packets seen for non bypassed hostnames
+ 5. Verify that DNS packets seen for bypassed hostnames
+ """
+ # set global pac proxy
+ self.log.info("Setting global proxy to: %s" % self.proxy_pac)
+ self.dut.droid.connectivitySetGlobalPacProxy(self.proxy_pac)
+ global_proxy = self.dut.droid.connectivityGetGlobalProxy()
+ asserts.assert_true(global_proxy['PacUrl'] == self.proxy_pac,
+ "Failed to set pac proxy")
+
+ # test proxy
+ self._test_proxy()
+
+ @test_tracker_info(uuid="4d3361f6-866d-423c-9ed7-5a6943575fe9")
+ def test_manual_proxy_over_wifi(self):
+ """ Test manual proxy over wifi
+
+ Steps:
+ 1. Connect to a wifi network
+ 2. Set a global manual proxy with proxy server, port & bypass URLs
+ 3. Do a http request on the hostnames
+ 4. Verify that no DNS packets are seen for non bypassed hostnames
+ 5. Verify that DNS packets seen for bypassed hostnames
+ """
+ # set global manual proxy
+ self.log.info("Setting global proxy to: %s %s %s" %
+ (self.proxy_server, self.proxy_port, self.bypass_host))
+ self.dut.droid.connectivitySetGlobalProxy(self.proxy_server,
+ self.proxy_port,
+ self.bypass_host)
+ global_proxy = self.dut.droid.connectivityGetGlobalProxy()
+ asserts.assert_true(global_proxy['Hostname'] == self.proxy_server,
+ "Failed to set manual proxy")
+
+ # test proxy
+ self._test_proxy()
diff --git a/acts/tests/google/wifi/WifiIOTTest.py b/acts/tests/google/wifi/WifiIOTTest.py
index 22e67d9e28..75449517f7 100755
--- a/acts/tests/google/wifi/WifiIOTTest.py
+++ b/acts/tests/google/wifi/WifiIOTTest.py
@@ -132,12 +132,12 @@ class WifiIOTTest(WifiBaseTest):
"""Tests"""
@test_tracker_info(uuid="a57cc861-b6c2-47e4-9db6-7a3ab32c6e20")
- def iot_connection_to_ubiquity_ap1_2g(self):
+ def test_iot_connection_to_ubiquity_ap1_2g(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="2065c2f7-2b89-4da7-a15d-e5dc17b88d52")
- def iot_connection_to_ubiquity_ap1_5g(self):
+ def test_iot_connection_to_ubiquity_ap1_5g(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@@ -152,12 +152,12 @@ class WifiIOTTest(WifiBaseTest):
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="02a8cc75-6781-4153-8d90-bed7568a1e78")
- def iot_connection_to_AirportExtreme_2G(self):
+ def test_iot_connection_to_AirportExtreme_2G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="83a42c97-1358-4ba7-bdb2-238fdb1c945e")
- def iot_connection_to_AirportExtreme_5G(self):
+ def test_iot_connection_to_AirportExtreme_5G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@@ -172,12 +172,12 @@ class WifiIOTTest(WifiBaseTest):
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="2503d9ed-35df-4be0-b838-590324cecaee")
- def test_iot_connection_to_Dlink_AC1200_2G(self):
+ def iot_connection_to_Dlink_AC1200_2G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="0a44e148-a4bf-43f4-88eb-e4c1ffa850ce")
- def test_iot_connection_to_Dlink_AC1200_5G(self):
+ def iot_connection_to_Dlink_AC1200_5G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@@ -242,17 +242,17 @@ class WifiIOTTest(WifiBaseTest):
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="054d2ffc-97fd-4613-bf47-acedd0fa4701")
- def iot_connection_to_NETGEAR_AC3200_2G(self):
+ def test_iot_connection_to_NETGEAR_AC3200_2G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="d15a789a-def5-4c6a-b59e-1a75f73cc6a9")
- def iot_connection_to_NETGEAR_AC3200_5G_1(self):
+ def test_iot_connection_to_NETGEAR_AC3200_5G_1(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="1de6369e-97da-479f-b17c-9144bb814f51")
- def iot_connection_to_NETGEAR_AC3200_5G_2(self):
+ def test_iot_connection_to_NETGEAR_AC3200_5G_2(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@@ -282,12 +282,12 @@ class WifiIOTTest(WifiBaseTest):
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="e5517b82-c225-449d-83ac-055a561a764f")
- def iot_connection_to_TP_LINK_AC1700_2G(self):
+ def test_iot_connection_to_TP_LINK_AC1700_2G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
@test_tracker_info(uuid="9531d3cc-129d-4501-a5e3-d7502120cd8b")
- def iot_connection_to_TP_LINK_AC1700_5G(self):
+ def test_iot_connection_to_TP_LINK_AC1700_5G(self):
ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
diff --git a/acts/tests/google/wifi/WifiIOTTwPkg1Test.py b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py
new file mode 100644
index 0000000000..b30c606b79
--- /dev/null
+++ b/acts/tests/google/wifi/WifiIOTTwPkg1Test.py
@@ -0,0 +1,364 @@
+#!/usr/bin/env python3
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import pprint
+import time
+
+import acts.signals
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+from acts import asserts
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+from acts.controllers import iperf_server as ipf
+
+import json
+import logging
+import math
+import os
+from acts import utils
+import csv
+
+WifiEnums = wutils.WifiEnums
+
+
+class WifiIOTTwPkg1Test(WifiBaseTest):
+ """ Tests for wifi IOT
+
+ Test Bed Requirement:
+ * One Android device
+ * Wi-Fi IOT networks visible to the device
+ """
+
+ def __init__(self, controllers):
+ self.attenuators = None
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ wutils.wifi_test_device_init(self.dut)
+
+ req_params = [ "iot_networks", ]
+ opt_params = [ "open_network",
+ "iperf_server_address","iperf_port_arg",
+ "pdu_address" , "pduon_wait_time","pduon_address"
+ ]
+ self.unpack_userparams(req_param_names=req_params,
+ opt_param_names=opt_params)
+
+ asserts.assert_true(
+ len(self.iot_networks) > 0,
+ "Need at least one iot network with psk.")
+
+ if getattr(self, 'open_network', False):
+ self.iot_networks.append(self.open_network)
+
+ wutils.wifi_toggle_state(self.dut, True)
+ if "iperf_server_address" in self.user_params:
+ self.iperf_server = self.iperf_servers[0]
+
+ # create hashmap for testcase name and SSIDs
+ self.iot_test_prefix = "test_iot_connection_to_"
+ self.ssid_map = {}
+ for network in self.iot_networks:
+ SSID = network['SSID'].replace('-','_')
+ self.ssid_map[SSID] = network
+
+ # create folder for IOT test result
+ self.log_path = os.path.join(logging.log_path, "IOT_results")
+ utils.create_dir(self.log_path)
+
+ Header=("test_name","throughput_TX","throughput_RX")
+ self.csv_write(Header)
+
+ # check pdu_address
+ if "pdu_address" and "pduon_wait_time" in self.user_params:
+ self.pdu_func()
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+ wutils.reset_wifi(self.dut)
+
+ def teardown_class(self):
+ if "iperf_server_address" in self.user_params:
+ self.iperf_server.stop()
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+
+ """Helper Functions"""
+
+ def connect_to_wifi_network(self, network):
+ """Connection logic for open and psk wifi networks.
+
+ Args:
+ params: Dictionary with network info.
+ """
+ SSID = network[WifiEnums.SSID_KEY]
+ self.dut.ed.clear_all_events()
+ wutils.start_wifi_connection_scan(self.dut)
+ scan_results = self.dut.droid.wifiGetScanResults()
+ wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
+ wutils.wifi_connect(self.dut, network, num_of_tries=3)
+
+ def run_iperf_client(self, network):
+ """Run iperf TX throughput after connection.
+
+ Args:
+ params: Dictionary with network info.
+ """
+ if "iperf_server_address" in self.user_params:
+
+ # Add iot_result
+ iot_result = []
+ self.iperf_server.start(tag="TX_server_{}".format(
+ self.current_test_name))
+ wait_time = 5
+ SSID = network[WifiEnums.SSID_KEY]
+ self.log.info("Starting iperf traffic TX through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {} -J {}".format(self.iperf_server.port,self.iperf_port_arg)
+ success, data = self.dut.run_iperf_client(self.iperf_server_address,
+ port_arg)
+ # Parse and log result
+ client_output_path = os.path.join(
+ self.iperf_server.log_path, "IperfDUT,{},TX_client_{}".format(
+ self.iperf_server.port,self.current_test_name))
+ with open(client_output_path, 'w') as out_file:
+ out_file.write("\n".join(data))
+ self.iperf_server.stop()
+
+ iperf_file = self.iperf_server.log_files[-1]
+ try:
+ iperf_result = ipf.IPerfResult(iperf_file)
+ curr_throughput = math.fsum(iperf_result.instantaneous_rates)
+ except:
+ self.log.warning(
+ "ValueError: Cannot get iperf result. Setting to 0")
+ curr_throughput = 0
+ iot_result.append(curr_throughput)
+ self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput))
+
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
+ return iot_result
+
+ def run_iperf_server(self, network):
+ """Run iperf RX throughput after connection.
+
+ Args:
+ params: Dictionary with network info.
+
+ Returns:
+ iot_result: dict containing iot_results
+ """
+ if "iperf_server_address" in self.user_params:
+
+ # Add iot_result
+ iot_result = []
+ self.iperf_server.start(tag="RX_client_{}".format(
+ self.current_test_name))
+ wait_time = 5
+ SSID = network[WifiEnums.SSID_KEY]
+ self.log.info("Starting iperf traffic RX through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {} -J -R {}".format(self.iperf_server.port,self.iperf_port_arg)
+ success, data = self.dut.run_iperf_client(self.iperf_server_address,
+ port_arg)
+ client_output_path = os.path.join(
+ self.iperf_server.log_path, "IperfDUT,{},RX_server_{}".format(
+ self.iperf_server.port,self.current_test_name))
+ with open(client_output_path, 'w') as out_file:
+ out_file.write("\n".join(data))
+ self.iperf_server.stop()
+
+ iperf_file = client_output_path
+ try:
+ iperf_result = ipf.IPerfResult(iperf_file)
+ curr_throughput = math.fsum(iperf_result.instantaneous_rates)
+ except:
+ self.log.warning(
+ "ValueError: Cannot get iperf result. Setting to 0")
+ curr_throughput = 0
+ iot_result.append(curr_throughput)
+ self.log.info("Throughput is {0:.2f} Mbps".format(curr_throughput))
+
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
+ return iot_result
+
+ def iperf_test_func(self,network):
+ """Main function to test iperf TX/RX.
+
+ Args:
+ params: Dictionary with network info
+ """
+ # Initialize
+ iot_result = {}
+
+ # Run RvR and log result
+ iot_result["throughput_TX"] = self.run_iperf_client(network)
+ iot_result["throughput_RX"] = self.run_iperf_server(network)
+ iot_result["test_name"] = self.current_test_name
+
+ # Save output as text file
+ results_file_path = "{}/{}.json".format(self.log_path,
+ self.current_test_name)
+ with open(results_file_path, 'w') as results_file:
+ json.dump(iot_result, results_file, indent=4)
+
+ data=(iot_result["test_name"],iot_result["throughput_TX"][0],
+ iot_result["throughput_RX"][0])
+ self.csv_write(data)
+
+ def csv_write(self,data):
+ with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file:
+ csv_writer = csv.writer(csv_file,delimiter=',')
+ csv_writer.writerow(data)
+ csv_file.close()
+
+ def pdu_func(self):
+ """control Power Distribution Units on local machine.
+
+ Logic steps are
+ 1. Turn off PDU for all port.
+ 2. Turn on PDU for specified port.
+ """
+ out_file_name = "PDU.log"
+ self.full_out_path = os.path.join(self.log_path, out_file_name)
+ cmd = "curl http://snmp:1234@{}/offs.cgi?led=11111111> {}".format(self.pdu_address,
+ self.full_out_path)
+ self.pdu_process = utils.start_standing_subprocess(cmd)
+ wait_time = 10
+ self.log.info("Starting set PDU to OFF")
+ time.sleep(wait_time)
+ self.full_out_path = os.path.join(self.log_path, out_file_name)
+ cmd = "curl http://snmp:1234@{}/ons.cgi?led={}> {}".format(self.pdu_address,
+ self.pduon_address,
+ self.full_out_path)
+ self.pdu_process = utils.start_standing_subprocess(cmd)
+ wait_time = int("{}".format(self.pduon_wait_time))
+ self.log.info("Starting set PDU to ON for port1,"
+ "wait for {}s".format(self.pduon_wait_time))
+ time.sleep(wait_time)
+ self.log.info("PDU setup complete")
+
+ def connect_to_wifi_network_and_run_iperf(self, network):
+ """Connection logic for open and psk wifi networks.
+
+ Logic steps are
+ 1. Connect to the network.
+ 2. Run iperf throghput.
+
+ Args:
+ params: A dictionary with network info.
+ """
+ self.connect_to_wifi_network(network)
+ self.iperf_test_func(network)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="0e4ad6ed-595c-4629-a4c9-c6be9c3c58e0")
+ def test_iot_connection_to_ASUS_RT_AC68U_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="a76d8acc-808e-4a5d-a52b-5ba07d07b810")
+ def test_iot_connection_to_ASUS_RT_AC68U_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="659a3e5e-07eb-4905-9cda-92e959c7b674")
+ def test_iot_connection_to_D_Link_DIR_868L_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="6bcfd736-30fc-48a8-b4fb-723d1d113f3c")
+ def test_iot_connection_to_D_Link_DIR_868L_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="c9da945a-2c4a-44e1-881d-adf307b39b21")
+ def test_iot_connection_to_TP_LINK_WR940N_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="db0d224d-df81-401f-bf35-08ad02e41a71")
+ def test_iot_connection_to_ASUS_RT_N66U_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="845ff1d6-618d-40f3-81c3-6ed3a0751fde")
+ def test_iot_connection_to_ASUS_RT_N66U_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="6908039b-ccc9-4777-a0f1-3494ce642014")
+ def test_iot_connection_to_ASUS_RT_AC54U_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="2647c15f-2aad-47d7-8dee-b2ee1ac4cef6")
+ def test_iot_connection_to_ASUS_RT_AC54U_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="99678f66-ddf1-454d-87e4-e55177ec380d")
+ def test_iot_connection_to_ASUS_RT_N56U_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="4dd75e81-9a8e-44fd-9449-09f5ab8a63c3")
+ def test_iot_connection_to_ASUS_RT_N56U_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="315397ce-50d5-4abf-a11c-1abcaef832d3")
+ def test_iot_connection_to_BELKIN_F9K1002v1_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="05ba464a-b1ef-4ac1-a32f-c919ec4aa1dd")
+ def test_iot_connection_to_CISCO_E1200_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="04912868-4a47-40ce-877e-4e4c89849557")
+ def test_iot_connection_to_TP_LINK_C2_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="53517a21-3802-4185-b8bb-6eaace063a42")
+ def test_iot_connection_to_TP_LINK_C2_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="71c08c1c-415d-4da4-a151-feef43fb6ad8")
+ def test_iot_connection_to_ASUS_RT_AC66U_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="2322c155-07d1-47c9-bd21-2e358e3df6ee")
+ def test_iot_connection_to_ASUS_RT_AC66U_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.connect_to_wifi_network_and_run_iperf(self.ssid_map[ssid_key])
diff --git a/acts/tests/google/wifi/WifiRvrTwTest.py b/acts/tests/google/wifi/WifiRvrTwTest.py
new file mode 100644
index 0000000000..24afa5ea2d
--- /dev/null
+++ b/acts/tests/google/wifi/WifiRvrTwTest.py
@@ -0,0 +1,482 @@
+#!/usr/bin/env python3
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import itertools
+import pprint
+import time
+
+import acts.signals
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+from acts import asserts
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.wifi.WifiBaseTest import WifiBaseTest
+from acts.controllers import iperf_server as ipf
+
+import json
+import logging
+import math
+import os
+from acts import utils
+import csv
+
+import serial
+import sys
+
+
+WifiEnums = wutils.WifiEnums
+
+
+class WifiRvrTWTest(WifiBaseTest):
+ """ Tests for wifi RVR performance
+
+ Test Bed Requirement:
+ * One Android device
+ * Wi-Fi networks visible to the device
+ """
+ TEST_TIMEOUT = 10
+
+ def __init__(self, controllers):
+ self.attenuators = None
+ WifiBaseTest.__init__(self, controllers)
+
+ def setup_class(self):
+ self.dut = self.android_devices[0]
+ wutils.wifi_test_device_init(self.dut)
+
+ req_params = [ "iot_networks","rvr_test_params"]
+ opt_params = [ "angle_params","usb_port"]
+ self.unpack_userparams(req_param_names=req_params,
+ opt_param_names=opt_params)
+
+ asserts.assert_true(
+ len(self.iot_networks) > 0,
+ "Need at least one iot network with psk.")
+
+ wutils.wifi_toggle_state(self.dut, True)
+ if "rvr_test_params" in self.user_params:
+ self.iperf_server = self.iperf_servers[0]
+ self.MaxdB= self.rvr_test_params ["rvr_atten_MaxDB"]
+ self.MindB= self.rvr_test_params ["rvr_atten_MinDB"]
+ self.stepdB= self.rvr_test_params ["rvr_atten_step"]
+
+ if "angle_params" in self.user_params:
+ self.angle = self.angle_params
+
+ if "usb_port" in self.user_params:
+ self.T1=self.readport(self.usb_port["turntable"])
+ self.ATT1=self.readport(self.usb_port["atten1"])
+ self.ATT2=self.readport(self.usb_port["atten2"])
+ self.ATT3=self.readport(self.usb_port["atten3"])
+
+ # create hashmap for testcase name and SSIDs
+ self.iot_test_prefix = "test_iot_connection_to_"
+ self.ssid_map = {}
+ for network in self.iot_networks:
+ SSID = network['SSID'].replace('-','_')
+ self.ssid_map[SSID] = network
+
+ # create folder for rvr test result
+ self.log_path = os.path.join(logging.log_path, "rvr_results")
+ utils.create_dir(self.log_path)
+
+ Header=("test_SSID","Turn table (angle)","Attenuator(dBm)",
+ "TX throughput (Mbps)","RX throughput (Mbps)",
+ "RSSI","Link speed","Frequency")
+ self.csv_write(Header)
+
+ def setup_test(self):
+ self.dut.droid.wakeLockAcquireBright()
+ self.dut.droid.wakeUpNow()
+
+ def teardown_test(self):
+ self.dut.droid.wakeLockRelease()
+ self.dut.droid.goToSleepNow()
+
+ def teardown_class(self):
+ if "rvr_test_params" in self.user_params:
+ self.iperf_server.stop()
+
+ def on_fail(self, test_name, begin_time):
+ self.dut.take_bug_report(test_name, begin_time)
+ self.dut.cat_adb_log(test_name, begin_time)
+
+ """Helper Functions"""
+
+ def csv_write(self,data):
+ """Output .CSV file for test result.
+
+ Args:
+ data: Dict containing attenuation, throughput and other meta data.
+ """
+ with open("{}/Result.csv".format(self.log_path), "a", newline="") as csv_file:
+ csv_writer = csv.writer(csv_file,delimiter=',')
+ csv_writer.writerow(data)
+ csv_file.close()
+
+ def readport(self,com):
+ """Read com port for current test.
+
+ Args:
+ com: Attenuator or turn table com port
+ """
+ port=serial.Serial(com,9600,timeout=1)
+ time.sleep(1)
+ return port
+
+ def getdB(self,port):
+ """Get attenuator dB for current test.
+
+ Args:
+ port: Attenuator com port
+ """
+ port.write('V?;'.encode())
+ dB=port.readline().decode()
+ dB=dB.strip(';')
+ dB=dB[dB.find('V')+1:]
+ return int(dB)
+
+ def setdB(self,port,dB):
+ """Setup attenuator dB for current test.
+
+ Args:
+ port: Attenuator com port
+ dB: Attenuator setup dB
+ """
+ if dB<0:
+ dB=0
+ elif dB>101:
+ dB=101
+ self.log.info("Set dB to "+str(dB))
+ InputdB=str('V')+str(dB)+str(';')
+ port.write(InputdB.encode())
+ time.sleep(0.1)
+
+ def set_Three_Att_dB(self,port1,port2,port3,dB):
+ """Setup 3 attenuator dB for current test.
+
+ Args:
+ port1: Attenuator1 com port
+ port1: Attenuator2 com port
+ port1: Attenuator com port
+ dB: Attenuator setup dB
+ """
+ self.setdB(port1,dB)
+ self.setdB(port2,dB)
+ self.setdB(port3,dB)
+ self.checkdB(port1,dB)
+ self.checkdB(port2,dB)
+ self.checkdB(port3,dB)
+
+ def checkdB(self,port,dB):
+ """Check attenuator dB for current test.
+
+ Args:
+ port: Attenuator com port
+ dB: Attenuator setup dB
+ """
+ retry=0
+ while self.getdB(port)!=dB and retry<10:
+ retry=retry+1
+ self.log.info("Current dB = "+str(self.getdB(port)))
+ self.log.info("Fail to set Attenuator to "+str(dB)+", "
+ +str(retry)+" times try to reset")
+ self.setdB(port,dB)
+ if retry ==10:
+ self.log.info("Retry Attenuator fail for 9 cycles, end test!")
+ sys.exit()
+ return 0
+
+ def getDG(self,port):
+ """Get turn table angle for current test.
+
+ Args:
+ port: Turn table com port
+ """
+ DG = ""
+ port.write('DG?;'.encode())
+ time.sleep(0.1)
+ data = port.readline().decode('utf-8')
+ for i in range(len(data)):
+ if (data[i].isdigit()) == True:
+ DG = DG + data[i]
+ if DG == "":
+ return -1
+ return int(DG)
+
+ def setDG(self,port,DG):
+ """Setup turn table angle for current test.
+
+ Args:
+ port: Turn table com port
+ DG: Turn table setup angle
+ """
+ if DG>359:
+ DG=359
+ elif DG<0:
+ DG=0
+ self.log.info("Set angle to "+str(DG))
+ InputDG=str('DG')+str(DG)+str(';')
+ port.write(InputDG.encode())
+
+ def checkDG(self,port,DG):
+ """Check turn table angle for current test.
+
+ Args:
+ port: Turn table com port
+ DG: Turn table setup angle
+ """
+ retrytime = self.TEST_TIMEOUT
+ retry = 0
+ while self.getDG(port)!=DG and retry<retrytime:
+ retry=retry+1
+ self.log.info('Current angle = '+str(self.getDG(port)))
+ self.log.info('Fail set angle to '+str(DG)+', '+str(retry)+' times try to reset')
+ self.setDG(port,DG)
+ time.sleep(10)
+ if retry == retrytime:
+ self.log.info('Retry turntable fail for '+str(retry)+' cycles, end test!')
+ sys.exit()
+ return 0
+
+ def getwifiinfo(self):
+ """Get WiFi RSSI/ link speed/ frequency for current test.
+
+ Returns:
+ [RSSI,LS,FR]: WiFi RSSI/ link speed/ frequency
+ """
+ def is_number(string):
+ for i in string:
+ if i.isdigit() == False:
+ if (i=="-" or i=="."):
+ continue
+ return str(-1)
+ return string
+
+ try:
+ cmd = "adb shell iw wlan0 link"
+ wifiinfo = utils.subprocess.check_output(cmd,shell=True,
+ timeout=self.TEST_TIMEOUT)
+ # Check RSSI
+ RSSI = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("signal:") +
+ 7:wifiinfo.decode("utf-8").find("dBm") - 1]
+ RSSI = RSSI.strip(' ')
+ RSSI = is_number(RSSI)
+ # Check link speed
+ LS = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("bitrate:") +
+ 8:wifiinfo.decode("utf-8").find("Bit/s") - 2]
+ LS = LS.strip(' ')
+ LS = is_number(LS)
+ # Check frequency
+ FR = wifiinfo.decode("utf-8")[wifiinfo.decode("utf-8").find("freq:") +
+ 6:wifiinfo.decode("utf-8").find("freq:") + 10]
+ FR = FR.strip(' ')
+ FR = is_number(FR)
+ except:
+ return -1, -1, -1
+ return [RSSI,LS,FR]
+
+ def post_process_results(self, rvr_result):
+ """Saves JSON formatted results.
+
+ Args:
+ rvr_result: Dict containing attenuation, throughput and other meta
+ data
+ """
+ # Save output as text file
+ data=(rvr_result["test_name"],rvr_result["test_angle"],rvr_result["test_dB"],
+ rvr_result["throughput_TX"][0],rvr_result["throughput_RX"][0],
+ rvr_result["test_RSSI"],rvr_result["test_LS"],rvr_result["test_FR"])
+ self.csv_write(data)
+
+ results_file_path = "{}/{}_angle{}_{}dB.json".format(self.log_path,
+ self.ssid,
+ self.angle[self.ag],self.DB)
+ with open(results_file_path, 'w') as results_file:
+ json.dump(rvr_result, results_file, indent=4)
+
+ def connect_to_wifi_network(self, network):
+ """Connection logic for psk wifi networks.
+
+ Args:
+ params: Dictionary with network info.
+ """
+ SSID = network[WifiEnums.SSID_KEY]
+ self.dut.ed.clear_all_events()
+ wutils.start_wifi_connection_scan(self.dut)
+ scan_results = self.dut.droid.wifiGetScanResults()
+ wutils.assert_network_in_list({WifiEnums.SSID_KEY: SSID}, scan_results)
+ wutils.wifi_connect(self.dut, network, num_of_tries=3)
+
+ def run_iperf_client(self, network):
+ """Run iperf TX throughput after connection.
+
+ Args:
+ params: Dictionary with network info.
+
+ Returns:
+ rvr_result: Dict containing rvr_results
+ """
+ rvr_result = []
+ self.iperf_server.start(tag="TX_server_{}_angle{}_{}dB".format(
+ self.ssid,self.angle[self.ag],self.DB))
+ wait_time = 5
+ SSID = network[WifiEnums.SSID_KEY]
+ self.log.info("Starting iperf traffic TX through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {} -J {}".format(self.iperf_server.port,
+ self.rvr_test_params["iperf_port_arg"])
+ success, data = self.dut.run_iperf_client(
+ self.rvr_test_params["iperf_server_address"],
+ port_arg,
+ timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
+ # Parse and log result
+ client_output_path = os.path.join(
+ self.iperf_server.log_path, "IperfDUT,{},TX_client_{}_angle{}_{}dB".format(
+ self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
+ with open(client_output_path, 'w') as out_file:
+ out_file.write("\n".join(data))
+ self.iperf_server.stop()
+
+ iperf_file = self.iperf_server.log_files[-1]
+ try:
+ iperf_result = ipf.IPerfResult(iperf_file)
+ curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
+ self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
+ iperf_result.instantaneous_rates[self.rvr_test_params[
+ "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
+ except:
+ self.log.warning(
+ "ValueError: Cannot get iperf result. Setting to 0")
+ curr_throughput = 0
+ rvr_result.append(curr_throughput)
+ self.log.info("TX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
+ self.DB, curr_throughput))
+
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
+ return rvr_result
+
+ def run_iperf_server(self, network):
+ """Run iperf RX throughput after connection.
+
+ Args:
+ params: Dictionary with network info.
+
+ Returns:
+ rvr_result: Dict containing rvr_results
+ """
+ rvr_result = []
+ self.iperf_server.start(tag="RX_client_{}_angle{}_{}dB".format(
+ self.ssid,self.angle[self.ag],self.DB))
+ wait_time = 5
+ SSID = network[WifiEnums.SSID_KEY]
+ self.log.info("Starting iperf traffic RX through {}".format(SSID))
+ time.sleep(wait_time)
+ port_arg = "-p {} -J -R {}".format(self.iperf_server.port,
+ self.rvr_test_params["iperf_port_arg"])
+ success, data = self.dut.run_iperf_client(
+ self.rvr_test_params["iperf_server_address"],
+ port_arg,
+ timeout=self.rvr_test_params["iperf_duration"] + self.TEST_TIMEOUT)
+ # Parse and log result
+ client_output_path = os.path.join(
+ self.iperf_server.log_path, "IperfDUT,{},RX_server_{}_angle{}_{}dB".format(
+ self.iperf_server.port,self.ssid,self.angle[self.ag],self.DB))
+ with open(client_output_path, 'w') as out_file:
+ out_file.write("\n".join(data))
+ self.iperf_server.stop()
+
+ iperf_file = client_output_path
+ try:
+ iperf_result = ipf.IPerfResult(iperf_file)
+ curr_throughput = (math.fsum(iperf_result.instantaneous_rates[
+ self.rvr_test_params["iperf_ignored_interval"]:-1]) / len(
+ iperf_result.instantaneous_rates[self.rvr_test_params[
+ "iperf_ignored_interval"]:-1])) * 8 * (1.024**2)
+ except:
+ self.log.warning(
+ "ValueError: Cannot get iperf result. Setting to 0")
+ curr_throughput = 0
+ rvr_result.append(curr_throughput)
+ self.log.info("RX Throughput at {0:.2f} dB is {1:.2f} Mbps".format(
+ self.DB, curr_throughput))
+
+ self.log.debug(pprint.pformat(data))
+ asserts.assert_true(success, "Error occurred in iPerf traffic.")
+ return rvr_result
+
+ def iperf_test_func(self,network):
+ """Main function to test iperf TX/RX.
+
+ Args:
+ params: Dictionary with network info
+ """
+ if "rvr_test_params" in self.user_params:
+ # Initialize
+ rvr_result = {}
+ # Run RvR and log result
+ wifiinfo = self.getwifiinfo()
+ rvr_result["throughput_TX"] = self.run_iperf_client(network)
+ rvr_result["throughput_RX"] = self.run_iperf_server(network)
+ rvr_result["test_name"] = self.ssid
+ rvr_result["test_angle"] = self.angle[self.ag]
+ rvr_result["test_dB"] = self.DB
+ rvr_result["test_RSSI"] = wifiinfo[0]
+ rvr_result["test_LS"] = wifiinfo[1]
+ rvr_result["test_FR"] = wifiinfo[2]
+ self.post_process_results(rvr_result)
+
+ def rvr_test(self,network):
+ """Test function to run RvR.
+
+ The function runs an RvR test in the current device/AP configuration.
+ Function is called from another wrapper function that sets up the
+ testbed for the RvR test
+
+ Args:
+ params: Dictionary with network info
+ """
+ wait_time = 5
+ utils.subprocess.check_output('adb root', shell=True, timeout=20)
+ self.ssid = network[WifiEnums.SSID_KEY]
+ self.log.info("Start rvr test")
+ for i in range(len(self.angle)):
+ self.setDG(self.T1,self.angle[i])
+ time.sleep(wait_time)
+ self.checkDG(self.T1,self.angle[i])
+ self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,0)
+ time.sleep(wait_time)
+ self.connect_to_wifi_network(network)
+ self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.MindB)
+ for j in range(self.MindB,self.MaxdB+self.stepdB,self.stepdB):
+ self.DB=j
+ self.ag=i
+ self.set_Three_Att_dB(self.ATT1,self.ATT2,self.ATT3,self.DB)
+ self.iperf_test_func(network)
+ wutils.reset_wifi(self.dut)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="93816af8-4c63-45f8-b296-cb49fae0b158")
+ def test_iot_connection_to_RVR_2G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.rvr_test(self.ssid_map[ssid_key])
+
+ @test_tracker_info(uuid="e1a67e13-946f-4d91-aa73-3f945438a1ac")
+ def test_iot_connection_to_RVR_5G(self):
+ ssid_key = self.current_test_name.replace(self.iot_test_prefix, "")
+ self.rvr_test(self.ssid_map[ssid_key]) \ No newline at end of file
diff --git a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
index 6f0beba430..ffe0effb96 100755
--- a/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
+++ b/acts/tests/google/wifi/WifiStaApConcurrencyTest.py
@@ -346,9 +346,9 @@ class WifiStaApConcurrencyTest(WifiBaseTest):
def test_softap_5G_wifi_connection_2G(self):
"""Tests bringing up SoftAp on 5G followed by connection to 2G network.
"""
- self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G)
+ self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G)
self.start_softap_and_connect_to_wifi_network(
- self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_5G)
@test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c")
def test_softap_2G_wifi_connection_5G(self):
@@ -356,7 +356,7 @@ class WifiStaApConcurrencyTest(WifiBaseTest):
"""
self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G)
self.start_softap_and_connect_to_wifi_network(
- self.wpapsk_5g, WIFI_CONFIG_APBAND_5G)
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
@test_tracker_info(uuid="a2c62bc6-9ccd-4bc4-8a23-9a1b5d0b4b5c")
def test_softap_2G_wifi_connection_5G_DFS(self):
@@ -364,14 +364,14 @@ class WifiStaApConcurrencyTest(WifiBaseTest):
"""
self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G_DFS)
self.start_softap_and_connect_to_wifi_network(
- self.wpapsk_5g, WIFI_CONFIG_APBAND_5G)
+ self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
@test_tracker_info(uuid="aa23a3fc-31a1-4d5c-8cf5-2eb9fdf9e7ce")
def test_softap_5G_wifi_connection_2G_with_location_scan_on(self):
"""Tests bringing up SoftAp on 5G followed by connection to 2G network
with location scans turned on.
"""
- self.configure_ap(channel_5g=WIFI_NETWORK_AP_CHANNEL_5G)
+ self.configure_ap(channel_2g=WIFI_NETWORK_AP_CHANNEL_2G)
self.turn_location_on_and_scan_toggle_on()
self.start_softap_and_connect_to_wifi_network(
- self.wpapsk_5g, WIFI_CONFIG_APBAND_2G)
+ self.wpapsk_2g, WIFI_CONFIG_APBAND_5G)
diff --git a/acts/tests/google/wifi/WifiStressTest.py b/acts/tests/google/wifi/WifiStressTest.py
index 78608d7142..e99aed609f 100755
--- a/acts/tests/google/wifi/WifiStressTest.py
+++ b/acts/tests/google/wifi/WifiStressTest.py
@@ -19,7 +19,7 @@ import time
import acts.base_test
import acts.test_utils.wifi.wifi_test_utils as wutils
-import acts.utils
+import acts.test_utils.tel.tel_test_utils as tutils
from acts import asserts
from acts import signals
@@ -134,6 +134,31 @@ class WifiStressTest(WifiBaseTest):
if "100% packet loss" in result:
raise signals.TestFailure("100% packet loss during ping")
+ def start_youtube_video(self, url=None, secs=60):
+ """Start a youtube video and check if it's playing.
+
+ Args:
+ url: The URL of the youtube video to play.
+ secs: Time to play video in seconds.
+
+ """
+ ad = self.dut
+ ad.log.info("Start a youtube video")
+ ad.ensure_screen_on()
+ video_played = False
+ for count in range(2):
+ ad.unlock_screen()
+ ad.adb.shell('am start -a android.intent.action.VIEW -d "%s"' % url)
+ if tutils.wait_for_state(ad.droid.audioIsMusicActive, True, 15, 1):
+ ad.log.info("Started a video in youtube.")
+ # Play video for given seconds.
+ time.sleep(secs)
+ video_played = True
+ break
+ if not video_played:
+ raise signals.TestFailure("Youtube video did not start. Current WiFi "
+ "state is %d" % self.dut.droid.wifiCheckState())
+
"""Tests"""
@test_tracker_info(uuid="cd0016c6-58cf-4361-b551-821c0b8d2554")
@@ -206,17 +231,54 @@ class WifiStressTest(WifiBaseTest):
sec = self.stress_hours * 60 * 60
args = "-p {} -t {} -R".format(self.iperf_server.port, sec)
self.log.info("Running iperf client {}".format(args))
+ start_time = time.time()
result, data = self.dut.run_iperf_client(self.iperf_server_address,
args, timeout=sec+1)
if not result:
self.log.debug("Error occurred in iPerf traffic.")
+ start_time = time.time()
self.run_ping(sec)
except:
+ total_time = time.time() - start_time
raise signals.TestFailure("Network long-connect failed."
- "Look at logs", extras={"Total Hours":"%d" %self.stress_hours,
- "Seconds Run":"UNKNOWN"})
+ "WiFi State = %d" %self.dut.droid.wifiCheckState(),
+ extras={"Total Hours":"%d" %self.stress_hours,
+ "Seconds Run":"%d" %total_time})
+ total_time = time.time() - start_time
+ self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState())
raise signals.TestPass(details="", extras={"Total Hours":"%d" %
- self.stress_hours, "Seconds":"%d" %sec})
+ self.stress_hours, "Seconds Run":"%d" %total_time})
+
+ def test_stress_youtube_5g(self):
+ """Test to connect to network and play various youtube videos.
+
+ Steps:
+ 1. Scan and connect to a network.
+ 2. Loop through and play a list of youtube videos.
+ 3. Verify no WiFi disconnects/data interruption.
+
+ """
+ # List of Youtube 4K videos.
+ videos = ["https://www.youtube.com/watch?v=TKmGU77INaM",
+ "https://www.youtube.com/watch?v=WNCl-69POro",
+ "https://www.youtube.com/watch?v=dVkK36KOcqs",
+ "https://www.youtube.com/watch?v=0wCC3aLXdOw",
+ "https://www.youtube.com/watch?v=rN6nlNC9WQA",
+ "https://www.youtube.com/watch?v=U--7hxRNPvk"]
+ try:
+ self.scan_and_connect_by_ssid(self.wpa_5g)
+ start_time = time.time()
+ for video in videos:
+ self.start_youtube_video(url=video, secs=10*60)
+ except:
+ total_time = time.time() - start_time
+ raise signals.TestFailure("The youtube stress test has failed."
+ "WiFi State = %d" %self.dut.droid.wifiCheckState(),
+ extras={"Total Hours":"1", "Seconds Run":"%d" %total_time})
+ total_time = time.time() - start_time
+ self.log.debug("WiFi state = %d" %self.dut.droid.wifiCheckState())
+ raise signals.TestPass(details="", extras={"Total Hours":"1",
+ "Seconds Run":"%d" %total_time})
@test_tracker_info(uuid="d367c83e-5b00-4028-9ed8-f7b875997d13")
def test_stress_wifi_failover(self):
@@ -232,6 +294,7 @@ class WifiStressTest(WifiBaseTest):
"""
for count in range(int(self.stress_count/4)):
+ wutils.reset_wifi(self.dut)
ssids = list()
for network in self.networks:
ssids.append(network[WifiEnums.SSID_KEY])
@@ -294,7 +357,8 @@ class WifiStressTest(WifiBaseTest):
wutils.stop_wifi_tethering(self.dut)
asserts.assert_false(self.dut.droid.wifiIsApEnabled(),
"SoftAp failed to shutdown!")
- time.sleep(TIMEOUT)
+ # Give some time for WiFi to come back to previous state.
+ time.sleep(2)
cur_wifi_state = self.dut.droid.wifiCheckState()
if initial_wifi_state != cur_wifi_state:
raise signals.TestFailure("Wifi state was %d before softAP and %d now!" %
diff --git a/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py
new file mode 100755
index 0000000000..a603e017fa
--- /dev/null
+++ b/acts/tests/google/wifi/WifiTethering2GOpenOTATest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import acts.signals as signals
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G
+
+import acts.test_utils.net.net_test_utils as nutils
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+
+class WifiTethering2GOpenOTATest(BaseTestClass):
+ """Wifi Tethering 2G Open OTA tests"""
+
+ def setup_class(self):
+
+ super(WifiTethering2GOpenOTATest, self).setup_class()
+ ota_updater.initialize(self.user_params, self.android_devices)
+
+ self.hotspot_device = self.android_devices[0]
+ self.tethered_device = self.android_devices[1]
+ req_params = ("wifi_hotspot_open", )
+ self.unpack_userparams(req_params)
+
+ # verify hotspot device has lte data and supports tethering
+ nutils.verify_lte_data_and_tethering_supported(self.hotspot_device)
+
+ # Save a wifi soft ap configuration and verify that it works
+ wutils.save_wifi_soft_ap_config(self.hotspot_device,
+ self.wifi_hotspot_open,
+ WIFI_CONFIG_APBAND_2G)
+ self._verify_wifi_tethering()
+
+ # Run OTA below, if ota fails then abort all tests.
+ try:
+ ota_updater.update(self.hotspot_device)
+ except Exception as err:
+ raise signals.TestSkipClass(
+ "Failed up apply OTA update. Aborting tests")
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
+
+ def teardown_class(self):
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Helper Functions"""
+
+ def _verify_wifi_tethering(self):
+ """Verify wifi tethering"""
+ wutils.start_wifi_tethering_saved_config(self.hotspot_device)
+ wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open)
+ # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="fe502bc3-f9c6-4bed-98ad-acaa7e166521")
+ def test_wifi_tethering_ota_2g_open(self):
+ """ Verify wifi hotspot after ota upgrade
+
+ Steps:
+ 1. Save a wifi hotspot config with 2g band and open auth
+ 2. Do a OTA update
+ 3. Verify that wifi hotspot works with teh saved config
+ """
+ self._verify_wifi_tethering()
diff --git a/acts/tests/google/wifi/WifiTethering2GPskOTATest.py b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py
new file mode 100755
index 0000000000..e9fedcd24c
--- /dev/null
+++ b/acts/tests/google/wifi/WifiTethering2GPskOTATest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import acts.signals as signals
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G
+
+import acts.test_utils.net.net_test_utils as nutils
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+
+class WifiTethering2GPskOTATest(BaseTestClass):
+ """Wifi Tethering 2G Psk OTA tests"""
+
+ def setup_class(self):
+
+ super(WifiTethering2GPskOTATest, self).setup_class()
+ ota_updater.initialize(self.user_params, self.android_devices)
+
+ self.hotspot_device = self.android_devices[0]
+ self.tethered_device = self.android_devices[1]
+ req_params = ("wifi_hotspot_psk", )
+ self.unpack_userparams(req_params)
+
+ # verify hotspot device has lte data and supports tethering
+ nutils.verify_lte_data_and_tethering_supported(self.hotspot_device)
+
+ # Save a wifi soft ap configuration and verify that it works
+ wutils.save_wifi_soft_ap_config(self.hotspot_device,
+ self.wifi_hotspot_psk,
+ WIFI_CONFIG_APBAND_2G)
+ self._verify_wifi_tethering()
+
+ # Run OTA below, if ota fails then abort all tests.
+ try:
+ ota_updater.update(self.hotspot_device)
+ except Exception as err:
+ raise signals.TestSkipClass(
+ "Failed up apply OTA update. Aborting tests")
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
+
+ def teardown_class(self):
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Helper Functions"""
+
+ def _verify_wifi_tethering(self):
+ """Verify wifi tethering"""
+ wutils.start_wifi_tethering_saved_config(self.hotspot_device)
+ wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk)
+ # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="4b1cec63-d1d2-4046-84e9-e806bb08ce41")
+ def test_wifi_tethering_ota_2g_psk(self):
+ """ Verify wifi hotspot after ota upgrade
+
+ Steps:
+ 1. Save a wifi hotspot config with 2g band and psk auth
+ 2. Do a OTA update
+ 3. Verify that wifi hotspot works with teh saved config
+ """
+ self._verify_wifi_tethering()
diff --git a/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py
new file mode 100755
index 0000000000..6648d0e461
--- /dev/null
+++ b/acts/tests/google/wifi/WifiTethering5GOpenOTATest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import acts.signals as signals
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
+
+import acts.test_utils.net.net_test_utils as nutils
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+
+class WifiTethering5GOpenOTATest(BaseTestClass):
+ """Wifi Tethering 5G Open OTA tests"""
+
+ def setup_class(self):
+
+ super(WifiTethering5GOpenOTATest, self).setup_class()
+ ota_updater.initialize(self.user_params, self.android_devices)
+
+ self.hotspot_device = self.android_devices[0]
+ self.tethered_device = self.android_devices[1]
+ req_params = ("wifi_hotspot_open", )
+ self.unpack_userparams(req_params)
+
+ # verify hotspot device has lte data and supports tethering
+ nutils.verify_lte_data_and_tethering_supported(self.hotspot_device)
+
+ # Save a wifi soft ap configuration and verify that it works
+ wutils.save_wifi_soft_ap_config(self.hotspot_device,
+ self.wifi_hotspot_open,
+ WIFI_CONFIG_APBAND_5G)
+ self._verify_wifi_tethering()
+
+ # Run OTA below, if ota fails then abort all tests.
+ try:
+ ota_updater.update(self.hotspot_device)
+ except Exception as err:
+ raise signals.TestSkipClass(
+ "Failed up apply OTA update. Aborting tests")
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
+
+ def teardown_class(self):
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Helper Functions"""
+
+ def _verify_wifi_tethering(self):
+ """Verify wifi tethering"""
+ wutils.start_wifi_tethering_saved_config(self.hotspot_device)
+ wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_open)
+ # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="b1a94c8f-f3ed-4755-be4a-764e205b4483")
+ def test_wifi_tethering_ota_5g_open(self):
+ """ Verify wifi hotspot after ota upgrade
+
+ Steps:
+ 1. Save a wifi hotspot config with 5g band and open auth
+ 2. Do a OTA update
+ 3. Verify that wifi hotspot works with teh saved config
+ """
+ self._verify_wifi_tethering()
diff --git a/acts/tests/google/wifi/WifiTethering5GPskOTATest.py b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py
new file mode 100755
index 0000000000..74505787ff
--- /dev/null
+++ b/acts/tests/google/wifi/WifiTethering5GPskOTATest.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import acts.signals as signals
+
+from acts import asserts
+from acts.base_test import BaseTestClass
+from acts.libs.ota import ota_updater
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
+
+import acts.test_utils.net.net_test_utils as nutils
+import acts.test_utils.wifi.wifi_test_utils as wutils
+
+
+class WifiTethering5GPskOTATest(BaseTestClass):
+ """Wifi Tethering 5G Psk OTA tests"""
+
+ def setup_class(self):
+
+ super(WifiTethering5GPskOTATest, self).setup_class()
+ ota_updater.initialize(self.user_params, self.android_devices)
+
+ self.hotspot_device = self.android_devices[0]
+ self.tethered_device = self.android_devices[1]
+ req_params = ("wifi_hotspot_psk", )
+ self.unpack_userparams(req_params)
+
+ # verify hotspot device has lte data and supports tethering
+ nutils.verify_lte_data_and_tethering_supported(self.hotspot_device)
+
+ # Save a wifi soft ap configuration and verify that it works
+ wutils.save_wifi_soft_ap_config(self.hotspot_device,
+ self.wifi_hotspot_psk,
+ WIFI_CONFIG_APBAND_5G)
+ self._verify_wifi_tethering()
+
+ # Run OTA below, if ota fails then abort all tests.
+ try:
+ ota_updater.update(self.hotspot_device)
+ except Exception as err:
+ raise signals.TestSkipClass(
+ "Failed up apply OTA update. Aborting tests")
+
+ def on_fail(self, test_name, begin_time):
+ for ad in self.android_devices:
+ ad.take_bug_report(test_name, begin_time)
+
+ def teardown_class(self):
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Helper Functions"""
+
+ def _verify_wifi_tethering(self):
+ """Verify wifi tethering"""
+ wutils.start_wifi_tethering_saved_config(self.hotspot_device)
+ wutils.wifi_connect(self.tethered_device, self.wifi_hotspot_psk)
+ # (TODO: @gmoturu) Change to stop_wifi_tethering. See b/109876061
+ wutils.wifi_toggle_state(self.hotspot_device, True)
+
+ """Tests"""
+
+ @test_tracker_info(uuid="111d3a33-3152-4993-b1ba-307daaf2a6ff")
+ def test_wifi_tethering_ota_5g_psk(self):
+ """ Verify wifi hotspot after ota upgrade
+
+ Steps:
+ 1. Save a wifi hotspot config with 5g band and psk auth
+ 2. Do a OTA update
+ 3. Verify that wifi hotspot works with teh saved config
+ """
+ self._verify_wifi_tethering()
diff --git a/acts/tests/google/wifi/WifiTetheringTest.py b/acts/tests/google/wifi/WifiTetheringTest.py
index 080f9e47c4..26be9b740c 100644
--- a/acts/tests/google/wifi/WifiTetheringTest.py
+++ b/acts/tests/google/wifi/WifiTetheringTest.py
@@ -31,6 +31,7 @@ from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G
from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
from acts.test_utils.net import socket_test_utils as sutils
from acts.test_utils.net import arduino_test_utils as dutils
+from acts.test_utils.net import net_test_utils as nutils
from acts.test_utils.wifi import wifi_test_utils as wutils
WAIT_TIME = 2
@@ -47,29 +48,19 @@ class WifiTetheringTest(base_test.BaseTestClass):
self.unpack_userparams(req_params)
self.new_ssid = "wifi_tethering_test2"
- wutils.wifi_toggle_state(self.hotspot_device, False)
- self.hotspot_device.droid.telephonyToggleDataConnection(True)
- wait_for_cell_data_connection(self.log, self.hotspot_device, True)
- asserts.assert_true(
- verify_http_connection(self.log, self.hotspot_device),
- "HTTP verification failed on cell data connection")
- asserts.assert_true(
- self.hotspot_device.droid.connectivityIsTetheringSupported(),
- "Tethering is not supported for the provider")
+ nutils.verify_lte_data_and_tethering_supported(self.hotspot_device)
for ad in self.tethered_devices:
- ad.droid.telephonyToggleDataConnection(False)
wutils.wifi_test_device_init(ad)
def teardown_class(self):
""" Reset devices """
- wutils.wifi_toggle_state(self.hotspot_device, True)
for ad in self.tethered_devices:
- ad.droid.telephonyToggleDataConnection(True)
+ wutils.reset_wifi(ad)
def on_fail(self, test_name, begin_time):
""" Collect bug report on failure """
- self.hotspot_device.take_bug_report(test_name, begin_time)
- for ad in self.tethered_devices:
+ return
+ for ad in self.android_devices:
ad.take_bug_report(test_name, begin_time)
""" Helper functions """
@@ -200,7 +191,7 @@ class WifiTetheringTest(base_test.BaseTestClass):
""" Connect disconnect tethered devices to wifi hotspot """
num_android_devices = len(self.tethered_devices)
num_wifi_dongles = 0
- if self.arduino_wifi_dongles:
+ if hasattr(self, 'arduino_wifi_dongles'):
num_wifi_dongles = len(self.arduino_wifi_dongles)
total_devices = num_android_devices + num_wifi_dongles
device_connected = [False] * total_devices
@@ -389,6 +380,8 @@ class WifiTetheringTest(base_test.BaseTestClass):
2. Connect 2 tethered devices to the hotspot device
3. Ping interfaces between the tethered devices
"""
+ asserts.skip_if(not hasattr(self, 'arduino_wifi_dongles'),
+ "No wifi dongles connected. Skipping test")
wutils.toggle_wifi_off_and_on(self.hotspot_device)
self._start_wifi_tethering(WIFI_CONFIG_APBAND_2G)
self._test_traffic_between_two_tethered_devices(self.tethered_devices[0],
diff --git a/acts/tests/google/wifi/WifiThroughputStabilityTest.py b/acts/tests/google/wifi/WifiThroughputStabilityTest.py
index c28bd63d2d..326e8e807b 100644
--- a/acts/tests/google/wifi/WifiThroughputStabilityTest.py
+++ b/acts/tests/google/wifi/WifiThroughputStabilityTest.py
@@ -29,6 +29,8 @@ from acts.test_utils.wifi import wifi_retail_ap as retail_ap
from acts.test_utils.wifi import wifi_test_utils as wutils
TEST_TIMEOUT = 10
+SHORT_SLEEP = 1
+MED_SLEEP = 6
class WifiThroughputStabilityTest(base_test.BaseTestClass):
@@ -97,7 +99,7 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass):
"throughput_stability_test_params", "testbed_params",
"main_network"
]
- opt_params = ["RetailAccessPoints"]
+ opt_params = ["RetailAccessPoints", "golden_files_list"]
self.unpack_userparams(req_params, opt_params)
self.test_params = self.throughput_stability_test_params
self.num_atten = self.attenuators[0].instrument.num_atten
@@ -229,6 +231,14 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass):
test_result = {}
# Configure AP
band = self.access_point.band_lookup_by_channel(channel)
+ if "2G" in band:
+ frequency = wutils.WifiEnums.channel_2G_to_freq[channel]
+ else:
+ frequency = wutils.WifiEnums.channel_5G_to_freq[channel]
+ if frequency in wutils.WifiEnums.DFS_5G_FREQUENCIES:
+ self.access_point.set_region(self.testbed_params["DFS_region"])
+ else:
+ self.access_point.set_region(self.testbed_params["default_region"])
self.access_point.set_channel(band, channel)
self.access_point.set_bandwidth(band, mode)
self.log.info("Access Point Configuration: {}".format(
@@ -240,10 +250,11 @@ class WifiThroughputStabilityTest(base_test.BaseTestClass):
for i in range(self.num_atten)
]
# Connect DUT to Network
- self.main_network[band]["channel"] = channel
+ wutils.wifi_toggle_state(self.dut, True)
wutils.reset_wifi(self.dut)
+ self.main_network[band]["channel"] = channel
wutils.wifi_connect(self.dut, self.main_network[band], num_of_tries=5)
- time.sleep(5)
+ time.sleep(MED_SLEEP)
# Run test and log result
# Start iperf session
self.log.info("Starting iperf test.")