diff options
Diffstat (limited to 'acts_tests')
3 files changed, 686 insertions, 10 deletions
diff --git a/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py b/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py new file mode 100755 index 0000000000..e0a7962788 --- /dev/null +++ b/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py @@ -0,0 +1,554 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +""" +Created on Thu Jul 16 22:58:03 2020 + +@author: qijiang +""" + +import os +import pyvisa +import time +import acts.test_utils.coex.audio_test_utils as atu +import acts.test_utils.bt.bt_test_utils as btutils +import pandas as pd +from acts import asserts +from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory +from acts.test_utils.bt.A2dpBaseTest import A2dpBaseTest +from acts.test_utils.power.PowerBTBaseTest import ramp_attenuation + +PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' + + +class RPIAxis(object): + def __init__(self, VisaConnectString): + """Constructor. + Create a Visa connection + + """ + rm = pyvisa.ResourceManager() + self.instrument = rm.open_resource(VisaConnectString) + self.instrument.read_termination = "\n" # make sure we look for newline at the end of strings we read + + def __getattr__(self, attr): + return getattr(self.instrument, attr) # Delegate all other attrs + + +class RPIAxis_card(RPIAxis): + """ RPIAxis_card() + Create an axis + + """ + def __init__(self, axis_object): + # create an object to communicate to an RPI2 (remote positioner instrument) axis + self.axis = axis_object # store pyvisa instrument connection + + def __getattr__(self, attr): + return getattr(self.axis, attr) # Delegate all other attrs + + def moveTo(self, where): + """ moveTo + move to a given position and make sure you arrived at the target. + """ + # max travale time in seconds. adjust this if you have a really slow positioner! + MAXTRAVELTIME = 150 + t0 = time.time() + self.axis.write("SK %d\n" % where) + done = False + while (not done): + if (time.time() - t0) > MAXTRAVELTIME: + print("looks like we are stuck!\n") + return False + response = self.axis.query("*opc?\n") + if (response == '1'): + return True + else: + response = self.axis.query("CP?\n") + + # stop the positioner + def Stop(self): + t0 = time.time() + done = False + self.axis.write("ST\n") + while (not done): + if (time.time() - t0) > 2: + print("Runaway positioner!\n") + return False + response = self.axis.query("*opc?\n") + if (response == '1'): + return True + + # set continuous rotation mode + def SetContinuousRotationMode(self): + self.axis.write("CR\n") + + # set non continuous rotation mode + def SetNonContinuousRotationMode(self): + self.axis.write("NCR\n") + + +class BtA2dpOtaRangeTest(A2dpBaseTest): + def setup_class(self): + + #'audio_params' is a dict, contains the audio device type, audio streaming + #settings such as volume, duration, audio recording parameters such as + #channel, sampling rate/width, and thdn parameters for audio processing + req_params = [ + 'audio_params', 'positioner', 'dut_config', 'attenuation_vector' + ] + opt_params = ['music_files'] + self.unpack_userparams(req_params) + if len(self.android_devices) > 1: + self.dut = self.android_devices[1] + self.unpack_userparams(opt_params) + music_src = self.music_files[0] + music_dest = PHONE_MUSIC_FILE_DIRECTORY + success = self.dut.push_system_file(music_src, music_dest) + if success: + self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, + os.path.basename(music_src)) + # Initialize media_control class + self.media = btutils.MediaControlOverSl4a(self.dut, + self.music_file) + # Set attenuator to minimum attenuation + self.attenuator = self.attenuators[0] + self.attenuator.set_atten(self.attenuation_vector['min']) + # Create the BTOE(Bluetooth-Other-End) device object + bt_devices = self.user_params.get('bt_devices', []) + attr, idx = bt_devices.split(':') + self.bt_device_controller = getattr(self, attr)[int(idx)] + self.bt_device = bt_factory().generate(self.bt_device_controller) + btutils.enable_bqr(self.bt_device_controller) + + #Setup positioner + self.PhiAxisAddress = "TCPIP0::{}::{}::SOCKET".format( + self.positioner["server_ip"], self.positioner["phi_axis_port"]) + self.ThetaAxisAddress = "TCPIP0::{}::{}::SOCKET".format( + self.positioner["server_ip"], self.positioner["theta_axis_port"]) + self.phi_axis = RPIAxis(self.PhiAxisAddress) + self.phi_card = RPIAxis_card(self.phi_axis) + self.log.info("*IDN? response: {}".format( + self.phi_card.query("*idn?\n"))) + self.theta_axis = RPIAxis(self.ThetaAxisAddress) + self.theta_card = RPIAxis_card(self.theta_axis) + self.log.info("*IDN? response: {}".format( + self.theta_card.query("*idn?\n"))) + self.phi_card.Stop() + self.theta_card.Stop() + + def teardown_class(self): + + if hasattr(self, 'media'): + self.media.stop() + if hasattr(self, 'attenuator'): + self.attenuator.set_atten(self.attenuation_vector['min']) + if hasattr(self, 'dut'): + self.dut.droid.bluetoothFactoryReset() + btutils.disable_bluetooth(self.dut.droid) + self.bt_device.reset() + self.bt_device.power_off() + self.phi_card.moveTo(0) + self.theta_card.moveTo(0) + + def setup_test(self): + + # Reset headset + self.bt_device.reset() + # Initialize audio capture devices + self.audio_device = atu.get_audio_capture_device( + self.bt_device_controller, self.audio_params) + # Connect BT link + connected = self.establish_bt_connection() + asserts.assert_true(connected, 'BT connection failed') + # Output file + file_name = 'OTA_Range_Over_Angle_{}_{}.csv'.format( + self.dut_config['model'], self.dut_config['screen_placement']) + self.file_output = os.path.join(self.log_path, file_name) + + def teardown_test(self): + + if hasattr(self, 'media'): + self.media.stop() + if hasattr(self, 'attenuator'): + self.attenuator.set_atten(self.attenuation_vector['min']) + if hasattr(self, 'dut'): + self.dut.droid.bluetoothFactoryReset() + btutils.disable_bluetooth(self.dut.droid) + self.bt_device.reset() + self.bt_device.power_off() + self.phi_card.moveTo(0) + self.theta_card.moveTo(0) + + def a2dp_play(self): + + if hasattr(self, 'dut'): + vol = self.dut.droid.getMaxMediaVolume( + ) * self.audio_params['volume'] + self.dut.droid.setMediaVolume(vol) + self.media.play() + else: + vol = self.bt_device_controller.droid.getMaxMediaVolume( + ) * self.audio_params['volume'] + self.bt_device_controller.droid.setMediaVolume(vol) + self.bt_device.previous_track() + self.bt_device.play() + + def a2dp_stop(self): + + if hasattr(self, 'dut'): + self.media.stop() + else: + self.bt_device.pause() + + def establish_bt_connection(self): + + if hasattr(self, 'dut'): + self.dut.droid.bluetoothFactoryReset() + self.bt_device.reset() + self.bt_device.power_on() + btutils.enable_bluetooth(self.dut.droid, self.dut.ed) + connected = btutils.connect_phone_to_headset( + self.dut, self.bt_device, 60) + vol = self.dut.droid.getMaxMediaVolume( + ) * self.audio_params['volume'] + self.dut.droid.setMediaVolume(0) + time.sleep(1) + self.dut.droid.setMediaVolume(int(vol)) + self.media.play() + return connected + + elif len(self.bt_device_controller.droid. + bluetoothA2dpSinkGetConnectedDevices()) == 0: + self.log.warning('Need manual intervention to connect BT link') + os.system( + 'spd-say "Please manually connect BT and start playback"') + input('Once fixed, please press ENTER to resume the test') + return 1 + + def run_thdn_analysis(self, audio_captured): + """Calculate Total Harmonic Distortion plus Noise for latest recording. + + Args: + audio_captured: the captured audio file + Returns: + thdn: thdn value in a list + """ + # Calculate Total Harmonic Distortion + Noise + audio_result = atu.AudioCaptureResult(audio_captured, + self.audio_params) + thdn = audio_result.THDN(**self.audio_params['thdn_params']) + return thdn + + def record_audio_and_analyze_thdn(self): + + self.a2dp_play() + time.sleep(1) + self.audio_device.start() + time.sleep(self.audio_params['duration']) + audio_captured = self.audio_device.stop() + audio_result = atu.AudioCaptureResult(audio_captured, + self.audio_params) + thdn = audio_result.THDN(**self.audio_params['thdn_params']) + self.log.info('THDN is {}'.format(thdn[0])) + + self.a2dp_stop() + + return thdn[0] + + def recover_bt_link(self): + """Recover BT link during test. + + Recover BT link from the a2dp sink device + + Returns: + connected: signal whether bt link is restored + """ + #Try to connect from the sink device + if len(self.bt_device_controller.droid.bluetoothGetConnectedDevices() + ) == 0: + self.log.warning('Try to recover BT link') + self.attenuator.set_atten(self.attenuation_vector['min']) + + if hasattr(self, 'dut'): + connected = self.establish_bt_connection() + return connected + else: + device_bonded = self.bt_device_controller.droid.bluetoothGetBondedDevices( + )[0]['address'] + trial_count = 0 + trial_limit = 3 + self.log.info('Try to reconnect from the sink device') + while trial_count < trial_limit: + #Connect master device from the sink device + time_start = time.time() + while time.time() < time_start + 5: + try: + self.bt_device_controller.droid.bluetoothConnectBonded( + device_bonded) + break + except: + pass + time.sleep(2) + if len(self.bt_device_controller.droid. + bluetoothA2dpSinkGetConnectedDevices()) > 0: + vol = self.bt_device_controller.droid.getMaxMediaVolume( + ) * self.audio_params['volume'] + self.bt_device_controller.droid.setMediaVolume(0) + time.sleep(1) + self.bt_device_controller.droid.setMediaVolume( + int(vol)) + return 1 + trial_count += 1 + #Automated reconnect from sink device doesn't work, start fresh connection + if trial_count >= trial_limit: + self.log.info( + 'Need manual intervention on the master device side') + connected = self.establish_bt_connection() + return connected + else: + return 1 + + def find_bt_max_range_bisection_search(self): + + #First linear search to narrow the bisection search + atten_min = self.attenuation_vector['min'] + atten_max = self.attenuation_vector['max'] + atten_step = self.attenuation_vector['step_bisection'] + #Start from initial attenuation + atten_left = atten_min + atten_right = atten_min + while atten_left == atten_right and atten_left < atten_max: + atten_now = self.attenuator.get_atten() + connected = self.recover_bt_link() + if connected == 0: + self.log.warning("Skip this angle as BT connection failed") + max_range = atten_max + return max_range + else: + self.log.info('Connection restored') + ramp_attenuation(self.attenuator, atten_now) + self.log.info("Attenuation set to {}".format(atten_now)) + time.sleep(2) + + thdn = self.record_audio_and_analyze_thdn() + if thdn > self.audio_params['thdn_threshold'] or thdn == 0: + #Hit the right limit for bisection search + if atten_right == atten_min: + self.log.warning('Link breaks at the minimum attenuation') + max_range = atten_min + return max_range + else: + atten_right = atten_now + self.log.info( + 'Right limit found at {} dB'.format(atten_right)) + else: + atten_left = atten_now + atten_right = atten_left + atten_next = min(atten_now + atten_step, atten_max) + ramp_attenuation(self.attenuator, atten_next) + if atten_left == atten_right: + self.log.warning('Could not reach max range') + max_range = atten_max + return max_range + + #Start the bisection search + self.log.info('Start bisection search between {} dB and {} dB'.format( + atten_left, atten_right)) + while atten_right - atten_left > 1: + connected = self.recover_bt_link() + if connected == 0: + self.log.warning("Skip this angle as BT connection failed") + max_range = atten_max + return max_range + else: + self.log.info('Connection restored') + + atten_mid = round((atten_left + atten_right) / 2) + ramp_attenuation(self.attenuator, atten_mid) + atten_now = self.attenuator.get_atten() + self.log.info("Attenuation set to {}".format(atten_now)) + time.sleep(5) + thdn = self.record_audio_and_analyze_thdn() + if thdn > self.audio_params['thdn_threshold'] or thdn == 0: + atten_right = atten_mid + max_range = atten_right - 1 + else: + atten_left = atten_mid + max_range = atten_left + self.log.info('Max range reached at {} dB'.format(max_range)) + return max_range + + def find_bt_max_range_linear_fine_search(self): + + thdn = 0.03 + atten_now = self.attenuator.get_atten() + + while thdn < self.audio_params[ + 'thdn_threshold'] and thdn != 0 and atten_now < self.attenuation_vector[ + 'max']: + atten_now = self.attenuator.get_atten() + self.log.info("Attenuation set to {}".format(atten_now)) + thdn = self.record_audio_and_analyze_thdn() + self.log.info("THDN is {}".format(thdn)) + self.attenuator.set_atten(atten_now + + self.attenuation_vector['step_fine']) + max_range = self.attenuator.get_atten( + ) - self.attenuation_vector['step_fine'] * 2 + if thdn == 0: + self.log.warning( + "Music play stopped, link might get lost, max range reached at {} dB" + .format(max_range)) + else: + self.log.info("Max range reached at {}".format(max_range)) + if atten_now == self.attenuation_vector['max']: + self.log.warning("Fail to reach max range") + return max_range + + def test_bisection_search_max(self): + + #Find the BT max range under each angle using bisection search + max_range_all = [] + + for phi in self.positioner['phi_range']: + + succeed = self.phi_card.moveTo(phi) + if succeed: + self.log.info("Phi positioner moved to {} degree".format(phi)) + else: + self.log.warning( + "Fail to move phi positioner to {} degree".format(phi)) + self.log.info("Phi positioner moved to {} degree".format(phi)) + max_ranges = [phi] + + for theta in self.positioner['theta_range']: + + succeed = self.theta_card.moveTo(theta) + if succeed: + self.log.info( + "Theta positioner moved to {} degree".format(theta)) + else: + self.log.warning( + "Failed to move theta positioner to {} degree".format( + theta)) + self.log.info( + "Theta positioner moved to {} degree".format(theta)) + + ramp_attenuation(self.attenuator, + self.attenuation_vector['min']) + time.sleep(2) + max_range = self.find_bt_max_range_bisection_search() + max_ranges.append(max_range) + max_range_all.append(max_ranges) + columns = ['Phi/Theta'] + columns.extend(self.positioner['theta_range']) + df = pd.DataFrame(max_range_all, columns=columns) + df.to_csv(self.file_output, index=False) + + def test_coarse_search(self): + + #Coarse search to find the highest minimum attenuation can be set to + #be a starting point for all angles + thdn = 0.03 + max_atten_reached = 0 + ramp_attenuation(self.attenuator, + self.attenuation_vector['start_coarse']) + self.log.info('Start attenuation at {} dB'.format( + self.attenuator.get_atten())) + while True: + atten_now = self.attenuator.get_atten() + if atten_now == self.attenuation_vector['max']: + if max_atten_reached > 1: + self.log.warning( + 'Can not reach to the highest minimum, attenuator is already set to be max, need to add more attenuation' + ) + break + for phi in self.positioner['phi_range']: + if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: + break + succeed = self.phi_card.moveTo(phi) + if succeed: + self.log.info( + "Phi positioner moved to {} degree".format(phi)) + else: + self.log.warning( + "Fail to move phi positioner to {} degree".format(phi)) + self.log.info("Phi positioner moved to {} degree".format(phi)) + + for theta in self.positioner['theta_range']: + + succeed = self.theta_card.moveTo(theta) + if succeed: + self.log.info( + "Theta positioner moved to {} degree".format( + theta)) + else: + self.log.warning( + "Failed to move theta positioner to {} degree". + format(theta)) + self.log.info( + "Theta positioner moved to {} degree".format(theta)) + + thdn = self.record_audio_and_analyze_thdn() + self.log.info( + 'THDN at thea {} degree, phi {} degree is {}'.format( + theta, phi, thdn)) + if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: + break + if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]: + highest_max = self.attenuator.get_atten( + ) - self.attenuation_vector['step_coarse'] + self.log.info( + 'Highest minimum attenuation is {} dB, fine search can start from there' + .format(highest_max)) + break + atten_new = min(atten_now + self.attenuation_vector['step_coarse'], + self.attenuation_vector['max']) + if atten_new == self.attenuation_vector['max']: + max_atten_reached += 1 + self.attenuator.set_atten(atten_new) + self.log.info('\nSetting attenuator to {} dB'.format( + self.attenuator.get_atten())) + + def test_finestep_search_max(self): + + #Find the BT max range under each angle with a finer step search + max_range_all = [] + for phi in self.positioner['phi_range']: + + succeed = self.phi_card.moveTo(phi) + if succeed: + self.log.info("Phi positioner moved to {} degree".format(phi)) + else: + self.log.warning( + "Fail to move phi positioner to {} degree".format(phi)) + self.log.info("Phi positioner moved to {} degree".format(phi)) + max_ranges = [phi] + + for theta in self.positioner['theta_range']: + + succeed = self.theta_card.moveTo(theta) + if succeed: + self.log.info( + "Theta positioner moved to {} degree".format(theta)) + else: + self.log.warning( + "Failed to move theta positioner to {} degree".format( + theta)) + self.log.info( + "Theta positioner moved to {} degree".format(theta)) + connected = self.recover_bt_link() + if connected == 0: + self.log.warning("Skip this angle as BT connection failed") + max_range = self.attenuation_vector['max'] + return max_range + else: + self.log.info('Connection restored') + ramp_attenuation(self.attenuator, + self.attenuation_vector['start_fine']) + max_range = self.find_bt_max_range_linear_fine_search() + max_ranges.append(max_range) + max_range_all.append(max_ranges) + columns = ['Phi/Theta'] + columns.extend(self.positioner['theta_range']) + df_range = pd.DataFrame(max_range_all, columns=columns) + df_range.to_csv(self.file_output, index=False) diff --git a/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py b/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py index 7d3ba45c23..2f2e666a8a 100644 --- a/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py +++ b/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py @@ -39,7 +39,7 @@ class GattConnectionStressTest(BaseTestClass): gatt_connect_err_message = "Gatt connection failed with: {}" gatt_disconnect_err_message = "Gatt disconnection failed with: {}" ble_advertise_interval = 50 - scan_timeout_seconds = 10 + scan_timeout_seconds = 60 default_iterations = 1000 def setup_class(self): @@ -49,6 +49,10 @@ class GattConnectionStressTest(BaseTestClass): self.default_iterations = self.user_params.get( "gatt_connect_stress_test_iterations", self.default_iterations) + def on_fail(self, test_name, begin_time): + for fd in self.fuchsia_devices: + fd.take_bug_report(test_name, begin_time) + def _orchestrate_single_connect_disconnect(self): adv_name = generate_id_by_size(10) adv_data = { diff --git a/acts_tests/tests/google/tel/live/TelLiveDataTest.py b/acts_tests/tests/google/tel/live/TelLiveDataTest.py index a2493ed299..2b0b3e714f 100755 --- a/acts_tests/tests/google/tel/live/TelLiveDataTest.py +++ b/acts_tests/tests/google/tel/live/TelLiveDataTest.py @@ -21,6 +21,7 @@ import collections import random import time import os +from queue import Empty from acts import signals from acts.test_decorators import test_tracker_info @@ -36,18 +37,19 @@ from acts.test_utils.tel.tel_defines import DIRECTION_MOBILE_TERMINATED from acts.test_utils.tel.tel_defines import DATA_STATE_CONNECTED from acts.test_utils.tel.tel_defines import FAKE_DATE_TIME from acts.test_utils.tel.tel_defines import FAKE_YEAR +from acts.test_utils.tel.tel_defines import EventNetworkCallback +from acts.test_utils.tel.tel_defines import NetworkCallbackCapabilitiesChanged +from acts.test_utils.tel.tel_defines import NetworkCallbackLost from acts.test_utils.tel.tel_defines import GEN_2G from acts.test_utils.tel.tel_defines import GEN_3G from acts.test_utils.tel.tel_defines import GEN_4G from acts.test_utils.tel.tel_defines import NETWORK_SERVICE_DATA -from acts.test_utils.tel.tel_defines import NETWORK_SERVICE_VOICE from acts.test_utils.tel.tel_defines import RAT_2G from acts.test_utils.tel.tel_defines import RAT_3G from acts.test_utils.tel.tel_defines import RAT_4G from acts.test_utils.tel.tel_defines import RAT_5G from acts.test_utils.tel.tel_defines import NETWORK_MODE_WCDMA_ONLY from acts.test_utils.tel.tel_defines import NETWORK_MODE_NR_LTE_GSM_WCDMA -from acts.test_utils.tel.tel_defines import RAT_FAMILY_LTE from acts.test_utils.tel.tel_defines import SIM1_SLOT_INDEX from acts.test_utils.tel.tel_defines import SIM2_SLOT_INDEX from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_NW_SELECTION @@ -55,7 +57,7 @@ from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_TETHERING_ENTITLEMENT_ from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_WIFI_CONNECTION from acts.test_utils.tel.tel_defines import TETHERING_MODE_WIFI from acts.test_utils.tel.tel_defines import WAIT_TIME_AFTER_REBOOT -from acts.test_utils.tel.tel_defines import WAIT_TIME_AFTER_MODE_CHANGE +from acts.test_utils.tel.tel_defines import WAIT_TIME_BETWEEN_STATE_CHECK from acts.test_utils.tel.tel_defines import WAIT_TIME_ANDROID_STATE_SETTLING from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_USER_PLANE_DATA from acts.test_utils.tel.tel_defines import WAIT_TIME_BETWEEN_REG_AND_CALL @@ -81,14 +83,11 @@ from acts.test_utils.tel.tel_test_utils import \ from acts.test_utils.tel.tel_test_utils import ensure_wifi_connected from acts.test_utils.tel.tel_test_utils import get_mobile_data_usage from acts.test_utils.tel.tel_test_utils import get_slot_index_from_subid -from acts.test_utils.tel.tel_test_utils import get_network_rat_for_subscription from acts.test_utils.tel.tel_test_utils import hangup_call from acts.test_utils.tel.tel_test_utils import multithread_func from acts.test_utils.tel.tel_test_utils import reboot_device from acts.test_utils.tel.tel_test_utils import remove_mobile_data_usage_limit -from acts.test_utils.tel.tel_test_utils import set_call_state_listen_level from acts.test_utils.tel.tel_test_utils import set_mobile_data_usage_limit -from acts.test_utils.tel.tel_test_utils import setup_sim from acts.test_utils.tel.tel_test_utils import stop_wifi_tethering from acts.test_utils.tel.tel_test_utils import start_wifi_tethering from acts.test_utils.tel.tel_test_utils import is_current_network_5g_nsa @@ -96,7 +95,6 @@ from acts.test_utils.tel.tel_test_utils import set_preferred_mode_for_5g from acts.test_utils.tel.tel_test_utils import get_current_override_network_type from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb -from acts.test_utils.tel.tel_test_utils import toggle_volte from acts.test_utils.tel.tel_test_utils import verify_internet_connection from acts.test_utils.tel.tel_test_utils import verify_http_connection from acts.test_utils.tel.tel_test_utils import verify_incall_state @@ -114,7 +112,6 @@ from acts.test_utils.tel.tel_test_utils import wifi_toggle_state from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G from acts.test_utils.tel.tel_test_utils import WIFI_SSID_KEY -from acts.test_utils.tel.tel_test_utils import bring_up_sl4a from acts.test_utils.tel.tel_test_utils import check_data_stall_detection from acts.test_utils.tel.tel_test_utils import check_network_validation_fail from acts.test_utils.tel.tel_test_utils import break_internet_except_sl4a_port @@ -142,7 +139,6 @@ from acts.utils import enable_doze from acts.utils import rand_ascii_str from acts.utils import adb_shell_ping - class TelLiveDataTest(TelephonyBaseTest): def setup_class(self): super().setup_class() @@ -159,6 +155,43 @@ class TelLiveDataTest(TelephonyBaseTest): TelephonyBaseTest.teardown_class(self) + def _listen_for_network_callback(self, ad, event, apm_mode=False): + """Verify network callback for Meteredness + + Args: + ad: DUT to get the network callback for + event: Network callback event + + Returns: + True: if the expected network callback found, False if not + """ + key = ad.droid.connectivityRegisterDefaultNetworkCallback() + ad.droid.connectivityNetworkCallbackStartListeningForEvent(key, event) + if apm_mode: + ad.log.info("Turn on Airplane Mode") + toggle_airplane_mode(ad.log, ad, True) + curr_time = time.time() + status = False + while time.time() < curr_time + MAX_WAIT_TIME_USER_PLANE_DATA: + try: + nc_event = ad.ed.pop_event(EventNetworkCallback) + ad.log.info("Received: %s" % + nc_event["data"]["networkCallbackEvent"]) + if nc_event["data"]["networkCallbackEvent"] == event: + status = nc_event["data"]["metered"] + ad.log.info("Current state of Meteredness is %s", status) + break + except Empty: + pass + + ad.droid.connectivityNetworkCallbackStopListeningForEvent(key, event) + ad.droid.connectivityUnregisterNetworkCallback(key) + if apm_mode: + ad.log.info("Turn off Airplane Mode") + toggle_airplane_mode(ad.log, ad, False) + time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK) + return status + """ Tests Begin """ @@ -441,6 +474,91 @@ class TelLiveDataTest(TelephonyBaseTest): return result + @test_tracker_info(uuid="754810e8-cd93-48e2-9c70-9d951ea416fe") + @TelephonyBaseTest.tel_test_wrap + def test_5g_nsa_metered_cellular(self): + """ Verifies 5G Meteredness API + + Set Mode to 5G + Wait for 5G attached on NSA + Register for Connectivity callback + Verify value of metered flag + + Returns: + True if pass; False if fail. + """ + try: + ad = self.android_devices[0] + wifi_toggle_state(ad.log, ad, False) + toggle_airplane_mode(ad.log, ad, False) + set_preferred_mode_for_5g(ad) + if not is_current_network_5g_nsa(ad): + ad.log.error("Phone not attached on 5G NSA") + return False + return self._listen_for_network_callback(ad, + NetworkCallbackCapabilitiesChanged) + except Exception as e: + ad.log.error(e) + return False + + + @test_tracker_info(uuid="5596d166-5543-4c55-8f65-84234ecb5ba7") + @TelephonyBaseTest.tel_test_wrap + def test_5g_nsa_metered_airplane(self): + """ Verifies 5G Meteredness API + + Set Mode to 5G, Turn on Airplane mode + Register for Connectivity callback + Verify value of metered flag + + Returns: + True if pass; False if fail. + """ + try: + ad = self.android_devices[0] + wifi_toggle_state(ad.log, ad, False) + set_preferred_mode_for_5g(ad) + return self._listen_for_network_callback(ad, + NetworkCallbackLost, apm_mode=True) + except Exception as e: + ad.log.error(e) + toggle_airplane_mode(ad.log, ad, False) + time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK) + return False + + @test_tracker_info(uuid="8fb6c4db-9424-4041-be28-5f4552b2afbf") + @TelephonyBaseTest.tel_test_wrap + def test_5g_nsa_metered_wifi(self): + """ Verifies 5G Meteredness API + + Set Mode to 5G, Wifi Connected + Register for Connectivity callback + Verify value of metered flag + + Returns: + True if pass; False if fail. + """ + ad = self.android_devices[0] + try: + toggle_airplane_mode(ad.log, ad, False) + set_preferred_mode_for_5g(ad) + if not is_current_network_5g_nsa(ad): + ad.log.error("Phone not attached on 5G NSA") + wifi_toggle_state(ad.log, ad, True) + if not ensure_wifi_connected(ad.log, ad, + self.wifi_network_ssid, + self.wifi_network_pass): + ad.log.error("WiFi connect fail.") + return False + return self._listen_for_network_callback(ad, + NetworkCallbackCapabilitiesChanged) + except Exception as e: + ad.log.error(e) + return False + finally: + wifi_toggle_state(ad.log, ad, False) + + @test_tracker_info(uuid="1b0354f3-8668-4a28-90a5-3b3d2b2756d3") @TelephonyBaseTest.tel_test_wrap def test_airplane_mode(self): |