summaryrefslogtreecommitdiffstats
path: root/acts_tests
diff options
context:
space:
mode:
Diffstat (limited to 'acts_tests')
-rwxr-xr-xacts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py554
-rw-r--r--acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py6
-rwxr-xr-xacts_tests/tests/google/tel/live/TelLiveDataTest.py136
3 files changed, 686 insertions, 10 deletions
diff --git a/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py b/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py
new file mode 100755
index 0000000000..e0a7962788
--- /dev/null
+++ b/acts_tests/tests/google/bt/performance/BtA2dpOtaRangeTest.py
@@ -0,0 +1,554 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+"""
+Created on Thu Jul 16 22:58:03 2020
+
+@author: qijiang
+"""
+
+import os
+import pyvisa
+import time
+import acts.test_utils.coex.audio_test_utils as atu
+import acts.test_utils.bt.bt_test_utils as btutils
+import pandas as pd
+from acts import asserts
+from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
+from acts.test_utils.bt.A2dpBaseTest import A2dpBaseTest
+from acts.test_utils.power.PowerBTBaseTest import ramp_attenuation
+
+PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
+
+
+class RPIAxis(object):
+ def __init__(self, VisaConnectString):
+ """Constructor.
+ Create a Visa connection
+
+ """
+ rm = pyvisa.ResourceManager()
+ self.instrument = rm.open_resource(VisaConnectString)
+ self.instrument.read_termination = "\n" # make sure we look for newline at the end of strings we read
+
+ def __getattr__(self, attr):
+ return getattr(self.instrument, attr) # Delegate all other attrs
+
+
+class RPIAxis_card(RPIAxis):
+ """ RPIAxis_card()
+ Create an axis
+
+ """
+ def __init__(self, axis_object):
+ # create an object to communicate to an RPI2 (remote positioner instrument) axis
+ self.axis = axis_object # store pyvisa instrument connection
+
+ def __getattr__(self, attr):
+ return getattr(self.axis, attr) # Delegate all other attrs
+
+ def moveTo(self, where):
+ """ moveTo
+ move to a given position and make sure you arrived at the target.
+ """
+ # max travale time in seconds. adjust this if you have a really slow positioner!
+ MAXTRAVELTIME = 150
+ t0 = time.time()
+ self.axis.write("SK %d\n" % where)
+ done = False
+ while (not done):
+ if (time.time() - t0) > MAXTRAVELTIME:
+ print("looks like we are stuck!\n")
+ return False
+ response = self.axis.query("*opc?\n")
+ if (response == '1'):
+ return True
+ else:
+ response = self.axis.query("CP?\n")
+
+ # stop the positioner
+ def Stop(self):
+ t0 = time.time()
+ done = False
+ self.axis.write("ST\n")
+ while (not done):
+ if (time.time() - t0) > 2:
+ print("Runaway positioner!\n")
+ return False
+ response = self.axis.query("*opc?\n")
+ if (response == '1'):
+ return True
+
+ # set continuous rotation mode
+ def SetContinuousRotationMode(self):
+ self.axis.write("CR\n")
+
+ # set non continuous rotation mode
+ def SetNonContinuousRotationMode(self):
+ self.axis.write("NCR\n")
+
+
+class BtA2dpOtaRangeTest(A2dpBaseTest):
+ def setup_class(self):
+
+ #'audio_params' is a dict, contains the audio device type, audio streaming
+ #settings such as volume, duration, audio recording parameters such as
+ #channel, sampling rate/width, and thdn parameters for audio processing
+ req_params = [
+ 'audio_params', 'positioner', 'dut_config', 'attenuation_vector'
+ ]
+ opt_params = ['music_files']
+ self.unpack_userparams(req_params)
+ if len(self.android_devices) > 1:
+ self.dut = self.android_devices[1]
+ self.unpack_userparams(opt_params)
+ music_src = self.music_files[0]
+ music_dest = PHONE_MUSIC_FILE_DIRECTORY
+ success = self.dut.push_system_file(music_src, music_dest)
+ if success:
+ self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
+ os.path.basename(music_src))
+ # Initialize media_control class
+ self.media = btutils.MediaControlOverSl4a(self.dut,
+ self.music_file)
+ # Set attenuator to minimum attenuation
+ self.attenuator = self.attenuators[0]
+ self.attenuator.set_atten(self.attenuation_vector['min'])
+ # Create the BTOE(Bluetooth-Other-End) device object
+ bt_devices = self.user_params.get('bt_devices', [])
+ attr, idx = bt_devices.split(':')
+ self.bt_device_controller = getattr(self, attr)[int(idx)]
+ self.bt_device = bt_factory().generate(self.bt_device_controller)
+ btutils.enable_bqr(self.bt_device_controller)
+
+ #Setup positioner
+ self.PhiAxisAddress = "TCPIP0::{}::{}::SOCKET".format(
+ self.positioner["server_ip"], self.positioner["phi_axis_port"])
+ self.ThetaAxisAddress = "TCPIP0::{}::{}::SOCKET".format(
+ self.positioner["server_ip"], self.positioner["theta_axis_port"])
+ self.phi_axis = RPIAxis(self.PhiAxisAddress)
+ self.phi_card = RPIAxis_card(self.phi_axis)
+ self.log.info("*IDN? response: {}".format(
+ self.phi_card.query("*idn?\n")))
+ self.theta_axis = RPIAxis(self.ThetaAxisAddress)
+ self.theta_card = RPIAxis_card(self.theta_axis)
+ self.log.info("*IDN? response: {}".format(
+ self.theta_card.query("*idn?\n")))
+ self.phi_card.Stop()
+ self.theta_card.Stop()
+
+ def teardown_class(self):
+
+ if hasattr(self, 'media'):
+ self.media.stop()
+ if hasattr(self, 'attenuator'):
+ self.attenuator.set_atten(self.attenuation_vector['min'])
+ if hasattr(self, 'dut'):
+ self.dut.droid.bluetoothFactoryReset()
+ btutils.disable_bluetooth(self.dut.droid)
+ self.bt_device.reset()
+ self.bt_device.power_off()
+ self.phi_card.moveTo(0)
+ self.theta_card.moveTo(0)
+
+ def setup_test(self):
+
+ # Reset headset
+ self.bt_device.reset()
+ # Initialize audio capture devices
+ self.audio_device = atu.get_audio_capture_device(
+ self.bt_device_controller, self.audio_params)
+ # Connect BT link
+ connected = self.establish_bt_connection()
+ asserts.assert_true(connected, 'BT connection failed')
+ # Output file
+ file_name = 'OTA_Range_Over_Angle_{}_{}.csv'.format(
+ self.dut_config['model'], self.dut_config['screen_placement'])
+ self.file_output = os.path.join(self.log_path, file_name)
+
+ def teardown_test(self):
+
+ if hasattr(self, 'media'):
+ self.media.stop()
+ if hasattr(self, 'attenuator'):
+ self.attenuator.set_atten(self.attenuation_vector['min'])
+ if hasattr(self, 'dut'):
+ self.dut.droid.bluetoothFactoryReset()
+ btutils.disable_bluetooth(self.dut.droid)
+ self.bt_device.reset()
+ self.bt_device.power_off()
+ self.phi_card.moveTo(0)
+ self.theta_card.moveTo(0)
+
+ def a2dp_play(self):
+
+ if hasattr(self, 'dut'):
+ vol = self.dut.droid.getMaxMediaVolume(
+ ) * self.audio_params['volume']
+ self.dut.droid.setMediaVolume(vol)
+ self.media.play()
+ else:
+ vol = self.bt_device_controller.droid.getMaxMediaVolume(
+ ) * self.audio_params['volume']
+ self.bt_device_controller.droid.setMediaVolume(vol)
+ self.bt_device.previous_track()
+ self.bt_device.play()
+
+ def a2dp_stop(self):
+
+ if hasattr(self, 'dut'):
+ self.media.stop()
+ else:
+ self.bt_device.pause()
+
+ def establish_bt_connection(self):
+
+ if hasattr(self, 'dut'):
+ self.dut.droid.bluetoothFactoryReset()
+ self.bt_device.reset()
+ self.bt_device.power_on()
+ btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
+ connected = btutils.connect_phone_to_headset(
+ self.dut, self.bt_device, 60)
+ vol = self.dut.droid.getMaxMediaVolume(
+ ) * self.audio_params['volume']
+ self.dut.droid.setMediaVolume(0)
+ time.sleep(1)
+ self.dut.droid.setMediaVolume(int(vol))
+ self.media.play()
+ return connected
+
+ elif len(self.bt_device_controller.droid.
+ bluetoothA2dpSinkGetConnectedDevices()) == 0:
+ self.log.warning('Need manual intervention to connect BT link')
+ os.system(
+ 'spd-say "Please manually connect BT and start playback"')
+ input('Once fixed, please press ENTER to resume the test')
+ return 1
+
+ def run_thdn_analysis(self, audio_captured):
+ """Calculate Total Harmonic Distortion plus Noise for latest recording.
+
+ Args:
+ audio_captured: the captured audio file
+ Returns:
+ thdn: thdn value in a list
+ """
+ # Calculate Total Harmonic Distortion + Noise
+ audio_result = atu.AudioCaptureResult(audio_captured,
+ self.audio_params)
+ thdn = audio_result.THDN(**self.audio_params['thdn_params'])
+ return thdn
+
+ def record_audio_and_analyze_thdn(self):
+
+ self.a2dp_play()
+ time.sleep(1)
+ self.audio_device.start()
+ time.sleep(self.audio_params['duration'])
+ audio_captured = self.audio_device.stop()
+ audio_result = atu.AudioCaptureResult(audio_captured,
+ self.audio_params)
+ thdn = audio_result.THDN(**self.audio_params['thdn_params'])
+ self.log.info('THDN is {}'.format(thdn[0]))
+
+ self.a2dp_stop()
+
+ return thdn[0]
+
+ def recover_bt_link(self):
+ """Recover BT link during test.
+
+ Recover BT link from the a2dp sink device
+
+ Returns:
+ connected: signal whether bt link is restored
+ """
+ #Try to connect from the sink device
+ if len(self.bt_device_controller.droid.bluetoothGetConnectedDevices()
+ ) == 0:
+ self.log.warning('Try to recover BT link')
+ self.attenuator.set_atten(self.attenuation_vector['min'])
+
+ if hasattr(self, 'dut'):
+ connected = self.establish_bt_connection()
+ return connected
+ else:
+ device_bonded = self.bt_device_controller.droid.bluetoothGetBondedDevices(
+ )[0]['address']
+ trial_count = 0
+ trial_limit = 3
+ self.log.info('Try to reconnect from the sink device')
+ while trial_count < trial_limit:
+ #Connect master device from the sink device
+ time_start = time.time()
+ while time.time() < time_start + 5:
+ try:
+ self.bt_device_controller.droid.bluetoothConnectBonded(
+ device_bonded)
+ break
+ except:
+ pass
+ time.sleep(2)
+ if len(self.bt_device_controller.droid.
+ bluetoothA2dpSinkGetConnectedDevices()) > 0:
+ vol = self.bt_device_controller.droid.getMaxMediaVolume(
+ ) * self.audio_params['volume']
+ self.bt_device_controller.droid.setMediaVolume(0)
+ time.sleep(1)
+ self.bt_device_controller.droid.setMediaVolume(
+ int(vol))
+ return 1
+ trial_count += 1
+ #Automated reconnect from sink device doesn't work, start fresh connection
+ if trial_count >= trial_limit:
+ self.log.info(
+ 'Need manual intervention on the master device side')
+ connected = self.establish_bt_connection()
+ return connected
+ else:
+ return 1
+
+ def find_bt_max_range_bisection_search(self):
+
+ #First linear search to narrow the bisection search
+ atten_min = self.attenuation_vector['min']
+ atten_max = self.attenuation_vector['max']
+ atten_step = self.attenuation_vector['step_bisection']
+ #Start from initial attenuation
+ atten_left = atten_min
+ atten_right = atten_min
+ while atten_left == atten_right and atten_left < atten_max:
+ atten_now = self.attenuator.get_atten()
+ connected = self.recover_bt_link()
+ if connected == 0:
+ self.log.warning("Skip this angle as BT connection failed")
+ max_range = atten_max
+ return max_range
+ else:
+ self.log.info('Connection restored')
+ ramp_attenuation(self.attenuator, atten_now)
+ self.log.info("Attenuation set to {}".format(atten_now))
+ time.sleep(2)
+
+ thdn = self.record_audio_and_analyze_thdn()
+ if thdn > self.audio_params['thdn_threshold'] or thdn == 0:
+ #Hit the right limit for bisection search
+ if atten_right == atten_min:
+ self.log.warning('Link breaks at the minimum attenuation')
+ max_range = atten_min
+ return max_range
+ else:
+ atten_right = atten_now
+ self.log.info(
+ 'Right limit found at {} dB'.format(atten_right))
+ else:
+ atten_left = atten_now
+ atten_right = atten_left
+ atten_next = min(atten_now + atten_step, atten_max)
+ ramp_attenuation(self.attenuator, atten_next)
+ if atten_left == atten_right:
+ self.log.warning('Could not reach max range')
+ max_range = atten_max
+ return max_range
+
+ #Start the bisection search
+ self.log.info('Start bisection search between {} dB and {} dB'.format(
+ atten_left, atten_right))
+ while atten_right - atten_left > 1:
+ connected = self.recover_bt_link()
+ if connected == 0:
+ self.log.warning("Skip this angle as BT connection failed")
+ max_range = atten_max
+ return max_range
+ else:
+ self.log.info('Connection restored')
+
+ atten_mid = round((atten_left + atten_right) / 2)
+ ramp_attenuation(self.attenuator, atten_mid)
+ atten_now = self.attenuator.get_atten()
+ self.log.info("Attenuation set to {}".format(atten_now))
+ time.sleep(5)
+ thdn = self.record_audio_and_analyze_thdn()
+ if thdn > self.audio_params['thdn_threshold'] or thdn == 0:
+ atten_right = atten_mid
+ max_range = atten_right - 1
+ else:
+ atten_left = atten_mid
+ max_range = atten_left
+ self.log.info('Max range reached at {} dB'.format(max_range))
+ return max_range
+
+ def find_bt_max_range_linear_fine_search(self):
+
+ thdn = 0.03
+ atten_now = self.attenuator.get_atten()
+
+ while thdn < self.audio_params[
+ 'thdn_threshold'] and thdn != 0 and atten_now < self.attenuation_vector[
+ 'max']:
+ atten_now = self.attenuator.get_atten()
+ self.log.info("Attenuation set to {}".format(atten_now))
+ thdn = self.record_audio_and_analyze_thdn()
+ self.log.info("THDN is {}".format(thdn))
+ self.attenuator.set_atten(atten_now +
+ self.attenuation_vector['step_fine'])
+ max_range = self.attenuator.get_atten(
+ ) - self.attenuation_vector['step_fine'] * 2
+ if thdn == 0:
+ self.log.warning(
+ "Music play stopped, link might get lost, max range reached at {} dB"
+ .format(max_range))
+ else:
+ self.log.info("Max range reached at {}".format(max_range))
+ if atten_now == self.attenuation_vector['max']:
+ self.log.warning("Fail to reach max range")
+ return max_range
+
+ def test_bisection_search_max(self):
+
+ #Find the BT max range under each angle using bisection search
+ max_range_all = []
+
+ for phi in self.positioner['phi_range']:
+
+ succeed = self.phi_card.moveTo(phi)
+ if succeed:
+ self.log.info("Phi positioner moved to {} degree".format(phi))
+ else:
+ self.log.warning(
+ "Fail to move phi positioner to {} degree".format(phi))
+ self.log.info("Phi positioner moved to {} degree".format(phi))
+ max_ranges = [phi]
+
+ for theta in self.positioner['theta_range']:
+
+ succeed = self.theta_card.moveTo(theta)
+ if succeed:
+ self.log.info(
+ "Theta positioner moved to {} degree".format(theta))
+ else:
+ self.log.warning(
+ "Failed to move theta positioner to {} degree".format(
+ theta))
+ self.log.info(
+ "Theta positioner moved to {} degree".format(theta))
+
+ ramp_attenuation(self.attenuator,
+ self.attenuation_vector['min'])
+ time.sleep(2)
+ max_range = self.find_bt_max_range_bisection_search()
+ max_ranges.append(max_range)
+ max_range_all.append(max_ranges)
+ columns = ['Phi/Theta']
+ columns.extend(self.positioner['theta_range'])
+ df = pd.DataFrame(max_range_all, columns=columns)
+ df.to_csv(self.file_output, index=False)
+
+ def test_coarse_search(self):
+
+ #Coarse search to find the highest minimum attenuation can be set to
+ #be a starting point for all angles
+ thdn = 0.03
+ max_atten_reached = 0
+ ramp_attenuation(self.attenuator,
+ self.attenuation_vector['start_coarse'])
+ self.log.info('Start attenuation at {} dB'.format(
+ self.attenuator.get_atten()))
+ while True:
+ atten_now = self.attenuator.get_atten()
+ if atten_now == self.attenuation_vector['max']:
+ if max_atten_reached > 1:
+ self.log.warning(
+ 'Can not reach to the highest minimum, attenuator is already set to be max, need to add more attenuation'
+ )
+ break
+ for phi in self.positioner['phi_range']:
+ if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
+ break
+ succeed = self.phi_card.moveTo(phi)
+ if succeed:
+ self.log.info(
+ "Phi positioner moved to {} degree".format(phi))
+ else:
+ self.log.warning(
+ "Fail to move phi positioner to {} degree".format(phi))
+ self.log.info("Phi positioner moved to {} degree".format(phi))
+
+ for theta in self.positioner['theta_range']:
+
+ succeed = self.theta_card.moveTo(theta)
+ if succeed:
+ self.log.info(
+ "Theta positioner moved to {} degree".format(
+ theta))
+ else:
+ self.log.warning(
+ "Failed to move theta positioner to {} degree".
+ format(theta))
+ self.log.info(
+ "Theta positioner moved to {} degree".format(theta))
+
+ thdn = self.record_audio_and_analyze_thdn()
+ self.log.info(
+ 'THDN at thea {} degree, phi {} degree is {}'.format(
+ theta, phi, thdn))
+ if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
+ break
+ if thdn == 0 or thdn >= self.audio_params["thdn_threshold"]:
+ highest_max = self.attenuator.get_atten(
+ ) - self.attenuation_vector['step_coarse']
+ self.log.info(
+ 'Highest minimum attenuation is {} dB, fine search can start from there'
+ .format(highest_max))
+ break
+ atten_new = min(atten_now + self.attenuation_vector['step_coarse'],
+ self.attenuation_vector['max'])
+ if atten_new == self.attenuation_vector['max']:
+ max_atten_reached += 1
+ self.attenuator.set_atten(atten_new)
+ self.log.info('\nSetting attenuator to {} dB'.format(
+ self.attenuator.get_atten()))
+
+ def test_finestep_search_max(self):
+
+ #Find the BT max range under each angle with a finer step search
+ max_range_all = []
+ for phi in self.positioner['phi_range']:
+
+ succeed = self.phi_card.moveTo(phi)
+ if succeed:
+ self.log.info("Phi positioner moved to {} degree".format(phi))
+ else:
+ self.log.warning(
+ "Fail to move phi positioner to {} degree".format(phi))
+ self.log.info("Phi positioner moved to {} degree".format(phi))
+ max_ranges = [phi]
+
+ for theta in self.positioner['theta_range']:
+
+ succeed = self.theta_card.moveTo(theta)
+ if succeed:
+ self.log.info(
+ "Theta positioner moved to {} degree".format(theta))
+ else:
+ self.log.warning(
+ "Failed to move theta positioner to {} degree".format(
+ theta))
+ self.log.info(
+ "Theta positioner moved to {} degree".format(theta))
+ connected = self.recover_bt_link()
+ if connected == 0:
+ self.log.warning("Skip this angle as BT connection failed")
+ max_range = self.attenuation_vector['max']
+ return max_range
+ else:
+ self.log.info('Connection restored')
+ ramp_attenuation(self.attenuator,
+ self.attenuation_vector['start_fine'])
+ max_range = self.find_bt_max_range_linear_fine_search()
+ max_ranges.append(max_range)
+ max_range_all.append(max_ranges)
+ columns = ['Phi/Theta']
+ columns.extend(self.positioner['theta_range'])
+ df_range = pd.DataFrame(max_range_all, columns=columns)
+ df_range.to_csv(self.file_output, index=False)
diff --git a/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py b/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py
index 7d3ba45c23..2f2e666a8a 100644
--- a/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py
+++ b/acts_tests/tests/google/fuchsia/bt/gatt/GattConnectionStressTest.py
@@ -39,7 +39,7 @@ class GattConnectionStressTest(BaseTestClass):
gatt_connect_err_message = "Gatt connection failed with: {}"
gatt_disconnect_err_message = "Gatt disconnection failed with: {}"
ble_advertise_interval = 50
- scan_timeout_seconds = 10
+ scan_timeout_seconds = 60
default_iterations = 1000
def setup_class(self):
@@ -49,6 +49,10 @@ class GattConnectionStressTest(BaseTestClass):
self.default_iterations = self.user_params.get(
"gatt_connect_stress_test_iterations", self.default_iterations)
+ def on_fail(self, test_name, begin_time):
+ for fd in self.fuchsia_devices:
+ fd.take_bug_report(test_name, begin_time)
+
def _orchestrate_single_connect_disconnect(self):
adv_name = generate_id_by_size(10)
adv_data = {
diff --git a/acts_tests/tests/google/tel/live/TelLiveDataTest.py b/acts_tests/tests/google/tel/live/TelLiveDataTest.py
index a2493ed299..2b0b3e714f 100755
--- a/acts_tests/tests/google/tel/live/TelLiveDataTest.py
+++ b/acts_tests/tests/google/tel/live/TelLiveDataTest.py
@@ -21,6 +21,7 @@ import collections
import random
import time
import os
+from queue import Empty
from acts import signals
from acts.test_decorators import test_tracker_info
@@ -36,18 +37,19 @@ from acts.test_utils.tel.tel_defines import DIRECTION_MOBILE_TERMINATED
from acts.test_utils.tel.tel_defines import DATA_STATE_CONNECTED
from acts.test_utils.tel.tel_defines import FAKE_DATE_TIME
from acts.test_utils.tel.tel_defines import FAKE_YEAR
+from acts.test_utils.tel.tel_defines import EventNetworkCallback
+from acts.test_utils.tel.tel_defines import NetworkCallbackCapabilitiesChanged
+from acts.test_utils.tel.tel_defines import NetworkCallbackLost
from acts.test_utils.tel.tel_defines import GEN_2G
from acts.test_utils.tel.tel_defines import GEN_3G
from acts.test_utils.tel.tel_defines import GEN_4G
from acts.test_utils.tel.tel_defines import NETWORK_SERVICE_DATA
-from acts.test_utils.tel.tel_defines import NETWORK_SERVICE_VOICE
from acts.test_utils.tel.tel_defines import RAT_2G
from acts.test_utils.tel.tel_defines import RAT_3G
from acts.test_utils.tel.tel_defines import RAT_4G
from acts.test_utils.tel.tel_defines import RAT_5G
from acts.test_utils.tel.tel_defines import NETWORK_MODE_WCDMA_ONLY
from acts.test_utils.tel.tel_defines import NETWORK_MODE_NR_LTE_GSM_WCDMA
-from acts.test_utils.tel.tel_defines import RAT_FAMILY_LTE
from acts.test_utils.tel.tel_defines import SIM1_SLOT_INDEX
from acts.test_utils.tel.tel_defines import SIM2_SLOT_INDEX
from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_NW_SELECTION
@@ -55,7 +57,7 @@ from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_TETHERING_ENTITLEMENT_
from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_WIFI_CONNECTION
from acts.test_utils.tel.tel_defines import TETHERING_MODE_WIFI
from acts.test_utils.tel.tel_defines import WAIT_TIME_AFTER_REBOOT
-from acts.test_utils.tel.tel_defines import WAIT_TIME_AFTER_MODE_CHANGE
+from acts.test_utils.tel.tel_defines import WAIT_TIME_BETWEEN_STATE_CHECK
from acts.test_utils.tel.tel_defines import WAIT_TIME_ANDROID_STATE_SETTLING
from acts.test_utils.tel.tel_defines import MAX_WAIT_TIME_USER_PLANE_DATA
from acts.test_utils.tel.tel_defines import WAIT_TIME_BETWEEN_REG_AND_CALL
@@ -81,14 +83,11 @@ from acts.test_utils.tel.tel_test_utils import \
from acts.test_utils.tel.tel_test_utils import ensure_wifi_connected
from acts.test_utils.tel.tel_test_utils import get_mobile_data_usage
from acts.test_utils.tel.tel_test_utils import get_slot_index_from_subid
-from acts.test_utils.tel.tel_test_utils import get_network_rat_for_subscription
from acts.test_utils.tel.tel_test_utils import hangup_call
from acts.test_utils.tel.tel_test_utils import multithread_func
from acts.test_utils.tel.tel_test_utils import reboot_device
from acts.test_utils.tel.tel_test_utils import remove_mobile_data_usage_limit
-from acts.test_utils.tel.tel_test_utils import set_call_state_listen_level
from acts.test_utils.tel.tel_test_utils import set_mobile_data_usage_limit
-from acts.test_utils.tel.tel_test_utils import setup_sim
from acts.test_utils.tel.tel_test_utils import stop_wifi_tethering
from acts.test_utils.tel.tel_test_utils import start_wifi_tethering
from acts.test_utils.tel.tel_test_utils import is_current_network_5g_nsa
@@ -96,7 +95,6 @@ from acts.test_utils.tel.tel_test_utils import set_preferred_mode_for_5g
from acts.test_utils.tel.tel_test_utils import get_current_override_network_type
from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode
from acts.test_utils.tel.tel_test_utils import toggle_airplane_mode_by_adb
-from acts.test_utils.tel.tel_test_utils import toggle_volte
from acts.test_utils.tel.tel_test_utils import verify_internet_connection
from acts.test_utils.tel.tel_test_utils import verify_http_connection
from acts.test_utils.tel.tel_test_utils import verify_incall_state
@@ -114,7 +112,6 @@ from acts.test_utils.tel.tel_test_utils import wifi_toggle_state
from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_2G
from acts.test_utils.tel.tel_test_utils import WIFI_CONFIG_APBAND_5G
from acts.test_utils.tel.tel_test_utils import WIFI_SSID_KEY
-from acts.test_utils.tel.tel_test_utils import bring_up_sl4a
from acts.test_utils.tel.tel_test_utils import check_data_stall_detection
from acts.test_utils.tel.tel_test_utils import check_network_validation_fail
from acts.test_utils.tel.tel_test_utils import break_internet_except_sl4a_port
@@ -142,7 +139,6 @@ from acts.utils import enable_doze
from acts.utils import rand_ascii_str
from acts.utils import adb_shell_ping
-
class TelLiveDataTest(TelephonyBaseTest):
def setup_class(self):
super().setup_class()
@@ -159,6 +155,43 @@ class TelLiveDataTest(TelephonyBaseTest):
TelephonyBaseTest.teardown_class(self)
+ def _listen_for_network_callback(self, ad, event, apm_mode=False):
+ """Verify network callback for Meteredness
+
+ Args:
+ ad: DUT to get the network callback for
+ event: Network callback event
+
+ Returns:
+ True: if the expected network callback found, False if not
+ """
+ key = ad.droid.connectivityRegisterDefaultNetworkCallback()
+ ad.droid.connectivityNetworkCallbackStartListeningForEvent(key, event)
+ if apm_mode:
+ ad.log.info("Turn on Airplane Mode")
+ toggle_airplane_mode(ad.log, ad, True)
+ curr_time = time.time()
+ status = False
+ while time.time() < curr_time + MAX_WAIT_TIME_USER_PLANE_DATA:
+ try:
+ nc_event = ad.ed.pop_event(EventNetworkCallback)
+ ad.log.info("Received: %s" %
+ nc_event["data"]["networkCallbackEvent"])
+ if nc_event["data"]["networkCallbackEvent"] == event:
+ status = nc_event["data"]["metered"]
+ ad.log.info("Current state of Meteredness is %s", status)
+ break
+ except Empty:
+ pass
+
+ ad.droid.connectivityNetworkCallbackStopListeningForEvent(key, event)
+ ad.droid.connectivityUnregisterNetworkCallback(key)
+ if apm_mode:
+ ad.log.info("Turn off Airplane Mode")
+ toggle_airplane_mode(ad.log, ad, False)
+ time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK)
+ return status
+
""" Tests Begin """
@@ -441,6 +474,91 @@ class TelLiveDataTest(TelephonyBaseTest):
return result
+ @test_tracker_info(uuid="754810e8-cd93-48e2-9c70-9d951ea416fe")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_5g_nsa_metered_cellular(self):
+ """ Verifies 5G Meteredness API
+
+ Set Mode to 5G
+ Wait for 5G attached on NSA
+ Register for Connectivity callback
+ Verify value of metered flag
+
+ Returns:
+ True if pass; False if fail.
+ """
+ try:
+ ad = self.android_devices[0]
+ wifi_toggle_state(ad.log, ad, False)
+ toggle_airplane_mode(ad.log, ad, False)
+ set_preferred_mode_for_5g(ad)
+ if not is_current_network_5g_nsa(ad):
+ ad.log.error("Phone not attached on 5G NSA")
+ return False
+ return self._listen_for_network_callback(ad,
+ NetworkCallbackCapabilitiesChanged)
+ except Exception as e:
+ ad.log.error(e)
+ return False
+
+
+ @test_tracker_info(uuid="5596d166-5543-4c55-8f65-84234ecb5ba7")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_5g_nsa_metered_airplane(self):
+ """ Verifies 5G Meteredness API
+
+ Set Mode to 5G, Turn on Airplane mode
+ Register for Connectivity callback
+ Verify value of metered flag
+
+ Returns:
+ True if pass; False if fail.
+ """
+ try:
+ ad = self.android_devices[0]
+ wifi_toggle_state(ad.log, ad, False)
+ set_preferred_mode_for_5g(ad)
+ return self._listen_for_network_callback(ad,
+ NetworkCallbackLost, apm_mode=True)
+ except Exception as e:
+ ad.log.error(e)
+ toggle_airplane_mode(ad.log, ad, False)
+ time.sleep(WAIT_TIME_BETWEEN_STATE_CHECK)
+ return False
+
+ @test_tracker_info(uuid="8fb6c4db-9424-4041-be28-5f4552b2afbf")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_5g_nsa_metered_wifi(self):
+ """ Verifies 5G Meteredness API
+
+ Set Mode to 5G, Wifi Connected
+ Register for Connectivity callback
+ Verify value of metered flag
+
+ Returns:
+ True if pass; False if fail.
+ """
+ ad = self.android_devices[0]
+ try:
+ toggle_airplane_mode(ad.log, ad, False)
+ set_preferred_mode_for_5g(ad)
+ if not is_current_network_5g_nsa(ad):
+ ad.log.error("Phone not attached on 5G NSA")
+ wifi_toggle_state(ad.log, ad, True)
+ if not ensure_wifi_connected(ad.log, ad,
+ self.wifi_network_ssid,
+ self.wifi_network_pass):
+ ad.log.error("WiFi connect fail.")
+ return False
+ return self._listen_for_network_callback(ad,
+ NetworkCallbackCapabilitiesChanged)
+ except Exception as e:
+ ad.log.error(e)
+ return False
+ finally:
+ wifi_toggle_state(ad.log, ad, False)
+
+
@test_tracker_info(uuid="1b0354f3-8668-4a28-90a5-3b3d2b2756d3")
@TelephonyBaseTest.tel_test_wrap
def test_airplane_mode(self):