summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rwxr-xr-xacts/framework/acts/bin/monsoon.py131
-rw-r--r--acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py11
-rw-r--r--acts/framework/acts/controllers/cellular_simulator.py9
-rw-r--r--acts/framework/acts/controllers/fuchsia_lib/utils_lib.py9
-rwxr-xr-xacts/framework/acts/controllers/iperf_server.py35
-rw-r--r--acts/framework/acts/controllers/monsoon.py1010
-rw-r--r--acts/framework/acts/controllers/monsoon_lib/api/common.py79
-rw-r--r--acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py7
-rw-r--r--acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py7
-rw-r--r--acts/framework/acts/controllers/monsoon_lib/api/monsoon.py11
-rw-r--r--acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py9
-rw-r--r--acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py47
-rw-r--r--acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py18
-rw-r--r--acts/framework/acts/test_utils/bt/bt_power_test_utils.py205
-rw-r--r--acts/framework/acts/test_utils/bt/bt_test_utils.py247
-rw-r--r--acts/framework/acts/test_utils/gnss/gnss_test_utils.py16
-rw-r--r--acts/framework/acts/test_utils/power/PowerBTBaseTest.py135
-rw-r--r--acts/framework/acts/test_utils/power/PowerBaseTest.py196
-rw-r--r--acts/framework/acts/test_utils/power/PowerGnssBaseTest.py85
-rw-r--r--acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py9
-rw-r--r--acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py8
-rw-r--r--acts/framework/acts/test_utils/tel/tel_test_utils.py177
-rw-r--r--acts/framework/acts/test_utils/wifi/ota_chamber.py44
-rw-r--r--acts/framework/acts/test_utils/wifi/ota_sniffer.py19
-rw-r--r--acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py95
-rwxr-xr-xacts/framework/setup.py1
-rwxr-xr-xacts/framework/tests/acts_import_unit_test.py6
-rwxr-xr-xacts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py9
-rw-r--r--acts/tests/google/power/PowerBaselineTest.py4
-rw-r--r--acts/tests/google/power/bt/PowerBLEadvertiseTest.py63
-rw-r--r--acts/tests/google/power/bt/PowerBLEscanTest.py57
-rw-r--r--acts/tests/google/power/bt/PowerBTa2dpTest.py75
-rw-r--r--acts/tests/google/power/bt/PowerBTbaselineTest.py62
-rw-r--r--acts/tests/google/power/bt/PowerBTcalibrationTest.py65
-rw-r--r--acts/tests/google/power/bt/PowerBTidleTest.py59
-rw-r--r--acts/tests/google/power/bt/PowerBTscanTest.py85
-rw-r--r--acts/tests/google/power/gnss/PowerGnssDpoSimTest.py16
-rw-r--r--acts/tests/google/power/tel/lab/PowerTelHotspotTest.py4
-rw-r--r--acts/tests/google/power/tel/lab/PowerTelIdleTest.py4
-rw-r--r--acts/tests/google/power/tel/lab/PowerTelTrafficTest.py10
-rw-r--r--acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py4
-rw-r--r--acts/tests/google/power/wifi/PowerWiFiHotspotTest.py4
-rw-r--r--acts/tests/google/power/wifi/PowerWiFimulticastTest.py69
-rw-r--r--acts/tests/google/power/wifi/PowerWiFiroamingTest.py44
-rw-r--r--acts/tests/google/tel/live/TelLiveStressDataTest.py77
-rw-r--r--acts/tests/google/tel/live/TelLiveVoiceTest.py42
-rw-r--r--acts/tests/google/wifi/WifiSensitivityTest.py4
47 files changed, 1596 insertions, 1787 deletions
diff --git a/acts/framework/acts/bin/monsoon.py b/acts/framework/acts/bin/monsoon.py
index c43eca505e..a76b425dd9 100755
--- a/acts/framework/acts/bin/monsoon.py
+++ b/acts/framework/acts/bin/monsoon.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python3
#
-# Copyright 2016 - The Android Open Source Project
+# Copyright 2019 - The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,75 +13,100 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
"""Interface for a USB-connected Monsoon power meter
(http://msoon.com/LabEquipment/PowerMonitor/).
"""
import argparse
-import sys
-import time
-import collections
-from acts.controllers.monsoon import Monsoon
+import acts.controllers.monsoon as monsoon_controller
+
-def main(FLAGS):
+def main(args):
"""Simple command-line interface for Monsoon."""
- if FLAGS.avg and FLAGS.avg < 0:
- print("--avg must be greater than 0")
+ if args.avg and args.avg < 0:
+ print('--avg must be greater than 0')
return
- mon = Monsoon(serial=int(FLAGS.serialno[0]))
+ mon = monsoon_controller.create([int(args.serialno[0])])[0]
- if FLAGS.voltage is not None:
- mon.set_voltage(FLAGS.voltage)
+ if args.voltage is not None:
+ mon.set_voltage(args.voltage)
- if FLAGS.current is not None:
- mon.set_max_current(FLAGS.current)
+ if args.current is not None:
+ mon.set_max_current(args.current)
- if FLAGS.status:
+ if args.status:
items = sorted(mon.status.items())
- print("\n".join(["%s: %s" % item for item in items]))
+ print('\n'.join(['%s: %s' % item for item in items]))
- if FLAGS.usbpassthrough:
- mon.usb(FLAGS.usbpassthrough)
+ if args.usbpassthrough:
+ mon.usb(args.usbpassthrough)
- if FLAGS.startcurrent is not None:
- mon.set_max_init_current(FLAGS.startcurrent)
+ if args.startcurrent is not None:
+ mon.set_max_initial_current(args.startcurrent)
- if FLAGS.samples:
- # Have to sleep a bit here for monsoon to be ready to lower the rate of
- # socket read timeout.
- time.sleep(1)
- result = mon.take_samples(FLAGS.hz, FLAGS.samples,
- sample_offset=FLAGS.offset, live=True)
+ if args.samples:
+ result = mon.measure_power(
+ args.samples / args.hz,
+ measure_after_seconds=args.offset,
+ hz=args.hz,
+ output_path='monsoon_output.txt')
print(repr(result))
+
if __name__ == '__main__':
- parser = argparse.ArgumentParser(description=("This is a python utility "
- "tool to control monsoon power measurement boxes."))
- parser.add_argument("--status", action="store_true",
- help="Print power meter status.")
- parser.add_argument("-avg", "--avg", type=int, default=0,
- help="Also report average over last n data points.")
- parser.add_argument("-v", "--voltage", type=float,
- help="Set output voltage (0 for off)")
- parser.add_argument("-c", "--current", type=float,
- help="Set max output current.")
- parser.add_argument("-sc", "--startcurrent", type=float,
- help="Set max power-up/inital current.")
- parser.add_argument("-usb", "--usbpassthrough", choices=("on", "off",
- "auto"), help="USB control (on, off, auto).")
- parser.add_argument("-sp", "--samples", type=int,
- help="Collect and print this many samples")
- parser.add_argument("-hz", "--hz", type=int,
- help="Sample this many times per second.")
- parser.add_argument("-d", "--device", help="Use this /dev/ttyACM... file.")
- parser.add_argument("-sn", "--serialno", type=int, nargs=1, required=True,
- help="The serial number of the Monsoon to use.")
- parser.add_argument("--offset", type=int, nargs='?', default=0,
- help="The number of samples to discard when calculating average.")
- parser.add_argument("-r", "--ramp", action="store_true", help=("Gradually "
- "increase voltage to prevent tripping Monsoon overvoltage"))
- args = parser.parse_args()
- main(args)
+ parser = argparse.ArgumentParser(
+ description='This is a python utility tool to control monsoon power '
+ 'measurement boxes.')
+ parser.add_argument(
+ '--status', action='store_true', help='Print power meter status.')
+ parser.add_argument(
+ '-avg',
+ '--avg',
+ type=int,
+ default=0,
+ help='Also report average over last n data points.')
+ parser.add_argument(
+ '-v', '--voltage', type=float, help='Set output voltage (0 for off)')
+ parser.add_argument(
+ '-c', '--current', type=float, help='Set max output current.')
+ parser.add_argument(
+ '-sc',
+ '--startcurrent',
+ type=float,
+ help='Set max power-up/initial current.')
+ parser.add_argument(
+ '-usb',
+ '--usbpassthrough',
+ choices=('on', 'off', 'auto'),
+ help='USB control (on, off, auto).')
+ parser.add_argument(
+ '-sp',
+ '--samples',
+ type=int,
+ help='Collect and print this many samples')
+ parser.add_argument(
+ '-hz', '--hz', type=int, help='Sample this many times per second.')
+ parser.add_argument('-d', '--device', help='Use this /dev/ttyACM... file.')
+ parser.add_argument(
+ '-sn',
+ '--serialno',
+ type=int,
+ nargs=1,
+ required=True,
+ help='The serial number of the Monsoon to use.')
+ parser.add_argument(
+ '--offset',
+ type=int,
+ nargs='?',
+ default=0,
+ help='The number of samples to discard when calculating average.')
+ parser.add_argument(
+ '-r',
+ '--ramp',
+ action='store_true',
+ help='Gradually increase voltage to prevent tripping Monsoon '
+ 'overvoltage.')
+ arguments = parser.parse_args()
+ main(arguments)
diff --git a/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py b/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py
index 23fee412cb..08db20b52f 100644
--- a/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py
+++ b/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py
@@ -177,6 +177,17 @@ class MD8475CellularSimulator(cc.AbstractCellularSimulator):
else:
self.bts[bts_index].tbs_pattern = 'OFF'
+ def set_lte_rrc_state_change_timer(self, enabled, time=10):
+ """ Configures the LTE RRC state change timer.
+
+ Args:
+ enabled: a boolean indicating if the timer should be on or off.
+ time: time in seconds for the timer to expire
+ """
+ self.anritsu.set_lte_rrc_status_change(enabled)
+ if enabled:
+ self.anritsu.set_lte_rrc_status_change_timer(time)
+
def set_band(self, bts_index, band):
""" Sets the right duplex mode before switching to a new band.
diff --git a/acts/framework/acts/controllers/cellular_simulator.py b/acts/framework/acts/controllers/cellular_simulator.py
index a140c0f202..35f0885c46 100644
--- a/acts/framework/acts/controllers/cellular_simulator.py
+++ b/acts/framework/acts/controllers/cellular_simulator.py
@@ -133,6 +133,15 @@ class AbstractCellularSimulator:
if config.tbs_pattern_on is not None:
self.set_tbs_pattern_on(bts_index, config.tbs_pattern_on)
+ def set_lte_rrc_state_change_timer(self, enabled, time=10):
+ """ Configures the LTE RRC state change timer.
+
+ Args:
+ enabled: a boolean indicating if the timer should be on or off.
+ time: time in seconds for the timer to expire
+ """
+ raise NotImplementedError()
+
def set_band(self, bts_index, band):
""" Sets the band for the indicated base station.
diff --git a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
index 5df5b40127..f0b8dda4fb 100644
--- a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
+++ b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py
@@ -30,21 +30,24 @@ def get_private_key(ip_address, ssh_config):
Returns:
The ssh private key
"""
+ exceptions = []
try:
logging.debug('Trying to load SSH key type: ed25519')
return paramiko.ed25519key.Ed25519Key(
filename=get_ssh_key_for_host(ip_address, ssh_config))
- except paramiko.SSHException:
+ except paramiko.SSHException as e:
+ exceptions.append(e)
logging.debug('Failed loading SSH key type: ed25519')
try:
logging.debug('Trying to load SSH key type: rsa')
return paramiko.RSAKey.from_private_key_file(
filename=get_ssh_key_for_host(ip_address, ssh_config))
- except paramiko.SSHException:
+ except paramiko.SSHException as e:
+ exceptions.append(e)
logging.debug('Failed loading SSH key type: rsa')
- raise paramiko.SSHException('No valid ssh key type found')
+ raise Exception('No valid ssh key type found', exceptions)
def create_ssh_connection(ip_address,
diff --git a/acts/framework/acts/controllers/iperf_server.py b/acts/framework/acts/controllers/iperf_server.py
index bedc05a748..039f143470 100755
--- a/acts/framework/acts/controllers/iperf_server.py
+++ b/acts/framework/acts/controllers/iperf_server.py
@@ -81,20 +81,25 @@ class IPerfResult(object):
will be loaded and this funtion is not intended to be used with files
containing multiple iperf client runs.
"""
- try:
- with open(result_path, 'r') as f:
- iperf_output = f.readlines()
- if '}\n' in iperf_output:
- iperf_output = iperf_output[:iperf_output.index('}\n') + 1]
- iperf_string = ''.join(iperf_output)
- iperf_string = iperf_string.replace('nan', '0')
- self.result = json.loads(iperf_string)
- except ValueError:
- with open(result_path, 'r') as f:
- # Possibly a result from interrupted iperf run, skip first line
- # and try again.
- lines = f.readlines()[1:]
- self.result = json.loads(''.join(lines))
+ # if result_path isn't a path, treat it as JSON
+ if not os.path.exists(result_path):
+ self.result = json.loads(result_path)
+ else:
+ try:
+ with open(result_path, 'r') as f:
+ iperf_output = f.readlines()
+ if '}\n' in iperf_output:
+ iperf_output = iperf_output[:iperf_output.index('}\n')
+ + 1]
+ iperf_string = ''.join(iperf_output)
+ iperf_string = iperf_string.replace('nan', '0')
+ self.result = json.loads(iperf_string)
+ except ValueError:
+ with open(result_path, 'r') as f:
+ # Possibly a result from interrupted iperf run,
+ # skip first line and try again.
+ lines = f.readlines()[1:]
+ self.result = json.loads(''.join(lines))
def _has_data(self):
"""Checks if the iperf result has valid throughput data.
@@ -197,7 +202,7 @@ class IPerfResult(object):
instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval:
-1]
avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates)
- sqd_deviations = [(rate - avg_rate)**2 for rate in instantaneous_rates]
+ sqd_deviations = [(rate - avg_rate) ** 2 for rate in instantaneous_rates]
std_dev = math.sqrt(
math.fsum(sqd_deviations) / (len(sqd_deviations) - 1))
return std_dev
diff --git a/acts/framework/acts/controllers/monsoon.py b/acts/framework/acts/controllers/monsoon.py
index d5b24a2703..9488837dbb 100644
--- a/acts/framework/acts/controllers/monsoon.py
+++ b/acts/framework/acts/controllers/monsoon.py
@@ -13,1006 +13,28 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-"""Interface for a USB-connected Monsoon power meter
-(http://msoon.com/LabEquipment/PowerMonitor/).
-Based on the original py2 script of kens@google.com
-"""
-import fcntl
-import logging
-import os
-import select
-import struct
-import sys
-import time
-import collections
+from acts.controllers.monsoon_lib.api.hvpm.monsoon import Monsoon as HvpmMonsoon
+from acts.controllers.monsoon_lib.api.lvpm_stock.monsoon import Monsoon as LvpmStockMonsoon
-# http://pyserial.sourceforge.net/
-# On ubuntu, apt-get install python3-pyserial
-import serial
-
-import acts.signals
-
-from acts import utils
-from acts.controllers import android_device
-
-ACTS_CONTROLLER_CONFIG_NAME = "Monsoon"
-ACTS_CONTROLLER_REFERENCE_NAME = "monsoons"
+ACTS_CONTROLLER_CONFIG_NAME = 'Monsoon'
+ACTS_CONTROLLER_REFERENCE_NAME = 'monsoons'
def create(configs):
objs = []
- for c in configs:
- objs.append(Monsoon(serial=int(c)))
- return objs
-
-
-def destroy(objs):
- for obj in objs:
- fcntl.flock(obj.mon._tempfile, fcntl.LOCK_UN)
- obj.mon._tempfile.close()
-
-
-class MonsoonError(acts.signals.ControllerError):
- """Raised for exceptions encountered in monsoon lib."""
-
-
-class MonsoonProxy(object):
- """Class that directly talks to monsoon over serial.
-
- Provides a simple class to use the power meter, e.g.
- mon = monsoon.Monsoon()
- mon.SetVoltage(3.7)
- mon.StartDataCollection()
- mydata = []
- while len(mydata) < 1000:
- mydata.extend(mon.CollectData())
- mon.StopDataCollection()
-
- See http://wiki/Main/MonsoonProtocol for information on the protocol.
- """
-
- def __init__(self, device=None, serialno=None, wait=1):
- """Establish a connection to a Monsoon.
-
- By default, opens the first available port, waiting if none are ready.
- A particular port can be specified with "device", or a particular
- Monsoon can be specified with "serialno" (using the number printed on
- its back). With wait=0, IOError is thrown if a device is not
- immediately available.
- """
- self._coarse_ref = self._fine_ref = self._coarse_zero = 0
- self._fine_zero = self._coarse_scale = self._fine_scale = 0
- self._last_seq = 0
- self.start_voltage = 0
- self.serial = serialno
-
- if device:
- self.ser = serial.Serial(device, timeout=1)
- return
- # Try all devices connected through USB virtual serial ports until we
- # find one we can use.
- while True:
- for dev in os.listdir("/dev"):
- prefix = "ttyACM"
- # Prefix is different on Mac OS X.
- if sys.platform == "darwin":
- prefix = "tty.usbmodem"
- if not dev.startswith(prefix):
- continue
- tmpname = "/tmp/monsoon.%s.%s" % (os.uname()[0], dev)
- self._tempfile = open(tmpname, "w")
- try:
- os.chmod(tmpname, 0o666)
- except OSError as e:
- pass
-
- try: # use a lockfile to ensure exclusive access
- fcntl.flock(self._tempfile, fcntl.LOCK_EX | fcntl.LOCK_NB)
- except IOError as e:
- logging.error("device %s is in use", dev)
- continue
-
- try: # try to open the device
- self.ser = serial.Serial("/dev/%s" % dev, timeout=1)
- self.StopDataCollection() # just in case
- self._FlushInput() # discard stale input
- status = self.GetStatus()
- except Exception as e:
- logging.exception("Error opening device %s: %s", dev, e)
- continue
-
- if not status:
- logging.error("no response from device %s", dev)
- elif serialno and status["serialNumber"] != serialno:
- logging.error("Another device serial #%d seen on %s",
- status["serialNumber"], dev)
- else:
- self.start_voltage = status["voltage1"]
- return
-
- self._tempfile = None
- if not wait: raise IOError("No device found")
- logging.info("Waiting for device...")
- time.sleep(1)
-
- def GetStatus(self):
- """Requests and waits for status.
-
- Returns:
- status dictionary.
- """
- # status packet format
- STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH"
- STATUS_FIELDS = [
- "packetType",
- "firmwareVersion",
- "protocolVersion",
- "mainFineCurrent",
- "usbFineCurrent",
- "auxFineCurrent",
- "voltage1",
- "mainCoarseCurrent",
- "usbCoarseCurrent",
- "auxCoarseCurrent",
- "voltage2",
- "outputVoltageSetting",
- "temperature",
- "status",
- "leds",
- "mainFineResistor",
- "serialNumber",
- "sampleRate",
- "dacCalLow",
- "dacCalHigh",
- "powerUpCurrentLimit",
- "runTimeCurrentLimit",
- "powerUpTime",
- "usbFineResistor",
- "auxFineResistor",
- "initialUsbVoltage",
- "initialAuxVoltage",
- "hardwareRevision",
- "temperatureLimit",
- "usbPassthroughMode",
- "mainCoarseResistor",
- "usbCoarseResistor",
- "auxCoarseResistor",
- "defMainFineResistor",
- "defUsbFineResistor",
- "defAuxFineResistor",
- "defMainCoarseResistor",
- "defUsbCoarseResistor",
- "defAuxCoarseResistor",
- "eventCode",
- "eventData",
- ]
-
- self._SendStruct("BBB", 0x01, 0x00, 0x00)
- while 1: # Keep reading, discarding non-status packets
- read_bytes = self._ReadPacket()
- if not read_bytes:
- raise MonsoonError("Failed to read Monsoon status")
- calsize = struct.calcsize(STATUS_FORMAT)
- if len(read_bytes) != calsize or read_bytes[0] != 0x10:
- raise MonsoonError(
- "Wanted status, dropped type=0x%02x, len=%d",
- read_bytes[0], len(read_bytes))
- status = dict(
- zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, read_bytes)))
- p_type = status["packetType"]
- if p_type != 0x10:
- raise MonsoonError("Package type %s is not 0x10." % p_type)
- for k in status.keys():
- if k.endswith("VoltageSetting"):
- status[k] = 2.0 + status[k] * 0.01
- elif k.endswith("FineCurrent"):
- pass # needs calibration data
- elif k.endswith("CoarseCurrent"):
- pass # needs calibration data
- elif k.startswith("voltage") or k.endswith("Voltage"):
- status[k] = status[k] * 0.000125
- elif k.endswith("Resistor"):
- status[k] = 0.05 + status[k] * 0.0001
- if k.startswith("aux") or k.startswith("defAux"):
- status[k] += 0.05
- elif k.endswith("CurrentLimit"):
- status[k] = 8 * (1023 - status[k]) / 1023.0
- return status
-
- def RampVoltage(self, start, end):
- v = start
- if v < 3.0: v = 3.0 # protocol doesn't support lower than this
- while (v < end):
- self.SetVoltage(v)
- v += .1
- time.sleep(.1)
- self.SetVoltage(end)
-
- def SetVoltage(self, v):
- """Set the output voltage, 0 to disable.
- """
- if v == 0:
- self._SendStruct("BBB", 0x01, 0x01, 0x00)
- else:
- self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100))
-
- def GetVoltage(self):
- """Get the output voltage.
-
- Returns:
- Current Output Voltage (in unit of v).
- """
- try:
- return self.GetStatus()["outputVoltageSetting"]
- # Catch potential errors such as struct.error, TypeError and other
- # unknown errors which would bring down the whole test
- except Exception as e:
- raise MonsoonError("Error getting Monsoon voltage")
-
- def SetMaxCurrent(self, i):
- """Set the max output current.
- """
- if i < 0 or i > 8:
- raise MonsoonError(("Target max current %sA, is out of acceptable "
- "range [0, 8].") % i)
- val = 1023 - int((i / 8) * 1023)
- self._SendStruct("BBB", 0x01, 0x0a, val & 0xff)
- self._SendStruct("BBB", 0x01, 0x0b, val >> 8)
-
- def SetMaxPowerUpCurrent(self, i):
- """Set the max power up current.
- """
- if i < 0 or i > 8:
- raise MonsoonError(("Target max current %sA, is out of acceptable "
- "range [0, 8].") % i)
- val = 1023 - int((i / 8) * 1023)
- self._SendStruct("BBB", 0x01, 0x08, val & 0xff)
- self._SendStruct("BBB", 0x01, 0x09, val >> 8)
-
- def SetUsbPassthrough(self, val):
- """Set the USB passthrough mode: 0 = off, 1 = on, 2 = auto.
- """
- self._SendStruct("BBB", 0x01, 0x10, val)
-
- def GetUsbPassthrough(self):
- """Get the USB passthrough mode: 0 = off, 1 = on, 2 = auto.
-
- Returns:
- Current USB passthrough mode.
- """
- try:
- return self.GetStatus()["usbPassthroughMode"]
- # Catch potential errors such as struct.error, TypeError and other
- # unknown errors which would bring down the whole test
- except Exception as e:
- raise MonsoonError("Error reading Monsoon USB passthrough status")
-
- def StartDataCollection(self):
- """Tell the device to start collecting and sending measurement data.
- """
- self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command
- self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8)
-
- def StopDataCollection(self):
- """Tell the device to stop collecting measurement data.
- """
- self._SendStruct("BB", 0x03, 0x00) # stop
-
- def CollectData(self):
- """Return some current samples. Call StartDataCollection() first.
- """
- while 1: # loop until we get data or a timeout
- _bytes = self._ReadPacket()
- if not _bytes:
- raise MonsoonError("Data collection failed due to empty data")
- if len(_bytes) < 4 + 8 + 1 or _bytes[0] < 0x20 or _bytes[0] > 0x2F:
- logging.warning("Wanted data, dropped type=0x%02x, len=%d",
- _bytes[0], len(_bytes))
- continue
-
- seq, _type, x, y = struct.unpack("BBBB", _bytes[:4])
- data = [
- struct.unpack(">hhhh", _bytes[x:x + 8])
- for x in range(4,
- len(_bytes) - 8, 8)
- ]
-
- if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF:
- logging.warning("Data sequence skipped, lost packet?")
- self._last_seq = seq
-
- if _type == 0:
- if not self._coarse_scale or not self._fine_scale:
- logging.warning(
- "Waiting for calibration, dropped data packet.")
- continue
- out = []
- for main, usb, aux, voltage in data:
- if main & 1:
- coarse = ((main & ~1) - self._coarse_zero)
- out.append(coarse * self._coarse_scale)
- else:
- out.append((main - self._fine_zero) * self._fine_scale)
- return out
- elif _type == 1:
- self._fine_zero = data[0][0]
- self._coarse_zero = data[1][0]
- elif _type == 2:
- self._fine_ref = data[0][0]
- self._coarse_ref = data[1][0]
- else:
- logging.warning("Discarding data packet type=0x%02x", _type)
- continue
-
- # See http://wiki/Main/MonsoonProtocol for details on these values.
- if self._coarse_ref != self._coarse_zero:
- self._coarse_scale = 2.88 / (
- self._coarse_ref - self._coarse_zero)
- if self._fine_ref != self._fine_zero:
- self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero)
-
- def _SendStruct(self, fmt, *args):
- """Pack a struct (without length or checksum) and send it.
- """
- # Flush out the input buffer before sending data
- self._FlushInput()
- data = struct.pack(fmt, *args)
- data_len = len(data) + 1
- checksum = (data_len + sum(bytearray(data))) % 256
- out = struct.pack("B", data_len) + data + struct.pack("B", checksum)
- self.ser.write(out)
-
- def _ReadPacket(self):
- """Read a single data record as a string (without length or checksum).
- """
- len_char = self.ser.read(1)
- if not len_char:
- raise MonsoonError("Reading from serial port timed out")
-
- data_len = ord(len_char)
- if not data_len:
- return ""
- result = self.ser.read(int(data_len))
- result = bytearray(result)
- if len(result) != data_len:
- raise MonsoonError(
- "Length mismatch, expected %d bytes, got %d bytes.", data_len,
- len(result))
- body = result[:-1]
- checksum = (sum(struct.unpack("B" * len(body), body)) + data_len) % 256
- if result[-1] != checksum:
- raise MonsoonError(
- "Invalid checksum from serial port! Expected %s, got %s",
- hex(checksum), hex(result[-1]))
- return result[:-1]
-
- def _FlushInput(self):
- """ Flush all read data until no more available. """
- self.ser.reset_input_buffer()
- flushed = 0
- while True:
- ready_r, ready_w, ready_x = select.select([self.ser], [],
- [self.ser], 0)
- if len(ready_x) > 0:
- raise MonsoonError("Exception from serial port.")
- elif len(ready_r) > 0:
- flushed += 1
- self.ser.read(1) # This may cause underlying buffering.
- self.ser.reset_input_buffer(
- ) # Flush the underlying buffer too.
- else:
- break
- # if flushed > 0:
- # logging.info("dropped >%d bytes" % flushed)
-
-
-class MonsoonData(object):
- """A class for reporting power measurement data from monsoon.
-
- Data means the measured current value in Amps.
- """
- # Number of digits for long rounding.
- lr = 8
- # Number of digits for short rounding
- sr = 6
- # Delimiter for writing multiple MonsoonData objects to text file.
- delimiter = "\n\n==========\n\n"
-
- def __init__(self, data_points, timestamps, hz, voltage, offset=0):
- """Instantiates a MonsoonData object.
-
- Args:
- data_points: A list of current values in Amp (float).
- timestamps: A list of epoch timestamps (int).
- hz: The hertz at which the data points are measured.
- voltage: The voltage at which the data points are measured.
- offset: The number of initial data points to discard
- in calculations.
- """
- self._data_points = data_points
- self._timestamps = timestamps
- self.offset = offset
- num_of_data_pt = len(self._data_points)
- if self.offset >= num_of_data_pt:
- raise MonsoonError(
- ("Offset number (%d) must be smaller than the "
- "number of data points (%d).") % (offset, num_of_data_pt))
- self.data_points = self._data_points[self.offset:]
- self.timestamps = self._timestamps[self.offset:]
- self.hz = hz
- self.voltage = voltage
- self.tag = None
- self._validate_data()
-
- @property
- def average_current(self):
- """Average current in the unit of mA.
- """
- len_data_pt = len(self.data_points)
- if len_data_pt == 0:
- return 0
- cur = sum(self.data_points) * 1000 / len_data_pt
- return round(cur, self.sr)
-
- @property
- def total_charge(self):
- """Total charged used in the unit of mAh.
- """
- charge = (sum(self.data_points) / self.hz) * 1000 / 3600
- return round(charge, self.sr)
-
- @property
- def total_power(self):
- """Total power used.
- """
- power = self.average_current * self.voltage
- return round(power, self.sr)
-
- @staticmethod
- def from_string(data_str):
- """Creates a MonsoonData object from a string representation generated
- by __str__.
-
- Args:
- str: The string representation of a MonsoonData.
-
- Returns:
- A MonsoonData object.
- """
- lines = data_str.strip().split('\n')
- err_msg = ("Invalid input string format. Is this string generated by "
- "MonsoonData class?")
- conditions = [
- len(lines) <= 4, "Average Current:" not in lines[1],
- "Voltage: " not in lines[2], "Total Power: " not in lines[3],
- "samples taken at " not in lines[4],
- lines[5] != "Time" + ' ' * 7 + "Amp"
- ]
- if any(conditions):
- raise MonsoonError(err_msg)
- """Example string from Monsoon output file, first line is empty.
- Line1:
- Line2: test_2g_screenoff_dtimx2_marlin_OPD1.170706.006
- Line3: Average Current: 51.87984mA.
- Line4: Voltage: 4.2V.
- Line5: Total Power: 217.895328mW.
- Line6: 150000 samples taken at 500Hz, with an offset of 0 samples.
- """
- hz_str = lines[4].split()[4]
- hz = int(hz_str[:-3])
- voltage_str = lines[2].split()[1]
- voltage = float(voltage_str[:-2])
- lines = lines[6:]
- t = []
- v = []
- for l in lines:
- try:
- timestamp, value = l.split(' ')
- t.append(int(timestamp))
- v.append(float(value))
- except ValueError:
- raise MonsoonError(err_msg)
- return MonsoonData(v, t, hz, voltage)
-
- @staticmethod
- def save_to_text_file(monsoon_data, file_path):
- """Save multiple MonsoonData objects to a text file.
-
- Args:
- monsoon_data: A list of MonsoonData objects to write to a text
- file.
- file_path: The full path of the file to save to, including the file
- name.
- """
- if not monsoon_data:
- raise MonsoonError("Attempting to write empty Monsoon data to "
- "file, abort")
- utils.create_dir(os.path.dirname(file_path))
- with open(file_path, 'a') as f:
- for md in monsoon_data:
- f.write(str(md))
- f.write(MonsoonData.delimiter)
-
- @staticmethod
- def from_text_file(file_path):
- """Load MonsoonData objects from a text file generated by
- MonsoonData.save_to_text_file.
-
- Args:
- file_path: The full path of the file load from, including the file
- name.
-
- Returns:
- A list of MonsoonData objects.
- """
- results = []
- with open(file_path, 'r') as f:
- data_strs = f.read().split(MonsoonData.delimiter)
- data_strs = data_strs[:-1]
- for data_str in data_strs:
- results.append(MonsoonData.from_string(data_str))
- return results
-
- def _validate_data(self):
- """Verifies that the data points contained in the class are valid.
- """
- msg = "Error! Expected {} timestamps, found {}.".format(
- len(self._data_points), len(self._timestamps))
- if len(self._data_points) != len(self._timestamps):
- raise MonsoonError(msg)
-
- def update_offset(self, new_offset):
- """Updates how many data points to skip in caculations.
-
- Always use this function to update offset instead of directly setting
- self.offset.
-
- Args:
- new_offset: The new offset.
- """
- self.offset = new_offset
- self.data_points = self._data_points[self.offset:]
- self.timestamps = self._timestamps[self.offset:]
-
- def get_data_with_timestamps(self):
- """Returns the data points with timestamps.
-
- Returns:
- A list of tuples in the format of (timestamp, data)
- """
- result = []
- for t, d in zip(self.timestamps, self.data_points):
- result.append(t, round(d, self.lr))
- return result
-
- def get_average_record(self, n):
- """Returns a list of average current numbers, each representing the
- average over the last n data points.
-
- Args:
- n: Number of data points to average over.
-
- Returns:
- A list of average current values.
- """
- history_deque = collections.deque()
- averages = []
- for d in self.data_points:
- history_deque.appendleft(d)
- if len(history_deque) > n:
- history_deque.pop()
- avg = sum(history_deque) / len(history_deque)
- averages.append(round(avg, self.lr))
- return averages
-
- def _header(self):
- strs = [""]
- if self.tag:
- strs.append(self.tag)
+ for serial in configs:
+ serial_number = int(serial)
+ if serial_number < 20000:
+ # This code assumes the LVPM has not been updated to have a
+ # non-stock firmware. If someone has updated the firmware,
+ # power measurement will fail.
+ objs.append(LvpmStockMonsoon(serial=serial_number))
else:
- strs.append("Monsoon Measurement Data")
- strs.append("Average Current: {}mA.".format(self.average_current))
- strs.append("Voltage: {}V.".format(self.voltage))
- strs.append("Total Power: {}mW.".format(self.total_power))
- strs.append(
- ("{} samples taken at {}Hz, with an offset of {} samples.").format(
- len(self._data_points), self.hz, self.offset))
- return "\n".join(strs)
-
- def __len__(self):
- return len(self.data_points)
-
- def __str__(self):
- strs = []
- strs.append(self._header())
- strs.append("Time" + ' ' * 7 + "Amp")
- for t, d in zip(self.timestamps, self.data_points):
- strs.append("{} {}".format(t, round(d, self.sr)))
- return "\n".join(strs)
-
- def __repr__(self):
- return self._header()
-
-
-class Monsoon(object):
- """The wrapper class for test scripts to interact with monsoon.
- """
-
- def __init__(self, *args, **kwargs):
- serial = kwargs["serial"]
- device = None
- self.log = logging.getLogger()
- if "device" in kwargs:
- device = kwargs["device"]
- self.mon = MonsoonProxy(serialno=serial, device=device)
- self.dev = self.mon.ser.name
- self.serial = serial
- self.dut = None
-
- def attach_device(self, dut):
- """Attach the controller object for the Device Under Test (DUT)
- physically attached to the Monsoon box.
-
- Args:
- dut: A controller object representing the device being powered by
- this Monsoon box.
- """
- self.dut = dut
-
- def set_voltage(self, volt, ramp=False):
- """Sets the output voltage of monsoon.
-
- Args:
- volt: Voltage to set the output to.
- ramp: If true, the output voltage will be increased gradually to
- prevent tripping Monsoon overvoltage.
- """
- if ramp:
- self.mon.RampVoltage(mon.start_voltage, volt)
- else:
- self.mon.SetVoltage(volt)
-
- def set_max_current(self, cur):
- """Sets monsoon's max output current.
-
- Args:
- cur: The max current in A.
- """
- self.mon.SetMaxCurrent(cur)
-
- def set_max_init_current(self, cur):
- """Sets the max power-up/inital current.
-
- Args:
- cur: The max initial current allowed in mA.
- """
- self.mon.SetMaxPowerUpCurrent(cur)
-
- @property
- def status(self):
- """Gets the status params of monsoon.
-
- Returns:
- A dictionary where each key-value pair represents a monsoon status
- param.
- """
- return self.mon.GetStatus()
-
- def take_samples(self, sample_hz, sample_num, sample_offset=0, live=False):
- """Take samples of the current value supplied by monsoon.
-
- This is the actual measurement for power consumption. This function
- blocks until the number of samples requested has been fulfilled.
-
- Args:
- hz: Number of points to take for every second.
- sample_num: Number of samples to take.
- offset: The number of initial data points to discard in MonsoonData
- calculations. sample_num is extended by offset to compensate.
- live: Print each sample in console as measurement goes on.
-
- Returns:
- A MonsoonData object representing the data obtained in this
- sampling. None if sampling is unsuccessful.
- """
- sys.stdout.flush()
- voltage = self.mon.GetVoltage()
- self.log.info("Taking samples at %dhz for %ds, voltage %.2fv.",
- sample_hz, (sample_num / sample_hz), voltage)
- sample_num += sample_offset
- # Make sure state is normal
- self.mon.StopDataCollection()
- status = self.mon.GetStatus()
- native_hz = status["sampleRate"] * 1000
-
- # Collect and average samples as specified
- self.mon.StartDataCollection()
-
- # In case sample_hz doesn't divide native_hz exactly, use this
- # invariant: 'offset' = (consumed samples) * sample_hz -
- # (emitted samples) * native_hz
- # This is the error accumulator in a variation of Bresenham's
- # algorithm.
- emitted = offset = 0
- collected = []
- # past n samples for rolling average
- history_deque = collections.deque()
- current_values = []
- timestamps = []
-
- try:
- last_flush = time.time()
- while emitted < sample_num or sample_num == -1:
- # The number of raw samples to consume before emitting the next
- # output
- need = int((native_hz - offset + sample_hz - 1) / sample_hz)
- if need > len(collected): # still need more input samples
- samples = self.mon.CollectData()
- if not samples:
- break
- collected.extend(samples)
- else:
- # Have enough data, generate output samples.
- # Adjust for consuming 'need' input samples.
- offset += need * sample_hz
- # maybe multiple, if sample_hz > native_hz
- while offset >= native_hz:
- # TODO(angli): Optimize "collected" operations.
- this_sample = sum(collected[:need]) / need
- this_time = int(time.time())
- timestamps.append(this_time)
- if live:
- self.log.info("%s %s", this_time, this_sample)
- current_values.append(this_sample)
- sys.stdout.flush()
- offset -= native_hz
- emitted += 1 # adjust for emitting 1 output sample
- collected = collected[need:]
- now = time.time()
- if now - last_flush >= 0.99: # flush every second
- sys.stdout.flush()
- last_flush = now
- except Exception as e:
- pass
- self.mon.StopDataCollection()
- try:
- return MonsoonData(
- current_values,
- timestamps,
- sample_hz,
- voltage,
- offset=sample_offset)
- except:
- return None
-
- @utils.timeout(60)
- def usb(self, state):
- """Sets the monsoon's USB passthrough mode. This is specific to the
- USB port in front of the monsoon box which connects to the powered
- device, NOT the USB that is used to talk to the monsoon itself.
-
- "Off" means USB always off.
- "On" means USB always on.
- "Auto" means USB is automatically turned off when sampling is going on,
- and turned back on when sampling finishes.
-
- Args:
- stats: The state to set the USB passthrough to.
-
- Returns:
- True if the state is legal and set. False otherwise.
- """
- state_lookup = {"off": 0, "on": 1, "auto": 2}
- state = state.lower()
- if state in state_lookup:
- current_state = self.mon.GetUsbPassthrough()
- while (current_state != state_lookup[state]):
- self.mon.SetUsbPassthrough(state_lookup[state])
- time.sleep(1)
- current_state = self.mon.GetUsbPassthrough()
- return True
- return False
-
- def _check_dut(self):
- """Verifies there is a DUT attached to the monsoon.
-
- This should be called in the functions that operate the DUT.
- """
- if not self.dut:
- raise MonsoonError("Need to attach the device before using it.")
-
- @utils.timeout(15)
- def _wait_for_device(self, ad):
- while ad.serial not in android_device.list_adb_devices():
- pass
- ad.adb.wait_for_device()
-
- def execute_sequence_and_measure(self,
- step_funcs,
- hz,
- duration,
- offset_sec=20,
- *args,
- **kwargs):
- """@Deprecated.
- Executes a sequence of steps and take samples in-between.
-
- For each step function, the following steps are followed:
- 1. The function is executed to put the android device in a state.
- 2. If the function returns False, skip to next step function.
- 3. If the function returns True, sl4a session is disconnected.
- 4. Monsoon takes samples.
- 5. Sl4a is reconnected.
-
- Because it takes some time for the device to calm down after the usb
- connection is cut, an offset is set for each measurement. The default
- is 20s.
-
- Args:
- hz: Number of samples to take per second.
- durations: Number(s) of minutes to take samples for in each step.
- If this is an integer, all the steps will sample for the same
- amount of time. If this is an iterable of the same length as
- step_funcs, then each number represents the number of minutes
- to take samples for after each step function.
- e.g. If durations[0] is 10, we'll sample for 10 minutes after
- step_funcs[0] is executed.
- step_funcs: A list of funtions, whose first param is an android
- device object. If a step function returns True, samples are
- taken after this step, otherwise we move on to the next step
- function.
- ad: The android device object connected to this monsoon.
- offset_sec: The number of seconds of initial data to discard.
- *args, **kwargs: Extra args to be passed into each step functions.
-
- Returns:
- The MonsoonData objects from samplings.
- """
- self._check_dut()
- sample_nums = []
- try:
- if len(duration) != len(step_funcs):
- raise MonsoonError(("The number of durations need to be the "
- "same as the number of step functions."))
- for d in duration:
- sample_nums.append(d * 60 * hz)
- except TypeError:
- num = duration * 60 * hz
- sample_nums = [num] * len(step_funcs)
- results = []
- oset = offset_sec * hz
- for func, num in zip(step_funcs, sample_nums):
- try:
- self.usb("auto")
- step_name = func.__name__
- self.log.info("Executing step function %s.", step_name)
- take_sample = func(ad, *args, **kwargs)
- if not take_sample:
- self.log.info("Skip taking samples for %s", step_name)
- continue
- time.sleep(1)
- self.dut.stop_services()
- time.sleep(1)
- self.log.info("Taking samples for %s.", step_name)
- data = self.take_samples(hz, num, sample_offset=oset)
- if not data:
- raise MonsoonError("Sampling for %s failed." % step_name)
- self.log.info("Sample summary: %s", repr(data))
- data.tag = step_name
- results.append(data)
- except Exception:
- self.log.exception("Exception happened during step %s, abort!"
- % func.__name__)
- return results
- finally:
- self.mon.StopDataCollection()
- self.usb("on")
- self._wait_for_device(self.dut)
- # Wait for device to come back online.
- time.sleep(10)
- self.dut.start_services()
- # Release wake lock to put device into sleep.
- self.dut.droid.goToSleepNow()
- return results
-
- def disconnect_dut(self):
- """Disconnect DUT from monsoon.
-
- Stop the sl4a service on the DUT and disconnect USB connection
- raises:
- MonsoonError: monsoon erro trying to disconnect usb
- """
- try:
- self.dut.stop_services()
- time.sleep(1)
- self.usb("off")
- except Exception as e:
- raise MonsoonError(
- "Error happended trying to disconnect DUT from Monsoon")
-
- def monsoon_usb_auto(self):
- """Set monsoon USB to auto to ready the device for power measurement.
-
- Stop the sl4a service on the DUT and disconnect USB connection
- raises:
- MonsoonError: monsoon erro trying to set usbpassthrough to auto
- """
- try:
- self.dut.stop_services()
- time.sleep(1)
- self.usb("auto")
- except Exception as e:
- raise MonsoonError(
- "Error happended trying to set Monsoon usbpassthrough to auto")
-
- def reconnect_dut(self):
- """Reconnect DUT to monsoon and start sl4a services.
-
- raises:
- MonsoonError: monsoon erro trying to reconnect usb
- Turn usbpassthrough on and start the sl4a services.
- """
- self.log.info("Reconnecting dut.")
- try:
- # If wait for device failed, reset monsoon and try it again, if
- # this still fails, then raise
- try:
- self._wait_for_device(self.dut)
- except acts.utils.TimeoutError:
- self.log.info('Retry-reset monsoon and connect again')
- self.usb('off')
- time.sleep(1)
- self.usb('on')
- self._wait_for_device(self.dut)
- # Wait for device to come back online.
- time.sleep(2)
- self.dut.start_services()
- # Release wake lock to put device into sleep.
- self.dut.droid.goToSleepNow()
- self.log.info("Dut reconnected.")
- except Exception as e:
- raise MonsoonError("Error happened trying to reconnect DUT")
-
- def measure_power(self, hz, duration, tag, offset=30):
- """Measure power consumption of the attached device.
-
- Because it takes some time for the device to calm down after the usb
- connection is cut, an offset is set for each measurement. The default
- is 30s. The total time taken to measure will be (duration + offset).
-
- Args:
- hz: Number of samples to take per second.
- duration: Number of seconds to take samples for in each step.
- offset: The number of seconds of initial data to discard.
- tag: A string that's the name of the collected data group.
-
- Returns:
- A MonsoonData object with the measured power data.
- """
- num = duration * hz
- oset = offset * hz
- data = None
- try:
- data = self.take_samples(hz, num, sample_offset=oset)
- if not data:
- raise MonsoonError(
- ("No data was collected in measurement %s.") % tag)
- data.tag = tag
- self.log.info("Measurement summary: %s", repr(data))
- finally:
- self.mon.StopDataCollection()
- return data
+ objs.append(HvpmMonsoon(serial=serial_number))
+ return objs
- def reconnect_monsoon(self):
- """Reconnect Monsoon to serial port.
- """
- logging.info("Close serial connection")
- self.mon.ser.close()
- logging.info("Reset serial port")
- time.sleep(5)
- logging.info("Open serial connection")
- self.mon.ser.open()
- self.mon.ser.reset_input_buffer()
- self.mon.ser.reset_output_buffer()
+def destroy(monsoons):
+ for monsoon in monsoons:
+ monsoon.release_monsoon_connection()
diff --git a/acts/framework/acts/controllers/monsoon_lib/api/common.py b/acts/framework/acts/controllers/monsoon_lib/api/common.py
index fffed4cbd3..f932535467 100644
--- a/acts/framework/acts/controllers/monsoon_lib/api/common.py
+++ b/acts/framework/acts/controllers/monsoon_lib/api/common.py
@@ -40,7 +40,37 @@ PASSTHROUGH_STATES = {
}
-class MonsoonData(object):
+class MonsoonDataRecord(object):
+ """A data class for Monsoon data points."""
+ def __init__(self, time, current):
+ """Creates a new MonsoonDataRecord.
+
+ Args:
+ time: the string '{time}s', where time is measured in seconds since
+ the beginning of the data collection.
+ current: The current in Amperes as a string.
+ """
+ self._time = float(time[:-1])
+ self._current = float(current)
+
+ @property
+ def time(self):
+ """The time the record was fetched."""
+ return self._time
+
+ @property
+ def current(self):
+ """The amount of current in Amperes measured for the given record."""
+ return self._current
+
+ @classmethod
+ def create_from_record_line(cls, line):
+ """Creates a data record from the line passed in from the output file.
+ """
+ return cls(*line.split(' '))
+
+
+class MonsoonResult(object):
"""An object that contains aggregated data collected during sampling.
Attributes:
@@ -53,19 +83,48 @@ class MonsoonData(object):
# The number of decimal places to round a value to.
ROUND_TO = 6
- def __init__(self, num_samples, sum_currents, hz, voltage, tag=None):
+ def __init__(self, num_samples, sum_currents, hz, voltage, datafile_path):
+ """Creates a new MonsoonResult.
+
+ Args:
+ num_samples: the number of samples collected.
+ sum_currents: the total summation of every current measurement.
+ hz: the number of samples per second.
+ voltage: the voltage used during the test.
+ datafile_path: the path to the monsoon data file.
+ """
self._num_samples = num_samples
self._sum_currents = sum_currents
self._hz = hz
self._voltage = voltage
- self.tag = tag
+ self.tag = datafile_path
+
+ def get_data_points(self):
+ """Returns an iterator of MonsoonDataRecords."""
+ class MonsoonDataIterator:
+ def __init__(self, file):
+ self.file = file
+
+ def __iter__(self):
+ with open(self.file, 'r') as f:
+ for line in f:
+ # Remove the newline character.
+ line.strip()
+ yield MonsoonDataRecord.create_from_record_line(line)
+
+ return MonsoonDataIterator(self.tag)
+
+ @property
+ def num_samples(self):
+ """The number of samples recorded during the test."""
+ return self._num_samples
@property
def average_current(self):
"""Average current in mA."""
- if self._num_samples == 0:
+ if self.num_samples == 0:
return 0
- return round(self._sum_currents * 1000 / self._num_samples,
+ return round(self._sum_currents * 1000 / self.num_samples,
self.ROUND_TO)
@property
@@ -79,8 +138,14 @@ class MonsoonData(object):
"""Total power used."""
return round(self.average_current * self._voltage, self.ROUND_TO)
+ @property
+ def voltage(self):
+ """The voltage during the measurement (in Volts)."""
+ return self._voltage
+
def __str__(self):
return ('avg current: %s\n'
'total charge: %s\n'
- 'total power: %s' % (self.average_current, self.total_charge,
- self.total_power))
+ 'total power: %s\n'
+ 'total samples: %s' % (self.average_current, self.total_charge,
+ self.total_power, self._num_samples))
diff --git a/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py
index 6dc72c8f91..f1b03c9114 100644
--- a/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py
+++ b/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py
@@ -20,7 +20,7 @@ import time
from Monsoon import HVPM
from Monsoon import Operations as op
-from acts.controllers.monsoon_lib.api.common import MonsoonData
+from acts.controllers.monsoon_lib.api.common import MonsoonResult
from acts.controllers.monsoon_lib.api.monsoon import BaseMonsoon
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import AssemblyLineBuilder
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import ThreadAssemblyLine
@@ -137,8 +137,9 @@ class Monsoon(BaseMonsoon):
manager.shutdown()
self._mon.setup_usb(self.serial)
- monsoon_data = MonsoonData(aggregator.num_samples,
- aggregator.sum_currents, hz, voltage)
+ monsoon_data = MonsoonResult(aggregator.num_samples,
+ aggregator.sum_currents, hz, voltage,
+ output_path)
self._log.info('Measurement summary:\n%s', str(monsoon_data))
return monsoon_data
diff --git a/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py
index b54b6794fc..e8d116d83f 100644
--- a/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py
+++ b/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py
@@ -16,7 +16,7 @@
import multiprocessing
import time
-from acts.controllers.monsoon_lib.api.common import MonsoonData
+from acts.controllers.monsoon_lib.api.common import MonsoonResult
from acts.controllers.monsoon_lib.api.lvpm_stock.monsoon_proxy import MonsoonProxy
from acts.controllers.monsoon_lib.api.monsoon import BaseMonsoon
from acts.controllers.monsoon_lib.sampling.engine.assembly_line import AssemblyLineBuilder
@@ -119,8 +119,9 @@ class Monsoon(BaseMonsoon):
manager.shutdown()
- monsoon_data = MonsoonData(aggregator.num_samples,
- aggregator.sum_currents, hz, voltage)
+ monsoon_data = MonsoonResult(aggregator.num_samples,
+ aggregator.sum_currents, hz, voltage,
+ output_path)
self._log.info('Measurement summary:\n%s', str(monsoon_data))
return monsoon_data
diff --git a/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py
index 4916da2f15..a55fc8a0e0 100644
--- a/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py
+++ b/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py
@@ -160,6 +160,8 @@ class BaseMonsoon(object):
"""Deprecated. Use the connection callbacks instead."""
def on_reconnect():
+ # Make sure the device is connected and available for commands.
+ android_device.wait_for_boot_completion()
android_device.start_services()
# Release wake lock to put device into sleep.
android_device.droid.goToSleepNow()
@@ -180,15 +182,6 @@ class BaseMonsoon(object):
"""Sets the callback to be called when Monsoon reconnects USB."""
self.on_reconnect = callback
- def monsoon_usb_auto(self):
- """Deprecated. Use usb('auto') or usb(PassthroughStates.AUTO) instead.
- """
- self.usb(PassthroughStates.AUTO)
-
- def reconnect_dut(self):
- """Deprecated. Use usb('on') or usb(PassthroughStates.ON) instead."""
- self.usb(PassthroughStates.ON)
-
def take_samples(self, assembly_line):
"""Runs the sampling procedure based on the given assembly line."""
# Sampling is always done in a separate process. Release the Monsoon
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py
index 24c5a6ddbb..0163d16517 100644
--- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py
@@ -73,7 +73,6 @@ class TransmissionModes(Enum):
TM2 = 'TM2'
TM3 = 'TM3'
TM4 = 'TM4'
- TM6 = 'TM6'
TM7 = 'TM7'
TM8 = 'TM8'
TM9 = 'TM9'
@@ -302,7 +301,7 @@ class Cmw500(abstract_inst.SocketInstrument):
return self.send_and_recv('*IDN?')
def disconnect(self):
- """Detach controller from device and switch to local mode."""
+ """Disconnect controller from device and switch to local mode."""
self.switch_lte_signalling(LteState.LTE_OFF)
self.close_remote_mode()
self._close_socket()
@@ -311,6 +310,10 @@ class Cmw500(abstract_inst.SocketInstrument):
"""Exits remote mode to local mode."""
self.send_and_recv('&GTL')
+ def detach(self):
+ """Detach callbox and controller."""
+ self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DETach')
+
@property
def rrc_connection(self):
"""Gets the RRC connection state."""
@@ -747,6 +750,8 @@ class BaseStation(object):
Args:
num_antenna: Count of number of dl antennas to use.
"""
+ if not isinstance(num_antenna, MimoModes):
+ raise ValueError('num_antenna should be an instance of MimoModes.')
cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas {}'.format(
self._bts, num_antenna)
self._cmw.send_and_recv(cmd)
diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py
index 65e66f9f84..64d127b808 100644
--- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py
+++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py
@@ -17,6 +17,27 @@ import time
from acts.controllers.rohdeschwarz_lib import cmw500
from acts.controllers import cellular_simulator as cc
+from acts.test_utils.power.tel_simulations import LteSimulation
+
+CMW_TM_MAPPING = {
+ LteSimulation.TransmissionMode.TM1: cmw500.TransmissionModes.TM1,
+ LteSimulation.TransmissionMode.TM2: cmw500.TransmissionModes.TM2,
+ LteSimulation.TransmissionMode.TM3: cmw500.TransmissionModes.TM3,
+ LteSimulation.TransmissionMode.TM4: cmw500.TransmissionModes.TM4,
+ LteSimulation.TransmissionMode.TM7: cmw500.TransmissionModes.TM7,
+ LteSimulation.TransmissionMode.TM8: cmw500.TransmissionModes.TM8,
+ LteSimulation.TransmissionMode.TM9: cmw500.TransmissionModes.TM9
+}
+
+CMW_SCH_MAPPING = {
+ LteSimulation.SchedulingMode.STATIC: cmw500.SchedulingMode.USERDEFINEDCH
+}
+
+CMW_MIMO_MAPPING = {
+ LteSimulation.MimoMode.MIMO_1x1: cmw500.MimoModes.MIMO1x1,
+ LteSimulation.MimoMode.MIMO_2x2: cmw500.MimoModes.MIMO2x2,
+ LteSimulation.MimoMode.MIMO_4x4: cmw500.MimoModes.MIMO4x4
+}
class CMW500CellularSimulator(cc.AbstractCellularSimulator):
@@ -57,7 +78,6 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
""" Sends finalization commands to the cellular equipment and closes
the connection. """
self.cmw.disconnect()
- self.cmw.close_remote_mode()
def setup_lte_scenario(self):
""" Configures the equipment for an LTE simulation. """
@@ -68,6 +88,18 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
""" Configures the equipment for an LTE with CA simulation. """
raise NotImplementedError()
+ def set_lte_rrc_state_change_timer(self, enabled, time=10):
+ """ Configures the LTE RRC state change timer.
+
+ Args:
+ enabled: a boolean indicating if the timer should be on or off.
+ time: time in seconds for the timer to expire
+ """
+ # Setting this method to pass instead of raising an exception as it
+ # it is required by LTE sims.
+ # TODO (b/141838145): Implement RRC status change timer for CMW500.
+ pass
+
def set_band(self, bts_index, band):
""" Sets the band for the indicated base station.
@@ -111,7 +143,8 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
bts_index: the base station number
output_power: the new output power
"""
- raise NotImplementedError()
+ bts = self.bts[bts_index]
+ bts.downlink_power_level = output_power
def set_tdd_config(self, bts_index, tdd_config):
""" Sets the tdd configuration number for the indicated base station.
@@ -166,7 +199,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
mimo_mode: the new mimo mode
"""
bts = self.bts[bts_index]
-
+ mimo_mode = CMW_MIMO_MAPPING[mimo_mode]
if mimo_mode == cmw500.MimoModes.MIMO1x1:
self.cmw.configure_mimo_settings(cmw500.MimoScenario.SCEN1x1)
bts.dl_antenna = cmw500.MimoModes.MIMO1x1
@@ -190,6 +223,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
"""
bts = self.bts[bts_index]
+ tmode = CMW_TM_MAPPING[tmode]
if (tmode in [
cmw500.TransmissionModes.TM1,
cmw500.TransmissionModes.TM7
@@ -224,7 +258,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
nrb_ul: Number of RBs for uplink.
"""
bts = self.bts[bts_index]
- bts.scheduling_mode = scheduling
+ bts.scheduling_mode = CMW_SCH_MAPPING[scheduling]
if not self.ul_modulation and self.dl_modulation:
raise ValueError('Modulation should be set prior to scheduling '
@@ -305,7 +339,8 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
bts_index: the base station number
tbs_pattern_on: the new TBS pattern setting
"""
- raise NotImplementedError()
+ # TODO (b/143918664): CMW500 doesn't have an equivalent setting.
+ pass
def lte_attach_secondary_carriers(self):
""" Activates the secondary carriers for CA. Requires the DUT to be
@@ -341,7 +376,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator):
def detach(self):
""" Turns off all the base stations so the DUT loose connection."""
- self.cmw.disconnect()
+ self.cmw.detach()
def stop(self):
""" Stops current simulation. After calling this method, the simulator
diff --git a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
index 1f13d26499..bb63bd9934 100644
--- a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
+++ b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py
@@ -14,8 +14,11 @@
# License for the specific language governing permissions and limitations under
# the License.
import inspect
-
+import time
+from acts import asserts
from acts.controllers.buds_lib.dev_utils import apollo_sink_events
+from acts.test_utils.bt.bt_constants import bt_default_timeout
+
def validate_controller(controller, abstract_device_class):
@@ -257,7 +260,18 @@ class AndroidHeadsetBluetoothHandsfreeAbstractDevice(
@property
def mac_address(self):
- return self.ad_controller.droid.bluetoothGetLocalAddress()
+ """Getting device mac with more stability ensurance.
+
+ Sometime, getting mac address is flaky that it returns None. Adding a
+ loop to add more ensurance of getting correct mac address.
+ """
+ device_mac = None
+ start_time = time.time()
+ end_time = start_time + bt_default_timeout
+ while not device_mac and time.time() < end_time:
+ device_mac = self.ad_controller.droid.bluetoothGetLocalAddress()
+ asserts.assert_true(device_mac, 'Can not get the MAC address')
+ return device_mac
def accept_call(self):
return self.ad_controller.droid.telecomAcceptRingingCall(None)
diff --git a/acts/framework/acts/test_utils/bt/bt_power_test_utils.py b/acts/framework/acts/test_utils/bt/bt_power_test_utils.py
index c9c60723e9..628c3375d1 100644
--- a/acts/framework/acts/test_utils/bt/bt_power_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_power_test_utils.py
@@ -14,93 +14,144 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
import time
+import acts.test_utils.bt.BleEnum as bleenum
+import acts.test_utils.instrumentation.instrumentation_command_builder as icb
-from acts.test_utils.wifi import wifi_power_test_utils as wputils
-from acts.test_utils.bt.bt_test_utils import enable_bluetooth
-from acts.test_utils.bt.bt_test_utils import disable_bluetooth
-
-BT_BASE_UUID = '00000000-0000-1000-8000-00805F9B34FB'
-BT_CLASSICAL_DATA = [1, 2, 3]
BLE_LOCATION_SCAN_ENABLE = 'settings put global ble_scan_always_enabled 1'
BLE_LOCATION_SCAN_DISABLE = 'settings put global ble_scan_always_enabled 0'
-START_PMC_CMD = 'am start -n com.android.pmc/com.android.pmc.PMCMainActivity'
-PMC_VERBOSE_CMD = 'setprop log.tag.PMC VERBOSE'
-PMC_BASE_SCAN = 'am broadcast -a com.android.pmc.BLESCAN --es ScanMode '
+SCREEN_WAIT_TIME = 1
-def phone_setup_for_BT(dut, bt_on, ble_on, screen_status):
- """Sets the phone and Bluetooth in the desired state
+class MediaControl(object):
+ """Media control using adb shell for power testing.
- Args:
- dut: object of the android device under test
- bt_on: Enable/Disable BT
- ble_on: Enable/Disable BLE
- screen_status: screen ON or OFF
+ Object to control media play status using adb.
"""
- # Initialize the dut to rock-bottom state
- wputils.dut_rockbottom(dut)
- time.sleep(2)
-
- # Check if we are enabling a background scan
- # TODO: Turn OFF cellular wihtout having to turn ON airplane mode
- if bt_on == 'OFF' and ble_on == 'ON':
- dut.adb.shell(BLE_LOCATION_SCAN_ENABLE)
- dut.droid.connectivityToggleAirplaneMode(False)
- time.sleep(2)
-
- # Turn ON/OFF BT
- if bt_on == 'ON':
- enable_bluetooth(dut.droid, dut.ed)
- dut.log.info('BT is ON')
- else:
- disable_bluetooth(dut.droid)
- dut.droid.bluetoothDisableBLE()
- dut.log.info('BT is OFF')
- time.sleep(2)
-
- # Turn ON/OFF BLE
- if ble_on == 'ON':
- dut.droid.bluetoothEnableBLE()
- dut.log.info('BLE is ON')
- else:
- dut.droid.bluetoothDisableBLE()
- dut.log.info('BLE is OFF')
- time.sleep(2)
-
- # Set the desired screen status
- if screen_status == 'OFF':
- dut.droid.goToSleepNow()
- dut.log.info('Screen is OFF')
- time.sleep(2)
-
-
-def start_pmc_ble_scan(dut,
- scan_mode,
- offset_start,
- scan_time,
- idle_time=None,
- num_reps=1):
- """Starts a generic BLE scan via the PMC app
+
+ def __init__(self, android_device, music_file):
+ """Initialize the media_control class.
+
+ Args:
+ android_dut: android_device object
+ music_file: location of the music file
+ """
+ self.android_device = android_device
+ self.music_file = music_file
+
+ def player_on_foreground(self):
+ """Turn on screen and make sure media play is on foreground
+
+ All media control keycode only works when screen is on and media player
+ is on the foreground. Turn off screen first and turn it on to make sure
+ all operation is based on the same screen status. Otherwise, 'MENU' key
+ would block command to be sent.
+ """
+ self.android_device.droid.goToSleepNow()
+ time.sleep(SCREEN_WAIT_TIME)
+ self.android_device.droid.wakeUpNow()
+ time.sleep(SCREEN_WAIT_TIME)
+ self.android_device.send_keycode('MENU')
+ time.sleep(SCREEN_WAIT_TIME)
+
+ def play(self):
+ """Start playing music.
+
+ """
+ self.player_on_foreground()
+ PLAY = 'am start -a android.intent.action.VIEW -d file://{} -t audio/wav'.format(
+ self.music_file)
+ self.android_device.adb.shell(PLAY)
+
+ def pause(self):
+ """Pause music.
+
+ """
+ self.player_on_foreground()
+ self.android_device.send_keycode('MEDIA_PAUSE')
+
+ def resume(self):
+ """Pause music.
+
+ """
+ self.player_on_foreground()
+ self.android_device.send_keycode('MEDIA_PLAY')
+
+ def stop(self):
+ """Stop music and close media play.
+
+ """
+ self.player_on_foreground()
+ self.android_device.send_keycode('MEDIA_STOP')
+
+
+def start_apk_ble_adv(dut, adv_mode, adv_power_level, adv_duration):
+ """Trigger BLE advertisement from power-test.apk.
Args:
- dut: object of the android device under test
- scan mode: desired BLE scan type
- offset_start: Time delay in seconds before scan starts
- scan_time: active scan time
- idle_time: iddle time (i.e., no scans occuring)
- num_reps: Number of repetions of the ative+idle scan sequence
+ dut: Android device under test, type AndroidDevice obj
+ adv_mode: The BLE advertisement mode.
+ {0: 'LowPower', 1: 'Balanced', 2: 'LowLatency'}
+ adv_power_leve: The BLE advertisement TX power level.
+ {0: 'UltraLowTXPower', 1: 'LowTXPower', 2: 'MediumTXPower,
+ 3: HighTXPower}
+ adv_duration: duration of advertisement in seconds, type int
"""
- scan_dur = scan_time
- if not idle_time:
- idle_time = 0.2 * scan_time
- scan_dur = 0.8 * scan_time
-
- first_part_msg = '%s%s --es StartTime %d --es ScanTime %d' % (
- PMC_BASE_SCAN, scan_mode, offset_start, scan_dur)
- msg = '%s --es NoScanTime %d --es Repetitions %d' % (first_part_msg,
- idle_time, num_reps)
+ adv_duration = str(adv_duration) + 's'
+ builder = icb.InstrumentationTestCommandBuilder.default()
+ builder.add_test_class(
+ "com.google.android.device.power.tests.ble.BleAdvertise")
+ builder.set_manifest_package("com.google.android.device.power")
+ builder.set_runner("androidx.test.runner.AndroidJUnitRunner")
+ builder.add_key_value_param("cool-off-duration", "0s")
+ builder.add_key_value_param("idle-duration", "0s")
+ builder.add_key_value_param(
+ "com.android.test.power.receiver.ADVERTISE_MODE", adv_mode)
+ builder.add_key_value_param("com.android.test.power.receiver.POWER_LEVEL",
+ adv_power_level)
+ builder.add_key_value_param(
+ "com.android.test.power.receiver.ADVERTISING_DURATION", adv_duration)
+
+ adv_command = builder.build() + ' &'
+ logging.info('Start BLE {} at {} for {} seconds'.format(
+ bleenum.AdvertiseSettingsAdvertiseMode(adv_mode).name,
+ bleenum.AdvertiseSettingsAdvertiseTxPower(adv_power_level).name,
+ adv_duration))
+ dut.adb.shell_nb(adv_command)
+
+
+def start_apk_ble_scan(dut, scan_mode, scan_duration):
+ """Build the command to trigger BLE scan from power-test.apk.
- dut.log.info('Sent BLE scan broadcast message: %s', msg)
- dut.adb.shell(msg)
+ Args:
+ dut: Android device under test, type AndroidDevice obj
+ scan_mode: The BLE scan mode.
+ {0: 'LowPower', 1: 'Balanced', 2: 'LowLatency', -1: 'Opportunistic'}
+ scan_duration: duration of scan in seconds, type int
+ Returns:
+ adv_command: the command for BLE scan
+ """
+ scan_duration = str(scan_duration) + 's'
+ builder = icb.InstrumentationTestCommandBuilder.default()
+ builder.add_test_class("com.google.android.device.power.tests.ble.BleScan")
+ builder.set_manifest_package("com.google.android.device.power")
+ builder.set_runner("androidx.test.runner.AndroidJUnitRunner")
+ builder.add_key_value_param("cool-off-duration", "0s")
+ builder.add_key_value_param("idle-duration", "0s")
+ builder.add_key_value_param("com.android.test.power.receiver.SCAN_MODE",
+ scan_mode)
+ builder.add_key_value_param("com.android.test.power.receiver.MATCH_MODE",
+ 2)
+ builder.add_key_value_param(
+ "com.android.test.power.receiver.SCAN_DURATION", scan_duration)
+ builder.add_key_value_param(
+ "com.android.test.power.receiver.CALLBACK_TYPE", 1)
+ builder.add_key_value_param("com.android.test.power.receiver.FILTER",
+ 'true')
+
+ scan_command = builder.build() + ' &'
+ logging.info('Start BLE {} scans for {} seconds'.format(
+ bleenum.ScanSettingsScanMode(scan_mode).name, scan_duration))
+ dut.adb.shell_nb(scan_command)
diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py
index f13002e589..da4a88089f 100644
--- a/acts/framework/acts/test_utils/bt/bt_test_utils.py
+++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py
@@ -79,7 +79,8 @@ def _add_android_device_to_dictionary(android_device, profile_list,
profile_list: The list of profiles the Android device supports.
"""
for profile in profile_list:
- if profile in selector_dict and android_device not in selector_dict[profile]:
+ if profile in selector_dict and android_device not in selector_dict[
+ profile]:
selector_dict[profile].append(android_device)
else:
selector_dict[profile] = [android_device]
@@ -164,7 +165,7 @@ def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list,
except Exception as err:
adv_android_device.log.debug(
"Failed to stop LE advertisement... reseting Bluetooth. Error {}".
- format(err))
+ format(err))
reset_bluetooth([adv_android_device])
@@ -197,7 +198,9 @@ def clear_bonded_devices(ad):
return True
-def connect_phone_to_headset(android, headset, timeout=bt_default_timeout,
+def connect_phone_to_headset(android,
+ headset,
+ timeout=bt_default_timeout,
connection_check_period=10):
"""Connects android phone to bluetooth headset.
Headset object must have methods power_on and enter_pairing_mode,
@@ -214,7 +217,8 @@ def connect_phone_to_headset(android, headset, timeout=bt_default_timeout,
connected (bool): True if devices are paired and connected by end of
method. False otherwise.
"""
- connected = is_a2dp_src_device_connected(android, headset.mac_address)
+ headset_mac_address = headset.mac_address
+ connected = is_a2dp_src_device_connected(android, headset_mac_address)
log.info('Devices connected before pair attempt: %s' % connected)
if not connected:
# Turn on headset and initiate pairing mode.
@@ -224,16 +228,18 @@ def connect_phone_to_headset(android, headset, timeout=bt_default_timeout,
# If already connected, skip pair and connect attempt.
while not connected and (time.time() - start_time < timeout):
bonded_info = android.droid.bluetoothGetBondedDevices()
- if headset.mac_address not in [info["address"] for info in bonded_info]:
+ if headset.mac_address not in [
+ info["address"] for info in bonded_info
+ ]:
# Use SL4A to pair and connect with headset.
headset.enter_pairing_mode()
- android.droid.bluetoothDiscoverAndBond(headset.mac_address)
+ android.droid.bluetoothDiscoverAndBond(headset_mac_address)
else: # Device is bonded but not connected
- android.droid.bluetoothConnectBonded(headset.mac_address)
+ android.droid.bluetoothConnectBonded(headset_mac_address)
log.info('Waiting for connection...')
time.sleep(connection_check_period)
# Check for connection.
- connected = is_a2dp_src_device_connected(android, headset.mac_address)
+ connected = is_a2dp_src_device_connected(android, headset_mac_address)
log.info('Devices connected after pair attempt: %s' % connected)
return connected
@@ -394,12 +400,13 @@ def determine_max_advertisements(android_device):
advertise_callback = android_device.droid.bleGenBleAdvertiseCallback()
advertise_callback_list.append(advertise_callback)
- android_device.droid.bleStartBleAdvertising(
- advertise_callback, advertise_data, advertise_settings)
+ android_device.droid.bleStartBleAdvertising(advertise_callback,
+ advertise_data,
+ advertise_settings)
regex = "(" + adv_succ.format(
advertise_callback) + "|" + adv_fail.format(
- advertise_callback) + ")"
+ advertise_callback) + ")"
# wait for either success or failure event
evt = android_device.ed.pop_events(regex, bt_default_timeout,
small_timeout)
@@ -585,10 +592,9 @@ def generate_ble_scan_objects(droid):
return filter_list, scan_settings, scan_callback
-def generate_id_by_size(
- size,
- chars=(
- string.ascii_lowercase + string.ascii_uppercase + string.digits)):
+def generate_id_by_size(size,
+ chars=(string.ascii_lowercase +
+ string.ascii_uppercase + string.digits)):
"""Generate random ascii characters of input size and input char types
Args:
@@ -650,6 +656,122 @@ def get_bluetooth_crash_count(android_device):
return int(re.search("crashed(.*\d)", out).group(1))
+def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True):
+ """ Function to get the bt metric from logcat.
+
+ Captures logcat for the specified duration and returns the bqr results.
+ Takes list of android objects as input. If a single android object is given,
+ converts it into a list.
+
+ Args:
+ ad_list: list of android_device objects
+ duration: time duration (seconds) for which the logcat is parsed.
+ tag: tag to be appended to the logcat dump.
+ processed: flag to process bqr output.
+
+ Returns:
+ metrics_dict: dict of metrics for each android device.
+ """
+
+ # Defining bqr quantitites and their regex to extract
+ regex_dict = {"pwlv": "PwLv:\s(\S+)", "rssi": "RSSI:\s[-](\d+)"}
+ metrics_dict = {"rssi": {}, "pwlv": {}}
+
+ # Converting a single android device object to list
+ if not isinstance(ad_list, list):
+ ad_list = [ad_list]
+
+ #Time sync with the test machine
+ for ad in ad_list:
+ ad.droid.setTime(int(round(time.time() * 1000)))
+ time.sleep(0.5)
+
+ begin_time = utils.get_current_epoch_time()
+ time.sleep(duration)
+ end_time = utils.get_current_epoch_time()
+
+ for ad in ad_list:
+ bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time)
+ bqr_tag = "Monitoring , Handle:"
+
+ # Extracting supporting bqr quantities
+ for metric, regex in regex_dict.items():
+ bqr_metric = []
+ file_bt_log = open(bt_rssi_log, "r")
+ for line in file_bt_log:
+ if bqr_tag in line:
+ if re.findall(regex, line):
+ m = re.findall(regex, line)[0].strip(",")
+ bqr_metric.append(m)
+ metrics_dict[metric][ad.serial] = bqr_metric
+
+ # Formatting the raw data
+ metrics_dict["rssi"][ad.serial] = [
+ (-1) * int(x) for x in metrics_dict["rssi"][ad.serial]
+ ]
+ metrics_dict["pwlv"][ad.serial] = [
+ int(x, 16) for x in metrics_dict["pwlv"][ad.serial]
+ ]
+
+ # Processing formatted data if processing is required
+ if processed:
+ # Computes the average RSSI
+ metrics_dict["rssi"][ad.serial] = round(
+ sum(metrics_dict["rssi"][ad.serial]) /
+ len(metrics_dict["rssi"][ad.serial]), 2)
+ # Returns last noted value for power level
+ metrics_dict["pwlv"][ad.serial] = metrics_dict["pwlv"][
+ ad.serial][-1]
+
+ return metrics_dict
+
+
+def get_bt_rssi(ad, duration=1, processed=True):
+ """Function to get average bt rssi from logcat.
+
+ This function returns the average RSSI for the given duration. RSSI values are
+ extracted from BQR.
+
+ Args:
+ ad: (list of) android_device object.
+ duration: time duration(seconds) for which logcat is parsed.
+
+ Returns:
+ avg_rssi: average RSSI on each android device for the given duration.
+ """
+ function_tag = "get_bt_rssi"
+ bqr_results = get_bt_metric(ad,
+ duration,
+ tag=function_tag,
+ processed=processed)
+ return bqr_results["rssi"]
+
+
+def enable_bqr(ad_list, bqr_interval=10, bqr_event_mask=15,):
+ """Sets up BQR reporting.
+
+ Sets up BQR to report BT metrics at the requested frequency and toggles
+ airplane mode for the bqr settings to take effect.
+
+ Args:
+ ad_list: an android_device or list of android devices.
+ """
+ # Converting a single android device object to list
+ if not isinstance(ad_list, list):
+ ad_list = [ad_list]
+
+ for ad in ad_list:
+ #Setting BQR parameters
+ ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format(
+ bqr_event_mask))
+ ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format(
+ bqr_interval))
+
+ ## Toggle airplane mode
+ ad.droid.connectivityToggleAirplaneMode(True)
+ ad.droid.connectivityToggleAirplaneMode(False)
+
+
def get_device_selector_dictionary(android_device_list):
"""Create a dictionary of Bluetooth features vs Android devices.
@@ -733,8 +855,8 @@ def get_mac_address_of_generic_advertisement(scan_ad, adv_ad):
adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
advertise_settings)
try:
- adv_ad.ed.pop_event(
- adv_succ.format(advertise_callback), bt_default_timeout)
+ adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
+ bt_default_timeout)
except Empty as err:
raise BtTestUtilsError(
"Advertiser did not start successfully {}".format(err))
@@ -973,8 +1095,8 @@ def orchestrate_bluetooth_socket_connection(
client_ad.droid.bluetoothStartPairingHelper()
server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid(
- (bluetooth_socket_conn_test_uuid
- if uuid is None else uuid), accept_timeout_ms)
+ (bluetooth_socket_conn_test_uuid if uuid is None else uuid),
+ accept_timeout_ms)
client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid(
server_ad.droid.bluetoothGetLocalAddress(),
(bluetooth_socket_conn_test_uuid if uuid is None else uuid))
@@ -1038,12 +1160,14 @@ def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True):
# Wait 2 seconds before unbound
time.sleep(2)
if not clear_bonded_devices(pri_ad):
- log.error("Failed to clear bond for primary device at attempt {}"
- .format(str(curr_attempts)))
+ log.error(
+ "Failed to clear bond for primary device at attempt {}".format(
+ str(curr_attempts)))
return False
if not clear_bonded_devices(sec_ad):
- log.error("Failed to clear bond for secondary device at attempt {}"
- .format(str(curr_attempts)))
+ log.error(
+ "Failed to clear bond for secondary device at attempt {}".
+ format(str(curr_attempts)))
return False
# Wait 2 seconds after unbound
time.sleep(2)
@@ -1141,8 +1265,8 @@ def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
while (start_time + bt_default_timeout) > time.time():
event = None
try:
- event = scn_ad.ed.pop_event(
- scan_result.format(scan_callback), bt_default_timeout)
+ event = scn_ad.ed.pop_event(scan_result.format(scan_callback),
+ bt_default_timeout)
except Empty as error:
raise BtTestUtilsError(
"Failed to find scan event: {}".format(error))
@@ -1156,13 +1280,12 @@ def scan_and_verify_n_advertisements(scn_ad, max_advertisements):
return test_result
-def set_bluetooth_codec(
- android_device,
- codec_type,
- sample_rate,
- bits_per_sample,
- channel_mode,
- codec_specific_1=0):
+def set_bluetooth_codec(android_device,
+ codec_type,
+ sample_rate,
+ bits_per_sample,
+ channel_mode,
+ codec_specific_1=0):
"""Sets the A2DP codec configuration on the AndroidDevice.
Args:
@@ -1181,31 +1304,26 @@ def set_bluetooth_codec(
bool: True if the codec config was successfully changed to the desired
values. Else False.
"""
- message = (
- "Set Android Device A2DP Bluetooth codec configuration:\n"
- "\tCodec: {codec_type}\n"
- "\tSample Rate: {sample_rate}\n"
- "\tBits per Sample: {bits_per_sample}\n"
- "\tChannel Mode: {channel_mode}".format(
- codec_type=codec_type,
- sample_rate=sample_rate,
- bits_per_sample=bits_per_sample,
- channel_mode=channel_mode
- )
- )
+ message = ("Set Android Device A2DP Bluetooth codec configuration:\n"
+ "\tCodec: {codec_type}\n"
+ "\tSample Rate: {sample_rate}\n"
+ "\tBits per Sample: {bits_per_sample}\n"
+ "\tChannel Mode: {channel_mode}".format(
+ codec_type=codec_type,
+ sample_rate=sample_rate,
+ bits_per_sample=bits_per_sample,
+ channel_mode=channel_mode))
android_device.log.info(message)
# Send SL4A command
droid, ed = android_device.droid, android_device.ed
if not droid.bluetoothA2dpSetCodecConfigPreference(
- codec_types[codec_type],
- sample_rates[str(sample_rate)],
- bits_per_samples[str(bits_per_sample)],
- channel_modes[channel_mode],
- codec_specific_1
- ):
- android_device.log.warning("SL4A command returned False. Codec was not "
- "changed.")
+ codec_types[codec_type], sample_rates[str(sample_rate)],
+ bits_per_samples[str(bits_per_sample)],
+ channel_modes[channel_mode], codec_specific_1):
+ android_device.log.warning(
+ "SL4A command returned False. Codec was not "
+ "changed.")
else:
try:
ed.pop_event(bluetooth_a2dp_codec_config_changed,
@@ -1223,14 +1341,10 @@ def set_bluetooth_codec(
android_device.log.warning("Could not verify codec config change "
"through ADB.")
elif split_out[1].strip().upper() != codec_type:
- android_device.log.error(
- "Codec config was not changed.\n"
- "\tExpected codec: {exp}\n"
- "\tActual codec: {act}".format(
- exp=codec_type,
- act=split_out[1].strip()
- )
- )
+ android_device.log.error("Codec config was not changed.\n"
+ "\tExpected codec: {exp}\n"
+ "\tActual codec: {act}".format(
+ exp=codec_type, act=split_out[1].strip()))
return False
android_device.log.info("Bluetooth codec successfully changed.")
return True
@@ -1336,8 +1450,8 @@ def setup_multiple_devices_for_bt_test(android_devices):
threads = []
try:
for a in android_devices:
- thread = threading.Thread(
- target=factory_reset_bluetooth, args=([[a]]))
+ thread = threading.Thread(target=factory_reset_bluetooth,
+ args=([[a]]))
threads.append(thread)
thread.start()
for t in threads:
@@ -1391,8 +1505,8 @@ def setup_n_advertisements(adv_ad, num_advertisements):
adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data,
advertise_settings)
try:
- adv_ad.ed.pop_event(
- adv_succ.format(advertise_callback), bt_default_timeout)
+ adv_ad.ed.pop_event(adv_succ.format(advertise_callback),
+ bt_default_timeout)
adv_ad.log.info("Advertisement {} started.".format(i + 1))
except Empty as error:
adv_ad.log.error("Advertisement {} failed to start.".format(i + 1))
@@ -1543,8 +1657,9 @@ def _wait_for_passkey_match(pri_ad, sec_ad):
timeout=bt_default_timeout)
sec_variant = sec_pairing_req["data"]["PairingVariant"]
sec_pin = sec_pairing_req["data"]["Pin"]
- sec_ad.log.info("Secondary device received Pin: {}, Variant: {}"
- .format(sec_pin, sec_variant))
+ sec_ad.log.info(
+ "Secondary device received Pin: {}, Variant: {}".format(
+ sec_pin, sec_variant))
except Empty as err:
log.error("Wait for pin error: {}".format(err))
log.error("Pairing request state, Primary: {}, Secondary: {}".format(
diff --git a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
index dba308c5e2..641c8e5ad8 100644
--- a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
+++ b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py
@@ -84,9 +84,6 @@ def reboot(ad):
if not int(ad.adb.shell("settings get global mobile_data")) == 1:
set_mobile_data(ad, True)
utils.sync_device_time(ad)
- if ad.model == "sailfish" or ad.model == "marlin":
- remount_device(ad)
- ad.adb.shell("echo at@test=8 >> /dev/at_mdm0")
def enable_gnss_verbose_logging(ad):
"""Enable GNSS VERBOSE Logging and persistent logcat.
@@ -505,22 +502,29 @@ def clear_aiding_data_by_gtw_gpstool(ad):
ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode clear")
time.sleep(10)
-def start_gnss_by_gtw_gpstool(ad, state, type="gnss"):
+
+def start_gnss_by_gtw_gpstool(ad, state, type="gnss", bgdisplay=False):
"""Start or stop GNSS on GTW_GPSTool.
Args:
ad: An AndroidDevice object.
state: True to start GNSS. False to Stop GNSS.
type: Different API for location fix. Use gnss/flp/nmea
+ bgdisplay: true to run GTW when Display off.
+ false to not run GTW when Display off.
"""
- if state:
+ if state and not bgdisplay:
ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool "
"--es mode gps --es type %s" % type)
+ elif state and bgdisplay:
+ ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode"
+ " gps --es type {} --ez BG {}".format(type, bgdisplay))
if not state:
ad.log.info("Stop %s on GTW_GPSTool." % type)
ad.adb.shell("am broadcast -a com.android.gpstool.stop_gps_action")
time.sleep(3)
+
def process_gnss_by_gtw_gpstool(ad, criteria, type="gnss"):
"""Launch GTW GPSTool and Clear all GNSS aiding data
Start GNSS tracking on GTW_GPSTool.
@@ -1025,6 +1029,7 @@ def get_baseband_and_gms_version(ad, extra_msg=""):
ad: An AndroidDevice object.
"""
try:
+ build_version = ad.adb.getprop("ro.build.id")
baseband_version = ad.adb.getprop("gsm.version.baseband")
gms_version = ad.adb.shell(
"dumpsys package com.google.android.gms | grep versionName"
@@ -1032,6 +1037,7 @@ def get_baseband_and_gms_version(ad, extra_msg=""):
mpss_version = ad.adb.shell("cat /sys/devices/soc0/images | grep MPSS "
"| cut -d ':' -f 3")
if not extra_msg:
+ ad.log.info("TestResult Build_Version %s" % build_version)
ad.log.info("TestResult Baseband_Version %s" % baseband_version)
ad.log.info(
"TestResult GMS_Version %s" % gms_version.replace(" ", ""))
diff --git a/acts/framework/acts/test_utils/power/PowerBTBaseTest.py b/acts/framework/acts/test_utils/power/PowerBTBaseTest.py
index 5dfda12c7a..e0fd37da2e 100644
--- a/acts/framework/acts/test_utils/power/PowerBTBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerBTBaseTest.py
@@ -14,18 +14,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import time
+import os
+import acts.test_utils.bt.bt_power_test_utils as btputils
+import acts.test_utils.bt.bt_test_utils as btutils
import acts.test_utils.power.PowerBaseTest as PBT
-from acts.test_utils.bt.bt_test_utils import enable_bluetooth
-from acts.test_utils.bt.bt_test_utils import disable_bluetooth
+from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory
-BT_BASE_UUID = '00000000-0000-1000-8000-00805F9B34FB'
-BT_CLASSICAL_DATA = [1, 2, 3]
-BLE_LOCATION_SCAN_ENABLE = 'settings put global ble_scan_always_enabled 1'
-BLE_LOCATION_SCAN_DISABLE = 'settings put global ble_scan_always_enabled 0'
-START_PMC_CMD = 'am start -n com.android.pmc/com.android.pmc.PMCMainActivity'
-PMC_VERBOSE_CMD = 'setprop log.tag.PMC VERBOSE'
-PMC_BASE_SCAN = 'am broadcast -a com.android.pmc.BLESCAN --es ScanMode '
+BLE_LOCATION_SCAN_DISABLE = 'settings put secure location_mode 0'
+PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music'
+INIT_ATTEN = [30]
class PowerBTBaseTest(PBT.PowerBaseTest):
@@ -34,16 +31,43 @@ class PowerBTBaseTest(PBT.PowerBaseTest):
Inherited from the PowerBaseTest class
"""
+ def setup_class(self):
+
+ super().setup_class()
+ # Get music file and push it to the phone
+ music_files = self.user_params.get('music_files', [])
+ if music_files:
+ music_src = music_files[0]
+ music_dest = PHONE_MUSIC_FILE_DIRECTORY
+ success = self.dut.push_system_file(music_src, music_dest)
+ if success:
+ self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY,
+ os.path.basename(music_src))
+ # Initialize media_control class
+ self.media = btputils.MediaControl(self.dut, self.music_file)
+ # Set Attenuator to the initial attenuation
+ if hasattr(self, 'attenuators'):
+ self.set_attenuation(INIT_ATTEN)
+ # Create the BTOE(Bluetooth-Other-End) device object
+ bt_devices = self.user_params.get('bt_devices', [])
+ if bt_devices:
+ attr, idx = bt_devices.split(':')
+ self.bt_device_controller = getattr(self, attr)[int(idx)]
+ self.bt_device = bt_factory().generate(self.bt_device_controller)
+ else:
+ self.log.error('No BT devices config is provided!')
+ # Turn off screen as all tests will be screen off
+ self.dut.droid.goToSleepNow()
+
def setup_test(self):
super().setup_test()
+ self.unpack_userparams(volume=0.9)
# Reset BT to factory defaults
self.dut.droid.bluetoothFactoryReset()
- time.sleep(2)
- # Start PMC app.
- self.log.info('Start PMC app...')
- self.dut.adb.shell(START_PMC_CMD)
- self.dut.adb.shell(PMC_VERBOSE_CMD)
+ self.bt_device.reset()
+ self.bt_device.power_on()
+ btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
def teardown_test(self):
"""Tear down necessary objects after test case is finished.
@@ -54,6 +78,11 @@ class PowerBTBaseTest(PBT.PowerBaseTest):
super().teardown_test()
self.dut.droid.bluetoothFactoryReset()
self.dut.adb.shell(BLE_LOCATION_SCAN_DISABLE)
+ if hasattr(self, 'media'):
+ self.media.stop()
+ self.bt_device.reset()
+ self.bt_device.power_off()
+ btutils.disable_bluetooth(self.dut.droid)
def teardown_class(self):
"""Clean up the test class after tests finish running
@@ -61,75 +90,7 @@ class PowerBTBaseTest(PBT.PowerBaseTest):
"""
super().teardown_class()
self.dut.droid.bluetoothFactoryReset()
-
- def phone_setup_for_BT(self, bt_on, ble_on, screen_status):
- """Sets the phone and Bluetooth in the desired state
-
- Args:
- bt_on: Enable/Disable BT
- ble_on: Enable/Disable BLE
- screen_status: screen ON or OFF
- """
-
- # Check if we are enabling a background scan
- # TODO: Turn OFF cellular wihtout having to turn ON airplane mode
- if bt_on == 'OFF' and ble_on == 'ON':
- self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE)
- self.dut.droid.connectivityToggleAirplaneMode(False)
- time.sleep(2)
-
- # Turn ON/OFF BT
- if bt_on == 'ON':
- enable_bluetooth(self.dut.droid, self.dut.ed)
- self.dut.log.info('BT is ON')
- else:
- disable_bluetooth(self.dut.droid)
- self.dut.droid.bluetoothDisableBLE()
- self.dut.log.info('BT is OFF')
- time.sleep(2)
-
- # Turn ON/OFF BLE
- if ble_on == 'ON':
- self.dut.droid.bluetoothEnableBLE()
- self.dut.log.info('BLE is ON')
- else:
- self.dut.droid.bluetoothDisableBLE()
- self.dut.log.info('BLE is OFF')
- time.sleep(2)
-
- # Set the desired screen status
- if screen_status == 'OFF':
- self.dut.droid.goToSleepNow()
- self.dut.log.info('Screen is OFF')
- time.sleep(2)
-
- def start_pmc_ble_scan(self,
- scan_mode,
- offset_start,
- scan_time,
- idle_time=None,
- num_reps=1):
- """Starts a generic BLE scan via the PMC app
-
- Args:
- dut: object of the android device under test
- scan mode: desired BLE scan type
- offset_start: Time delay in seconds before scan starts
- scan_time: active scan time
- idle_time: iddle time (i.e., no scans occuring)
- num_reps: Number of repetions of the ative+idle scan sequence
- """
- scan_dur = scan_time
- if not idle_time:
- idle_time = 0.2 * scan_time
- scan_dur = 0.8 * scan_time
-
- first_part_msg = '%s%s --es StartTime %d --es ScanTime %d' % (
- PMC_BASE_SCAN, scan_mode, offset_start, scan_dur)
-
- msg = '%s --es NoScanTime %d --es Repetitions %d' % (first_part_msg,
- idle_time,
- num_reps)
-
- self.dut.log.info('Sent BLE scan broadcast message: %s', msg)
- self.dut.adb.shell(msg)
+ self.dut.adb.shell(BLE_LOCATION_SCAN_DISABLE)
+ self.bt_device.reset()
+ self.bt_device.power_off()
+ btutils.disable_bluetooth(self.dut.droid)
diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py
index 71dbf7c2f3..2334702828 100644
--- a/acts/framework/acts/test_utils/power/PowerBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py
@@ -17,20 +17,23 @@ import json
import logging
import math
import os
+import re
import time
+
import acts.controllers.iperf_server as ipf
from acts import asserts
from acts import base_test
from acts import utils
-from acts.controllers import monsoon
+from acts.controllers.monsoon_lib.api.common import MonsoonError
+from acts.controllers.monsoon_lib.api.common import PassthroughStates
from acts.metrics.loggers.blackbox import BlackboxMetricLogger
from acts.test_utils.power.loggers.power_metric_logger import PowerMetricLogger
-from acts.test_utils.wifi import wifi_test_utils as wutils
from acts.test_utils.wifi import wifi_power_test_utils as wputils
+from acts.test_utils.wifi import wifi_test_utils as wutils
RESET_BATTERY_STATS = 'dumpsys batterystats --reset'
IPERF_TIMEOUT = 180
-THRESHOLD_TOLERANCE = 0.2
+THRESHOLD_TOLERANCE_DEFAULT = 0.2
GET_FROM_PHONE = 'get_from_dut'
GET_FROM_AP = 'get_from_ap'
PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2
@@ -49,6 +52,7 @@ class ObjNew():
"""Create a random obj with unknown attributes and value.
"""
+
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@@ -67,6 +71,7 @@ class PowerBaseTest(base_test.BaseTestClass):
"""Base class for all wireless power related tests.
"""
+
def __init__(self, controllers):
base_test.BaseTestClass.__init__(self, controllers)
@@ -102,11 +107,13 @@ class PowerBaseTest(base_test.BaseTestClass):
bug_report=False,
extra_wait=None,
iperf_duration=None,
+ pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT,
mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT)
# Setup the must have controllers, phone and monsoon
self.dut = self.android_devices[0]
self.mon_data_path = os.path.join(self.log_path, 'Monsoon')
+ os.makedirs(self.mon_data_path, exist_ok=True)
self.mon = self.monsoons[0]
self.mon.set_max_current(8.0)
self.mon.set_voltage(self.mon_voltage)
@@ -122,7 +129,7 @@ class PowerBaseTest(base_test.BaseTestClass):
self.network_file = file
elif 'rockbottom_' + self.dut.model in file:
self.rockbottom_script = file
- #Abort the class if threshold and rockbottom file is missing
+ # Abort the class if threshold and rockbottom file is missing
asserts.abort_class_if(
not self.threshold_file,
'Required test pass/fail threshold file is missing')
@@ -137,7 +144,7 @@ class PowerBaseTest(base_test.BaseTestClass):
self.mon_info = self.create_monsoon_info()
# Sync device time, timezone and country code
- utils.require_sl4a((self.dut, ))
+ utils.require_sl4a((self.dut,))
utils.sync_device_time(self.dut)
self.dut.droid.wifiSetCountryCode('US')
@@ -212,6 +219,8 @@ class PowerBaseTest(base_test.BaseTestClass):
Args:
file: the common file containing pass fail threshold.
+ test_specific: if True, returns the JSON element within the file
+ that starts with the test class name.
"""
with open(file, 'r') as f:
params = json.load(f)
@@ -259,20 +268,23 @@ class PowerBaseTest(base_test.BaseTestClass):
"""The actual test flow and result processing and validate.
"""
- self.collect_power_data()
- self.pass_fail_check()
+ result = self.collect_power_data()
+ self.pass_fail_check(result.average_current)
def collect_power_data(self):
"""Measure power, plot and take log if needed.
+ Returns:
+ A MonsoonResult object.
"""
- tag = ''
# Collecting current measurement data and plot
- self.file_path, self.test_result = self.monsoon_data_collect_save()
- self.power_result.metric_value = self.test_result * self.mon_voltage
- wputils.monsoon_data_plot(self.mon_info, self.file_path, tag=tag)
+ result = self.monsoon_data_collect_save()
+ self.power_result.metric_value = (result.average_current *
+ self.mon_voltage)
+ wputils.monsoon_data_plot(self.mon_info, result)
+ return result
- def pass_fail_check(self):
+ def pass_fail_check(self, average_current=None):
"""Check the test result and decide if it passed or failed.
The threshold is provided in the config file. In this class, result is
@@ -285,18 +297,18 @@ class PowerBaseTest(base_test.BaseTestClass):
return
current_threshold = self.threshold[self.test_name]
- if self.test_result:
+ if average_current:
asserts.assert_true(
- abs(self.test_result - current_threshold) / current_threshold <
- THRESHOLD_TOLERANCE,
+ abs(average_current - current_threshold) / current_threshold <
+ self.pass_fail_tolerance,
'Measured average current in [{}]: {:.2f}mA, which is '
'out of the acceptable range {:.2f}±{:.2f}mA'.format(
- self.test_name, self.test_result, current_threshold,
+ self.test_name, average_current, current_threshold,
self.pass_fail_tolerance * current_threshold))
asserts.explicit_pass(
'Measurement finished for [{}]: {:.2f}mA, which is '
'within the acceptable range {:.2f}±{:.2f}'.format(
- self.test_name, self.test_result, current_threshold,
+ self.test_name, average_current, current_threshold,
self.pass_fail_tolerance * current_threshold))
else:
asserts.fail(
@@ -333,8 +345,12 @@ class PowerBaseTest(base_test.BaseTestClass):
logging.info('Monsoon recovered from unexpected error')
time.sleep(2)
return True
- except monsoon.MonsoonError:
- logging.info(self.mon.mon.ser.in_waiting)
+ except MonsoonError:
+ try:
+ self.log.info(self.mon_info.dut._mon.ser.in_waiting)
+ except AttributeError:
+ # This attribute does not exist for HVPMs.
+ pass
logging.warning('Unable to recover monsoon from unexpected error')
return False
@@ -345,94 +361,70 @@ class PowerBaseTest(base_test.BaseTestClass):
log file. Take bug report if requested.
Returns:
- data_path: the absolute path to the log file of monsoon current
- measurement
- avg_current: the average current of the test
+ A MonsoonResult object containing information about the gathered
+ data.
"""
tag = '{}_{}_{}'.format(self.test_name, self.dut.model,
self.dut.build_info['build_id'])
+
data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag))
- total_expected_samples = self.mon_info.freq * (self.mon_info.duration +
- self.mon_info.offset)
- min_required_samples = total_expected_samples * MIN_PERCENT_SAMPLE / 100
- # Retry counter for monsoon data aquisition
- retry_measure = 1
- # Indicator that need to re-collect data
- need_collect_data = 1
- result = None
- while retry_measure <= MEASUREMENT_RETRY_COUNT:
- try:
- # If need to retake data
- if need_collect_data == 1:
- #Resets the battery status right before the test started
- self.dut.adb.shell(RESET_BATTERY_STATS)
- self.log.info(
- 'Starting power measurement with monsoon box, try #{}'.
- format(retry_measure))
- #Start the power measurement using monsoon
- self.mon_info.dut.monsoon_usb_auto()
- result = self.mon_info.dut.measure_power(
- self.mon_info.freq,
- self.mon_info.duration,
- tag=tag,
- offset=self.mon_info.offset)
- self.mon_info.dut.reconnect_dut()
- # Reconnect to dut
- else:
- self.mon_info.dut.reconnect_dut()
- # Reconnect and return measurement results if no error happens
- avg_current = result.average_current
- monsoon.MonsoonData.save_to_text_file([result], data_path)
- self.log.info('Power measurement done within {} try'.format(
+
+ # If the specified Monsoon data file already exists (e.g., multiple
+ # measurements in a single test), write the results to a new file with
+ # the postfix "_#".
+ if os.path.exists(data_path):
+ highest_value = 1
+ for filename in os.listdir(os.path.dirname(data_path)):
+ match = re.match(r'{}_(\d+).txt'.format(tag), filename)
+ if match:
+ highest_value = int(match.group(1))
+
+ data_path = os.path.join(self.mon_info.data_path,
+ '%s_%s.txt' % (tag, highest_value + 1))
+
+ total_expected_samples = self.mon_info.freq * self.mon_info.duration
+ min_required_samples = (total_expected_samples
+ * MIN_PERCENT_SAMPLE / 100)
+ for retry_measure in range(1, MEASUREMENT_RETRY_COUNT + 1):
+ # Resets the battery status right before the test starts.
+ self.dut.adb.shell(RESET_BATTERY_STATS)
+ self.log.info(
+ 'Starting power measurement, attempt #{}.'.format(
retry_measure))
- return data_path, avg_current
- # Catch monsoon errors during measurement
- except monsoon.MonsoonError:
- self.log.info(self.mon_info.dut.mon.ser.in_waiting)
- # Break early if it's one count away from limit
- if retry_measure == MEASUREMENT_RETRY_COUNT:
- self.log.error(
- 'Test failed after maximum measurement retry')
- break
-
- self.log.warning('Monsoon error happened, now try to recover')
- # Retry loop to recover monsoon from error
- retry_monsoon = 1
- while retry_monsoon <= RECOVER_MONSOON_RETRY_COUNT:
- mon_status = self.monsoon_recover()
- if mon_status:
- break
- else:
- retry_monsoon += 1
- self.log.warning(
- 'Wait for {} second then try again'.format(
- MONSOON_RETRY_INTERVAL))
- time.sleep(MONSOON_RETRY_INTERVAL)
-
- # Break the loop to end test if failed to recover monsoon
- if not mon_status:
- self.log.error(
- 'Tried our best, still failed to recover monsoon')
- break
- else:
- # If there is no data, or captured samples are less than min
- # required, re-take
- if not result:
- self.log.warning('No data taken, need to remeasure')
- elif len(result._data_points) <= min_required_samples:
- self.log.warning(
- 'More than {} percent of samples are missing due to monsoon error. Need to remeasure'
- .format(100 - MIN_PERCENT_SAMPLE))
- else:
- need_collect_data = 0
- self.log.warning(
- 'Data collected is valid, try reconnect to DUT to finish test'
- )
- retry_measure += 1
-
- if retry_measure > MEASUREMENT_RETRY_COUNT:
- self.log.error('Test failed after maximum measurement retry')
+ # Start the power measurement using monsoon.
+ self.mon_info.dut.usb(PassthroughStates.AUTO)
+ result = self.mon_info.dut.measure_power(
+ self.mon_info.duration,
+ measure_after_seconds=self.mon_info.offset,
+ hz=self.mon_info.freq,
+ output_path=data_path)
+ self.mon_info.dut.usb(PassthroughStates.ON)
+
+ self.log.debug(result)
+ self.log.debug('Samples Gathered: %s. Max Samples: %s '
+ 'Min Samples Required: %s.' %
+ (result.num_samples, total_expected_samples,
+ min_required_samples))
+
+ if result.num_samples <= min_required_samples:
+ retry_measure += 1
+ self.log.warning(
+ 'More than {} percent of samples are missing due to '
+ 'dropped packets. Need to remeasure.'.format(
+ 100 - MIN_PERCENT_SAMPLE))
+ continue
+
+ self.log.info('Measurement successful after {} attempt(s).'.format(
+ retry_measure))
+ return result
+ else:
+ try:
+ self.log.info(self.mon_info.dut._mon.ser.in_waiting)
+ except AttributeError:
+ # This attribute does not exist for HVPMs.
+ pass
+ self.log.error('Unable to gather enough samples to run validation.')
def process_iperf_results(self):
"""Get the iperf results and process.
@@ -457,7 +449,7 @@ class PowerBaseTest(base_test.BaseTestClass):
throughput = (math.fsum(
iperf_result.instantaneous_rates[self.start_meas_time:-1]
) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1])
- ) * 8 * (1.024**2)
+ ) * 8 * (1.024 ** 2)
self.log.info('The average throughput is {}'.format(throughput))
except ValueError:
diff --git a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
index 14c95975c9..5f38aec086 100644
--- a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py
@@ -17,6 +17,8 @@
import logging
import time
+import os
+
import acts.test_utils.power.PowerBaseTest as PBT
from acts import base_test
@@ -38,40 +40,50 @@ class PowerGnssBaseTest(PBT.PowerBaseTest):
"""
def collect_power_data(self):
- """Measure power and plot.
-
- """
- tag = '_Power'
- super().collect_power_data()
- self.monsoon_data_plot_power(self.mon_info, self.file_path, tag=tag)
+ """Measure power and plot."""
+ result = super().collect_power_data()
+ self.monsoon_data_plot_power(self.mon_info, result, tag='_Power')
+ return result
- def monsoon_data_plot_power(self, mon_info, file_path, tag=""):
+ def monsoon_data_plot_power(self, mon_info, monsoon_results, tag=''):
"""Plot the monsoon power data using bokeh interactive plotting tool.
Args:
- mon_info: Dictionary with the monsoon packet config
- file_path: the path to the monsoon log file with current data
+ mon_info: Dictionary with the monsoon packet config.
+ monsoon_results: a MonsoonResult or list of MonsoonResult objects to
+ to plot.
+ tag: an extra tag to append to the resulting filename.
"""
- self.log.info("Plot the power measurement data")
- # Get results as monsoon data object from the input file
- results = monsoon.MonsoonData.from_text_file(file_path)
- # Decouple current and timestamp data from the monsoon object
- current_data = []
- timestamps = []
- voltage = results[0].voltage
- for result in results:
- current_data.extend(result.data_points)
- timestamps.extend(result.timestamps)
- period = 1 / mon_info.freq
- time_relative = [x * period for x in range(len(current_data))]
- # Calculate the average current for the test
-
- current_data = [x * 1000 for x in current_data]
- power_data = [x * voltage for x in current_data]
- avg_current = sum(current_data) / len(current_data)
- color = ['navy'] * len(current_data)
+ if not isinstance(monsoon_results, list):
+ monsoon_results = [monsoon_results]
+ logging.info('Plotting the power measurement data.')
+
+ voltage = monsoon_results[0].voltage
+
+ total_current = 0
+ total_samples = 0
+ for result in monsoon_results:
+ total_current += result.average_current * result.num_samples
+ total_samples += result.num_samples
+ avg_current = total_current / total_samples
+
+ time_relative = [
+ data_point.time
+ for monsoon_result in monsoon_results
+ for data_point in monsoon_result.get_data_points()
+ ]
+
+ power_data = [
+ data_point.current * voltage
+ for monsoon_result in monsoon_results
+ for data_point in monsoon_result.get_data_points()
+ ]
+
+ total_data_points = sum(
+ result.num_samples for result in monsoon_results)
+ color = ['navy'] * total_data_points
# Preparing the data and source link for bokehn java callback
source = ColumnDataSource(
@@ -94,18 +106,20 @@ class PowerGnssBaseTest(PBT.PowerBaseTest):
dt = DataTable(
source=s2, columns=columns, width=1300, height=60, editable=True)
- plot_title = file_path[file_path.rfind('/') + 1:-4] + tag
- output_file("%s/%s.html" % (mon_info.data_path, plot_title))
- TOOLS = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save'
+ plot_title = (os.path.basename(
+ os.path.splitext(monsoon_results[0].tag)[0])
+ + tag)
+ output_file(os.path.join(mon_info.data_path, plot_title + '.html'))
+ tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save'
# Create a new plot with the datatable above
plot = figure(
plot_width=1300,
plot_height=700,
title=plot_title,
- tools=TOOLS,
- output_backend="webgl")
- plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width"))
- plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height"))
+ tools=tools,
+ output_backend='webgl')
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='width'))
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='height'))
plot.line('x0', 'y0', source=source, line_width=2)
plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color')
plot.xaxis.axis_label = 'Time (s)'
@@ -119,8 +133,7 @@ class PowerGnssBaseTest(PBT.PowerBaseTest):
code=customjsscript)
# Layout the plot and the datatable bar
- l = layout([[dt], [plot]])
- save(l)
+ save(layout([[dt], [plot]]))
def disconnect_usb(self, ad, sleeptime):
"""Disconnect usb while device is on sleep and
diff --git a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
index 6af9a491b3..937656e352 100644
--- a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
+++ b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py
@@ -46,7 +46,7 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest):
self.pkt_sender = self.packet_senders[0]
if hasattr(self, 'iperf_servers'):
self.iperf_server = self.iperf_servers[0]
- if hasattr(self, 'iperf_duration'):
+ if self.iperf_duration:
self.mon_duration = self.iperf_duration - 10
self.create_monsoon_info()
@@ -115,10 +115,11 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest):
If IPERF is run, need to pull iperf results and attach it to the plot.
"""
- super().collect_power_data()
+ result = super().collect_power_data()
tag = ''
- if hasattr(self, IPERF_DURATION):
+ if self.iperf_duration:
throughput = self.process_iperf_results()
tag = '_RSSI_{0:d}dBm_Throughput_{1:.2f}Mbps'.format(
self.RSSI, throughput)
- wputils.monsoon_data_plot(self.mon_info, self.file_path, tag=tag)
+ wputils.monsoon_data_plot(self.mon_info, result, tag=tag)
+ return result
diff --git a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
index baa7d33179..8637b2dcee 100644
--- a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
+++ b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py
@@ -645,12 +645,10 @@ class LteSimulation(BaseSimulation):
self.log.info(
"The test name does not include the '{}' parameter. Disabled "
"by default.".format(self.PARAM_RRC_STATUS_CHANGE_TIMER))
- self.simulator.anritsu.set_lte_rrc_status_change(False)
+ self.simulator.set_lte_rrc_state_change_timer(False)
else:
- self.rrc_sc_timer = int(values[1])
- self.simulator.anritsu.set_lte_rrc_status_change(True)
- self.simulator.anritsu.set_lte_rrc_status_change_timer(
- self.rrc_sc_timer)
+ timer = int(values[1])
+ self.simulator.anritsu.set_lte_rrc_status_change(True, timer)
# Get uplink power
diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py
index fb4780095a..323e1bc1e0 100644
--- a/acts/framework/acts/test_utils/tel/tel_test_utils.py
+++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py
@@ -24,6 +24,7 @@ import re
import os
import urllib.parse
import time
+import acts.controllers.iperf_server as ipf
from acts import signals
from acts import utils
@@ -801,6 +802,15 @@ def get_service_state_by_adb(log, ad):
ad.log.info("mVoiceRegState is %s %s", result.group(1),
result.group(2))
return result.group(2)
+ else:
+ if getattr(ad, "sdm_log", False):
+ #look for all occurrence in string
+ result2 = re.findall(r"mVoiceRegState=(\S+)\((\S+)\)", output)
+ for voice_state in result2:
+ if voice_state[0] == 0:
+ ad.log.info("mVoiceRegState is 0 %s", voice_state[1])
+ return voice_state[1]
+ return result2[1][1]
else:
result = re.search(r"mServiceState=(\S+)", output)
if result:
@@ -2805,38 +2815,33 @@ def verify_internet_connection(log, ad, retries=3, expected_state=True):
return False
-def iperf_test_by_adb(log,
- ad,
- iperf_server,
- port_num=None,
- reverse=False,
- timeout=180,
- limit_rate=None,
- omit=10,
- ipv6=False,
- rate_dict=None,
- blocking=True,
- log_file_path=None):
- """Iperf test by adb.
+def iperf_test_with_options(log,
+ ad,
+ iperf_server,
+ iperf_option,
+ timeout=180,
+ rate_dict=None,
+ blocking=True,
+ log_file_path=None):
+ """Iperf adb run helper.
Args:
log: log object
ad: Android Device Object.
- iperf_Server: The iperf host url".
- port_num: TCP/UDP server port
+ iperf_server: The iperf host url".
+ iperf_option: The options to pass to iperf client
timeout: timeout for file download to complete.
- limit_rate: iperf bandwidth option. None by default
- omit: the omit option provided in iperf command.
+ rate_dict: dictionary that can be passed in to save data
+ blocking: run iperf in blocking mode if True
+ log_file_path: location to save logs
+ Returns:
+ True if IPerf runs without throwing an exception
"""
- iperf_option = "-t %s -O %s -J" % (timeout, omit)
- if limit_rate: iperf_option += " -b %s" % limit_rate
- if port_num: iperf_option += " -p %s" % port_num
- if ipv6: iperf_option += " -6"
- if reverse: iperf_option += " -R"
try:
if log_file_path:
ad.adb.shell("rm %s" % log_file_path, ignore_status=True)
ad.log.info("Running adb iperf test with server %s", iperf_server)
+ ad.log.info("IPerf options are %s", iperf_option)
if not blocking:
ad.run_iperf_client_nb(
iperf_server,
@@ -2846,21 +2851,133 @@ def iperf_test_by_adb(log,
return True
result, data = ad.run_iperf_client(
iperf_server, iperf_option, timeout=timeout + 60)
- ad.log.info("Iperf test result with server %s is %s", iperf_server,
+ ad.log.info("IPerf test result with server %s is %s", iperf_server,
result)
if result:
- data_json = json.loads(''.join(data))
- tx_rate = data_json['end']['sum_sent']['bits_per_second']
- rx_rate = data_json['end']['sum_received']['bits_per_second']
- ad.log.info(
- 'iPerf3 upload speed is %sbps, download speed is %sbps',
- tx_rate, rx_rate)
+ iperf_str = ''.join(data)
+ iperf_result = ipf.IPerfResult(iperf_str)
+ if "-u" in iperf_option:
+ udp_rate = iperf_result.avg_rate
+ if udp_rate is None:
+ ad.log.warning(
+ "UDP rate is none, IPerf server returned error: %s",
+ iperf_result.error)
+ ad.log.info("IPerf3 udp speed is %sbps", udp_rate)
+ else:
+ tx_rate = iperf_result.avg_send_rate
+ rx_rate = iperf_result.avg_receive_rate
+ if (tx_rate or rx_rate) is None:
+ ad.log.warning(
+ "A TCP rate is none, IPerf server returned error: %s",
+ iperf_result.error)
+ ad.log.info(
+ "IPerf3 upload speed is %sbps, download speed is %sbps",
+ tx_rate, rx_rate)
if rate_dict is not None:
rate_dict["Uplink"] = tx_rate
rate_dict["Downlink"] = rx_rate
return result
- except Exception as e:
+ except AdbError as e:
ad.log.warning("Fail to run iperf test with exception %s", e)
+ raise
+
+
+def iperf_udp_test_by_adb(log,
+ ad,
+ iperf_server,
+ port_num=None,
+ reverse=False,
+ timeout=180,
+ limit_rate=None,
+ omit=10,
+ ipv6=False,
+ rate_dict=None,
+ blocking=True,
+ log_file_path=None):
+ """Iperf test by adb using UDP.
+
+ Args:
+ log: log object
+ ad: Android Device Object.
+ iperf_Server: The iperf host url".
+ port_num: TCP/UDP server port
+ reverse: whether to test download instead of upload
+ timeout: timeout for file download to complete.
+ limit_rate: iperf bandwidth option. None by default
+ omit: the omit option provided in iperf command.
+ ipv6: whether to run the test as ipv6
+ rate_dict: dictionary that can be passed in to save data
+ blocking: run iperf in blocking mode if True
+ log_file_path: location to save logs
+ """
+ iperf_option = "-u -i 1 -t %s -O %s -J" % (timeout, omit)
+ if limit_rate:
+ iperf_option += " -b %s" % limit_rate
+ if port_num:
+ iperf_option += " -p %s" % port_num
+ if ipv6:
+ iperf_option += " -6"
+ if reverse:
+ iperf_option += " -R"
+ try:
+ return iperf_test_with_options(log,
+ ad,
+ iperf_server,
+ iperf_option,
+ timeout,
+ rate_dict,
+ blocking,
+ log_file_path)
+ except AdbError:
+ return False
+
+def iperf_test_by_adb(log,
+ ad,
+ iperf_server,
+ port_num=None,
+ reverse=False,
+ timeout=180,
+ limit_rate=None,
+ omit=10,
+ ipv6=False,
+ rate_dict=None,
+ blocking=True,
+ log_file_path=None):
+ """Iperf test by adb using TCP.
+
+ Args:
+ log: log object
+ ad: Android Device Object.
+ iperf_server: The iperf host url".
+ port_num: TCP/UDP server port
+ reverse: whether to test download instead of upload
+ timeout: timeout for file download to complete.
+ limit_rate: iperf bandwidth option. None by default
+ omit: the omit option provided in iperf command.
+ ipv6: whether to run the test as ipv6
+ rate_dict: dictionary that can be passed in to save data
+ blocking: run iperf in blocking mode if True
+ log_file_path: location to save logs
+ """
+ iperf_option = "-t %s -O %s -J" % (timeout, omit)
+ if limit_rate:
+ iperf_option += " -b %s" % limit_rate
+ if port_num:
+ iperf_option += " -p %s" % port_num
+ if ipv6:
+ iperf_option += " -6"
+ if reverse:
+ iperf_option += " -R"
+ try:
+ return iperf_test_with_options(log,
+ ad,
+ iperf_server,
+ iperf_option,
+ timeout,
+ rate_dict,
+ blocking,
+ log_file_path)
+ except AdbError:
return False
diff --git a/acts/framework/acts/test_utils/wifi/ota_chamber.py b/acts/framework/acts/test_utils/wifi/ota_chamber.py
index 26abbe5f8b..53494dad52 100644
--- a/acts/framework/acts/test_utils/wifi/ota_chamber.py
+++ b/acts/framework/acts/test_utils/wifi/ota_chamber.py
@@ -14,6 +14,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import contextlib
+import io
import time
from acts import logger
from acts import utils
@@ -48,7 +50,6 @@ class OtaChamber(object):
Base class provides functions whose implementation is shared by all
chambers.
"""
-
def reset_chamber(self):
"""Resets the chamber to its zero/home state."""
raise NotImplementedError
@@ -80,7 +81,6 @@ class OtaChamber(object):
class MockChamber(OtaChamber):
"""Class that implements mock chamber for test development and debug."""
-
def __init__(self, config):
self.config = config.copy()
self.device_id = self.config['device_id']
@@ -117,7 +117,6 @@ class MockChamber(OtaChamber):
class OctoboxChamber(OtaChamber):
"""Class that implements Octobox chamber."""
-
def __init__(self, config):
self.config = config.copy()
self.device_id = self.config['device_id']
@@ -130,8 +129,9 @@ class OctoboxChamber(OtaChamber):
def set_orientation(self, orientation):
self.log.info('Setting orientation to {} degrees.'.format(orientation))
- utils.exe_cmd('sudo {} -d {} -p {}'.format(
- self.TURNTABLE_FILE_PATH, self.device_id, orientation))
+ utils.exe_cmd('sudo {} -d {} -p {}'.format(self.TURNTABLE_FILE_PATH,
+ self.device_id,
+ orientation))
def reset_chamber(self):
self.log.info('Resetting chamber to home state')
@@ -155,7 +155,6 @@ class ChamberAutoConnect(object):
class BluetestChamber(OtaChamber):
"""Class that implements Octobox chamber."""
-
def __init__(self, config):
import flow
self.config = config.copy()
@@ -165,6 +164,16 @@ class BluetestChamber(OtaChamber):
self.stirrer_ids = [0, 1, 2]
self.current_mode = None
+ # Capture print output decorator
+ @staticmethod
+ def _capture_output(func, *args, **kwargs):
+ """Creates a decorator to capture stdout from bluetest module"""
+ f = io.StringIO()
+ with contextlib.redirect_stdout(f):
+ func(*args, **kwargs)
+ output = f.getvalue()
+ return output
+
def _connect(self):
self.chamber.connect(self.config['ip_address'],
self.config['username'], self.config['password'])
@@ -172,12 +181,15 @@ class BluetestChamber(OtaChamber):
def _init_manual_mode(self):
self.current_mode = 'manual'
for stirrer_id in self.stirrer_ids:
- self.chamber.chamber_stirring_manual_init(stirrer_id)
+ out = self._capture_output(
+ self.chamber.chamber_stirring_manual_init, stirrer_id)
+ if "failed" in out:
+ self.log.warning("Initialization error: {}".format(out))
time.sleep(CHAMBER_SLEEP)
def _init_continuous_mode(self):
self.current_mode = 'continuous'
- self.chamber.chamber_stirring_continous_init()
+ self.chamber.chamber_stirring_continuous_init()
def _init_stepped_mode(self, steps):
self.current_mode = 'stepped'
@@ -188,7 +200,17 @@ class BluetestChamber(OtaChamber):
if self.current_mode != 'manual':
self._init_manual_mode()
self.log.info('Setting stirrer {} to {}.'.format(stirrer_id, position))
- self.chamber.chamber_stirring_manual_set_pos(stirrer_id, position)
+ out = self._capture_output(
+ self.chamber.chamber_stirring_manual_set_pos, stirrer_id, position)
+ if "failed" in out:
+ self.log.warning("Bluetest error: {}".format(out))
+ self.log.warning("Set position failed. Retrying.")
+ self.current_mode = None
+ self.set_stirrer_pos(stirrer_id, position)
+ else:
+ self._capture_output(self.chamber.chamber_stirring_manual_wait,
+ CHAMBER_SLEEP)
+ self.log.warning('Stirrer {} at {}.'.format(stirrer_id, position))
def set_orientation(self, orientation):
self.set_stirrer_pos(2, orientation * 100 / 360)
@@ -196,10 +218,10 @@ class BluetestChamber(OtaChamber):
def start_continuous_stirrers(self):
if self.current_mode != 'continuous':
self._init_continuous_mode()
- self.chamber.chamber_stirring_continous_start()
+ self.chamber.chamber_stirring_continuous_start()
def stop_continuous_stirrers(self):
- self.chamber.chamber_stirring_continous_stop()
+ self.chamber.chamber_stirring_continuous_stop()
def step_stirrers(self, steps):
if self.current_mode != 'stepped':
diff --git a/acts/framework/acts/test_utils/wifi/ota_sniffer.py b/acts/framework/acts/test_utils/wifi/ota_sniffer.py
index 60168bb458..884f8f12ca 100644
--- a/acts/framework/acts/test_utils/wifi/ota_sniffer.py
+++ b/acts/framework/acts/test_utils/wifi/ota_sniffer.py
@@ -114,7 +114,6 @@ class OtaSnifferBase(object):
class MockSniffer(OtaSnifferBase):
"""Class that implements mock sniffer for test development and debug."""
-
def __init__(self, config):
self.log = logger.create_tagged_trace_logger("Mock Sniffer")
@@ -354,12 +353,12 @@ class TsharkSnifferBase(OtaSnifferBase):
log_file = self._get_full_file_path(tag)
with open(temp_dump_file, "r") as input_csv, open(log_file,
"w") as output_csv:
- reader = csv.DictReader(
- input_csv, fieldnames=self.TSHARK_COLUMNS, delimiter="^")
- writer = csv.DictWriter(
- output_csv,
- fieldnames=self.TSHARK_OUTPUT_COLUMNS,
- delimiter="\t")
+ reader = csv.DictReader(input_csv,
+ fieldnames=self.TSHARK_COLUMNS,
+ delimiter="^")
+ writer = csv.DictWriter(output_csv,
+ fieldnames=self.TSHARK_OUTPUT_COLUMNS,
+ delimiter="\t")
writer.writeheader()
for row in reader:
if row["subtype"] in self.TYPE_SUBTYPE_DICT.keys():
@@ -427,7 +426,6 @@ class TsharkSnifferBase(OtaSnifferBase):
class TsharkSnifferOnUnix(TsharkSnifferBase):
"""Class that implements Tshark based sniffer controller on Unix systems."""
-
def _scan_for_networks(self):
"""Scans the wireless networks on the sniffer.
@@ -457,7 +455,6 @@ class TsharkSnifferOnUnix(TsharkSnifferBase):
class TsharkSnifferOnLinux(TsharkSnifferBase):
"""Class that implements Tshark based sniffer controller on Linux systems."""
-
def _scan_for_networks(self):
"""Scans the wireless networks on the sniffer.
@@ -480,8 +477,8 @@ class TsharkSnifferOnLinux(TsharkSnifferBase):
password: password of the wireless network to connect to.
"""
if password != "":
- connect_command = "sudo nmcli device wifi connect {} password {} ".format(
- ssid, password),
+ connect_command = "sudo nmcli device wifi connect {} password {}".format(
+ ssid, password)
else:
connect_command = "sudo nmcli device wifi connect {}".format(ssid)
self._sniffer_server.run(connect_command)
diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
index 34a820b934..ac18a8f3e8 100644
--- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
+++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py
@@ -16,8 +16,8 @@
import logging
import time
+import os
from acts import utils
-from acts.controllers import monsoon
from acts.libs.proc import job
from acts.controllers.ap_lib import bridge_interface as bi
from acts.test_utils.wifi import wifi_test_utils as wutils
@@ -39,7 +39,7 @@ ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM='
MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM='
-def monsoon_data_plot(mon_info, file_path, tag=""):
+def monsoon_data_plot(mon_info, monsoon_results, tag=''):
"""Plot the monsoon current data using bokeh interactive plotting tool.
Plotting power measurement data with bokeh to generate interactive plots.
@@ -50,9 +50,10 @@ def monsoon_data_plot(mon_info, file_path, tag=""):
Args:
mon_info: obj with information of monsoon measurement, including
- monsoon device object, measurement frequency, duration and
- offset etc.
- file_path: the path to the monsoon log file with current data
+ monsoon device object, measurement frequency, duration, etc.
+ monsoon_results: a MonsoonResult or list of MonsoonResult objects to
+ to plot.
+ tag: an extra tag to append to the resulting filename.
Returns:
plot: the plotting object of bokeh, optional, will be needed if multiple
@@ -60,25 +61,35 @@ def monsoon_data_plot(mon_info, file_path, tag=""):
dt: the datatable object of bokeh, optional, will be needed if multiple
datatables will be combined to one html file.
"""
+ if not isinstance(monsoon_results, list):
+ monsoon_results = [monsoon_results]
+ logging.info('Plotting the power measurement data.')
+
+ voltage = monsoon_results[0].voltage
+
+ total_current = 0
+ total_samples = 0
+ for result in monsoon_results:
+ total_current += result.average_current * result.num_samples
+ total_samples += result.num_samples
+ avg_current = total_current / total_samples
+
+ time_relative = [
+ data_point.time
+ for monsoon_result in monsoon_results
+ for data_point in monsoon_result.get_data_points()
+ ]
- log = logging.getLogger()
- log.info("Plot the power measurement data")
- #Get results as monsoon data object from the input file
- results = monsoon.MonsoonData.from_text_file(file_path)
- #Decouple current and timestamp data from the monsoon object
- current_data = []
- timestamps = []
- voltage = results[0].voltage
- [current_data.extend(x.data_points) for x in results]
- [timestamps.extend(x.timestamps) for x in results]
- period = 1 / float(mon_info.freq)
- time_relative = [x * period for x in range(len(current_data))]
- #Calculate the average current for the test
- current_data = [x * 1000 for x in current_data]
- avg_current = sum(current_data) / len(current_data)
- color = ['navy'] * len(current_data)
-
- #Preparing the data and source link for bokehn java callback
+ current_data = [
+ data_point.current * 1000
+ for monsoon_result in monsoon_results
+ for data_point in monsoon_result.get_data_points()
+ ]
+
+ total_data_points = sum(result.num_samples for result in monsoon_results)
+ color = ['navy'] * total_data_points
+
+ # Preparing the data and source link for bokehn java callback
source = ColumnDataSource(
data=dict(x0=time_relative, y0=current_data, color=color))
s2 = ColumnDataSource(
@@ -88,7 +99,7 @@ def monsoon_data_plot(mon_info, file_path, tag=""):
x0=[round(avg_current * voltage, 2)],
z1=[round(avg_current * voltage * mon_info.duration, 2)],
z2=[round(avg_current * mon_info.duration, 2)]))
- #Setting up data table for the output
+ # Setting up data table for the output
columns = [
TableColumn(field='z0', title='Total Duration (s)'),
TableColumn(field='y0', title='Average Current (mA)'),
@@ -99,26 +110,29 @@ def monsoon_data_plot(mon_info, file_path, tag=""):
dt = DataTable(
source=s2, columns=columns, width=1300, height=60, editable=True)
- plot_title = file_path[file_path.rfind('/') + 1:-4] + tag
- output_file("%s/%s.html" % (mon_info.data_path, plot_title))
- TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save')
+ plot_title = (os.path.basename(os.path.splitext(monsoon_results[0].tag)[0])
+ + tag)
+ output_file(os.path.join(mon_info.data_path, plot_title + '.html'))
+ tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save'
# Create a new plot with the datatable above
plot = figure(
- plot_width=1300, plot_height=700, title=plot_title, tools=TOOLS)
- plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width"))
- plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height"))
+ plot_width=1300,
+ plot_height=700,
+ title=plot_title,
+ tools=tools,
+ output_backend='webgl')
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='width'))
+ plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='height'))
plot.line('x0', 'y0', source=source, line_width=2)
plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color')
plot.xaxis.axis_label = 'Time (s)'
plot.yaxis.axis_label = 'Current (mA)'
plot.title.text_font_size = {'value': '15pt'}
- #Callback Java scripting
+ # Callback JavaScript
source.selected.js_on_change(
"indices",
- CustomJS(
- args=dict(source=source, mytable=dt),
- code="""
+ CustomJS(args=dict(source=source, mytable=dt), code="""
var inds = cb_obj.indices;
var d1 = source.data;
var d2 = mytable.source.data;
@@ -154,10 +168,9 @@ def monsoon_data_plot(mon_info, file_path, tag=""):
mytable.change.emit();
"""))
- #Layout the plot and the datatable bar
- l = layout([[dt], [plot]])
- save(l)
- return [plot, dt]
+ # Layout the plot and the datatable bar
+ save(layout([[dt], [plot]]))
+ return plot, dt
def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=10):
@@ -287,12 +300,12 @@ def bokeh_plot(data_sets,
Returns:
plot: bokeh plot figure object
"""
- TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save')
+ tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save'
plot = figure(
plot_width=1300,
plot_height=700,
title=fig_property['title'],
- tools=TOOLS,
+ tools=tools,
output_backend="webgl")
plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width"))
plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height"))
@@ -324,7 +337,7 @@ def bokeh_plot(data_sets,
legend=str(legend),
fill_color=color)
- #Plot properties
+ # Plot properties
plot.xaxis.axis_label = fig_property['x_label']
plot.yaxis.axis_label = fig_property['y_label']
plot.legend.location = "top_right"
diff --git a/acts/framework/setup.py b/acts/framework/setup.py
index b5e4467bb6..32b8a93cad 100755
--- a/acts/framework/setup.py
+++ b/acts/framework/setup.py
@@ -39,6 +39,7 @@ install_requires = [
'xlsxwriter',
'mobly',
'grpcio',
+ 'Monsoon',
# paramiko-ng is needed vs paramiko as currently paramiko does not support
# ed25519 ssh keys, which is what Fuchsia uses.
'paramiko-ng',
diff --git a/acts/framework/tests/acts_import_unit_test.py b/acts/framework/tests/acts_import_unit_test.py
index 3dd4857fd0..38dc3bf103 100755
--- a/acts/framework/tests/acts_import_unit_test.py
+++ b/acts/framework/tests/acts_import_unit_test.py
@@ -48,6 +48,12 @@ else:
PY_FILE_REGEX = re.compile('.+\.py$')
BLACKLIST = [
+ # TODO(markdr): Remove these after BT team evaluates these tests.
+ 'acts/test_utils/bt/PowerBaseTest.py',
+ 'tests/google/ble/power/GattPowerTest.py',
+ 'tests/google/bt/power/A2dpPowerTest.py',
+ 'tests/google/ble/power/BleScanPowerTest.py',
+
'acts/controllers/rohdeschwarz_lib/contest.py',
'acts/controllers/native.py',
'acts/controllers/native_android_device.py',
diff --git a/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py b/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py
index ec274b956e..9d628959d9 100755
--- a/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py
+++ b/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py
@@ -209,15 +209,6 @@ class BaseMonsoonTest(unittest.TestCase):
'usbPassthroughMode should not be called when the '
'state does not change.')
- def test_monsoon_usb_auto_sets_usb_state_to_auto(self):
- monsoon = MonsoonImpl()
-
- monsoon.monsoon_usb_auto()
-
- self.assertEqual(monsoon.status.usbPassthroughMode,
- PassthroughStates.AUTO,
- 'monsoon_usb_auto() did not disconnect USB.')
-
def take_samples_always_reestablishes_the_monsoon_connection(self):
monsoon = MonsoonImpl()
assembly_line = mock.Mock()
diff --git a/acts/tests/google/power/PowerBaselineTest.py b/acts/tests/google/power/PowerBaselineTest.py
index b75514423d..d8ef277b2e 100644
--- a/acts/tests/google/power/PowerBaselineTest.py
+++ b/acts/tests/google/power/PowerBaselineTest.py
@@ -31,7 +31,7 @@ class PowerBaselineTest(PowerBaseTest):
self.dut.droid.goToSleepNow()
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
# Check if power measurement is below the required value
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
diff --git a/acts/tests/google/power/bt/PowerBLEadvertiseTest.py b/acts/tests/google/power/bt/PowerBLEadvertiseTest.py
new file mode 100644
index 0000000000..c12d2293e1
--- /dev/null
+++ b/acts/tests/google/power/bt/PowerBLEadvertiseTest.py
@@ -0,0 +1,63 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import acts.test_utils.bt.BleEnum as bleenum
+import acts.test_utils.bt.bt_power_test_utils as btputils
+import acts.test_utils.power.PowerBTBaseTest as PBtBT
+
+BLE_LOCATION_SCAN_ENABLE = 'settings put secure location_mode 3'
+EXTRA_ADV_TIME = 10
+
+
+class PowerBLEadvertiseTest(PBtBT.PowerBTBaseTest):
+ def __init__(self, configs):
+ super().__init__(configs)
+ req_params = ['adv_modes', 'adv_power_levels', 'adv_duration']
+ self.unpack_userparams(req_params)
+ # Loop all advertise modes and power levels
+ for adv_mode in self.adv_modes:
+ for adv_power_level in self.adv_power_levels:
+ self.generate_test_case(adv_mode, adv_power_level,
+ self.adv_duration)
+
+ def setup_class(self):
+
+ super().setup_class()
+ self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE)
+ # Make sure during power measurement, advertisement is always on
+ self.mon_info.duration = (
+ self.adv_duration - self.mon_offset - EXTRA_ADV_TIME)
+
+ def generate_test_case(self, adv_mode, adv_power_level, adv_duration):
+ def test_case_fn():
+
+ self.measure_ble_advertise_power(adv_mode, adv_power_level,
+ adv_duration)
+
+ adv_mode_str = bleenum.AdvertiseSettingsAdvertiseMode(adv_mode).name
+ adv_txpl_str = bleenum.AdvertiseSettingsAdvertiseTxPower(
+ adv_power_level).name.strip('ADVERTISE').strip('_')
+ test_case_name = ('test_BLE_{}_{}'.format(adv_mode_str, adv_txpl_str))
+ setattr(self, test_case_name, test_case_fn)
+
+ def measure_ble_advertise_power(self, adv_mode, adv_power_level,
+ adv_duration):
+
+ btputils.start_apk_ble_adv(self.dut, adv_mode, adv_power_level,
+ adv_duration)
+ time.sleep(EXTRA_ADV_TIME)
+ self.measure_power_and_validate()
diff --git a/acts/tests/google/power/bt/PowerBLEscanTest.py b/acts/tests/google/power/bt/PowerBLEscanTest.py
new file mode 100644
index 0000000000..8ed77b5a94
--- /dev/null
+++ b/acts/tests/google/power/bt/PowerBLEscanTest.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import acts.test_utils.bt.BleEnum as bleenum
+import acts.test_utils.bt.bt_power_test_utils as btputils
+import acts.test_utils.power.PowerBTBaseTest as PBtBT
+
+BLE_LOCATION_SCAN_ENABLE = 'settings put secure location_mode 3'
+EXTRA_SCAN_TIME = 10
+
+
+class PowerBLEscanTest(PBtBT.PowerBTBaseTest):
+ def __init__(self, configs):
+ super().__init__(configs)
+ req_params = ['scan_modes', 'scan_duration']
+ self.unpack_userparams(req_params)
+
+ for scan_mode in self.scan_modes:
+ self.generate_test_case_no_devices_around(scan_mode,
+ self.scan_duration)
+
+ def setup_class(self):
+
+ super().setup_class()
+ self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE)
+ # Make sure during power measurement, scan is always on
+ self.mon_info.duration = (
+ self.scan_duration - self.mon_offset - EXTRA_SCAN_TIME)
+
+ def generate_test_case_no_devices_around(self, scan_mode, scan_duration):
+ def test_case_fn():
+
+ self.measure_ble_scan_power(scan_mode, scan_duration)
+
+ test_case_name = ('test_BLE_{}_no_advertisers'.format(
+ bleenum.ScanSettingsScanMode(scan_mode).name))
+ setattr(self, test_case_name, test_case_fn)
+
+ def measure_ble_scan_power(self, scan_mode, scan_duration):
+
+ btputils.start_apk_ble_scan(self.dut, scan_mode, scan_duration)
+ time.sleep(EXTRA_SCAN_TIME)
+ self.measure_power_and_validate()
diff --git a/acts/tests/google/power/bt/PowerBTa2dpTest.py b/acts/tests/google/power/bt/PowerBTa2dpTest.py
new file mode 100644
index 0000000000..8122fc0748
--- /dev/null
+++ b/acts/tests/google/power/bt/PowerBTa2dpTest.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import acts.test_utils.bt.bt_test_utils as btutils
+import acts.test_utils.power.PowerBTBaseTest as PBtBT
+from acts import asserts
+from acts.test_utils.bt import BtEnum
+
+EXTRA_PLAY_TIME = 10
+
+
+class PowerBTa2dpTest(PBtBT.PowerBTBaseTest):
+ def __init__(self, configs):
+ super().__init__(configs)
+ req_params = ['codecs', 'tx_power_levels', 'atten_pl_settings']
+ self.unpack_userparams(req_params)
+ # Loop all codecs and tx power levels
+ for codec_config in self.codecs:
+ for tpl in self.tx_power_levels:
+ self.generate_test_case(codec_config, tpl)
+
+ def setup_test(self):
+ super().setup_test()
+ btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
+ vol = self.dut.droid.getMaxMediaVolume() * self.volume
+ self.dut.droid.setMediaVolume(0)
+ time.sleep(1)
+ self.dut.droid.setMediaVolume(int(vol))
+
+
+ def generate_test_case(self, codec_config, tpl):
+ def test_case_fn():
+ self.measure_a2dp_power(codec_config, tpl)
+
+ test_case_name = ('test_BTa2dp_{}_codec_at_PL{}'.format(
+ codec_config['codec_type'], tpl))
+ setattr(self, test_case_name, test_case_fn)
+
+ def measure_a2dp_power(self, codec_config, tpl):
+
+ current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig()
+ current_codec_type = BtEnum.BluetoothA2dpCodecType(
+ current_codec['codecType']).name
+ if current_codec_type != codec_config['codec_type']:
+ codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config)
+ asserts.assert_true(codec_set, 'Codec configuration failed.')
+ else:
+ self.log.info('Current Codec is {}, no need to change'.format(
+ current_codec_type))
+
+ # Set attenuation so BT tx at desired power level
+ tpl = 'PL' + str(tpl)
+ self.set_attenuation(self.atten_pl_settings[tpl])
+ self.log.info('Setting Attenuator to {} dB'.format(self.atten_pl_settings[tpl]))
+
+ self.media.play()
+ self.log.info('Running A2DP with codec {} at {}'.format(
+ codec_config['codec_type'], tpl))
+ self.dut.droid.goToSleepNow()
+ time.sleep(EXTRA_PLAY_TIME)
+ self.measure_power_and_validate() \ No newline at end of file
diff --git a/acts/tests/google/power/bt/PowerBTbaselineTest.py b/acts/tests/google/power/bt/PowerBTbaselineTest.py
deleted file mode 100644
index f4050c8de5..0000000000
--- a/acts/tests/google/power/bt/PowerBTbaselineTest.py
+++ /dev/null
@@ -1,62 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2018 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from acts.test_decorators import test_tracker_info
-import acts.test_utils.power.PowerBTBaseTest as PBtBT
-
-
-class PowerBTbaselineTest(PBtBT.PowerBTBaseTest):
- def bt_baseline_test_func(self):
- """Base function for BT baseline measurement.
-
- Steps:
- 1. Sets the phone in airplane mode, disables gestures and location
- 2. Turns ON/OFF BT, BLE and screen according to test conditions
- 3. Measures the power consumption
- 4. Asserts pass/fail criteria based on measured power
- """
-
- # Decode the test params from test name
- attrs = ['screen_status', 'bt_status', 'ble_status', 'scan_status']
- indices = [2, 4, 6, 7]
- self.decode_test_configs(attrs, indices)
- # Setup the phoen at desired state
- self.phone_setup_for_BT(self.test_configs.bt_status,
- self.test_configs.ble_status,
- self.test_configs.screen_status)
- if self.test_configs.scan_status == 'connectable':
- self.dut.droid.bluetoothMakeConnectable()
- elif self.test_configs.scan_status == 'discoverable':
- self.dut.droid.bluetoothMakeDiscoverable(
- self.mon_info.duration + self.mon_info.offset)
- self.measure_power_and_validate()
-
- # Test cases- Baseline
- @test_tracker_info(uuid='3f8ac0cb-f20d-4569-a58e-6009c89ea049')
- def test_screen_OFF_bt_ON_ble_ON_connectable(self):
- self.bt_baseline_test_func()
-
- @test_tracker_info(uuid='d54a992e-37ed-460a-ada7-2c51941557fd')
- def test_screen_OFF_bt_ON_ble_ON_discoverable(self):
- self.bt_baseline_test_func()
-
- @test_tracker_info(uuid='8f4c36b5-b18e-4aa5-9fe5-aafb729c1034')
- def test_screen_ON_bt_ON_ble_ON_connectable(self):
- self.bt_baseline_test_func()
-
- @test_tracker_info(uuid='7128356f-67d8-46b3-9d6b-1a4c9a7a1745')
- def test_screen_ON_bt_ON_ble_ON_discoverable(self):
- self.bt_baseline_test_func()
diff --git a/acts/tests/google/power/bt/PowerBTcalibrationTest.py b/acts/tests/google/power/bt/PowerBTcalibrationTest.py
new file mode 100644
index 0000000000..dffcc67231
--- /dev/null
+++ b/acts/tests/google/power/bt/PowerBTcalibrationTest.py
@@ -0,0 +1,65 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import csv
+import os
+import time
+import acts.test_utils.bt.bt_test_utils as btutils
+import acts.test_utils.power.PowerBTBaseTest as PBtBT
+from acts import utils
+
+EXTRA_PLAY_TIME = 10
+
+
+class PowerBTcalibrationTest(PBtBT.PowerBTBaseTest):
+ def setup_test(self):
+
+ super().setup_test()
+ self.attenuator = self.attenuators[0]
+ btutils.enable_bqr(self.dut)
+ btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
+ btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
+ vol = self.dut.droid.getMaxMediaVolume() * self.volume
+ self.dut.droid.setMediaVolume(int(vol))
+
+ self.cal_data_path = os.path.join(self.log_path, 'Calibration')
+ self.log_file = os.path.join(self.cal_data_path, 'Cal_data.csv')
+ utils.create_dir(os.path.dirname(self.log_file))
+
+
+ def test_calibrate(self):
+ """Run calibration to get attenuation value at each power level
+
+ """
+
+ self.cal_matrix = []
+ self.media.play()
+ time.sleep(EXTRA_PLAY_TIME)
+
+ # Loop through attenuation in 1 dB step until reaching at PL10
+ for i in range(int(self.attenuator.get_max_atten())):
+
+ self.attenuator.set_atten(i)
+ bt_metrics_dict = btutils.get_bt_metric(self.dut)
+ pwl = int(bt_metrics_dict['pwlv'][self.dut.serial])
+ self.cal_matrix.append([i, pwl])
+ if pwl == 10:
+ break
+
+ # Write cal results to csv
+ with open(self.log_file, 'w', newline='') as f:
+ writer = csv.writer(f)
+ writer.writerows(self.cal_matrix)
diff --git a/acts/tests/google/power/bt/PowerBTidleTest.py b/acts/tests/google/power/bt/PowerBTidleTest.py
new file mode 100644
index 0000000000..bab79d0836
--- /dev/null
+++ b/acts/tests/google/power/bt/PowerBTidleTest.py
@@ -0,0 +1,59 @@
+#!/usr/bin/env python3.4
+#
+# Copyright 2018 - The Android Open Source Project
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+import acts.test_utils.power.PowerBTBaseTest as PBtBT
+import acts.test_utils.bt.bt_test_utils as btutils
+
+SCREEN_OFF_WAIT_TIME = 2
+
+
+class PowerBTidleTest(PBtBT.PowerBTBaseTest):
+ def setup_class(self):
+
+ super().setup_class()
+ btutils.enable_bluetooth(self.dut.droid, self.dut.ed)
+
+ # Test cases- Baseline
+ def test_bt_on_unconnected_connectable(self):
+ """BT turned on connectable mode.
+
+ Page scan only.
+ """
+ self.dut.droid.bluetoothMakeConnectable()
+ self.dut.droid.goToSleepNow()
+ time.sleep(SCREEN_OFF_WAIT_TIME)
+ self.measure_power_and_validate()
+
+ def test_bt_on_unconnected_discoverable(self):
+ """BT turned on discoverable mode.
+
+ Page and inquiry scan.
+ """
+ self.dut.droid.bluetoothMakeConnectable()
+ self.dut.droid.bluetoothMakeDiscoverable()
+ self.dut.droid.goToSleepNow()
+ time.sleep(SCREEN_OFF_WAIT_TIME)
+ self.measure_power_and_validate()
+
+ def test_bt_connected_idle(self):
+ """BT idle after connecting to headset.
+
+ """
+ btutils.connect_phone_to_headset(self.dut, self.bt_device, 60)
+ self.dut.droid.goToSleepNow()
+ time.sleep(SCREEN_OFF_WAIT_TIME)
+ self.measure_power_and_validate()
diff --git a/acts/tests/google/power/bt/PowerBTscanTest.py b/acts/tests/google/power/bt/PowerBTscanTest.py
deleted file mode 100644
index 0ef622c648..0000000000
--- a/acts/tests/google/power/bt/PowerBTscanTest.py
+++ /dev/null
@@ -1,85 +0,0 @@
-#!/usr/bin/env python3.4
-#
-# Copyright 2018 - The Android Open Source Project
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import acts.test_utils.power.PowerBTBaseTest as PBtBT
-from acts.test_decorators import test_tracker_info
-
-
-class PowerBTscanTest(PBtBT.PowerBTBaseTest):
- def ble_scan_base_func(self):
- """Base function to start a generic BLE scan and measures the power
-
- Steps:
- 1. Sets the phone in airplane mode, disables gestures and location
- 2. Turns ON/OFF BT, BLE and screen according to test conditions
- 3. Sends the adb shell command to PMC to start scan
- 4. Measures the power consumption
- 5. Asserts pass/fail criteria based on measured power
- """
- # Decode the test params from test name
- attrs = ['screen_status', 'bt_status', 'ble_status', 'scan_mode']
- indices = [2, 4, 6, -1]
- self.decode_test_configs(attrs, indices)
- if self.test_configs.scan_mode == 'lowpower':
- scan_mode = 'low_power'
- elif self.test_configs.scan_mode == 'lowlatency':
- scan_mode = 'low_latency'
- else:
- scan_mode = self.test_configs.scan_mode
- self.phone_setup_for_BT(self.test_configs.bt_status,
- self.test_configs.ble_status,
- self.test_configs.screen_status)
- self.start_pmc_ble_scan(scan_mode, self.mon_info.offset,
- self.mon_info.duration)
- self.measure_power_and_validate()
-
- # Test Cases: BLE Scans + Filtered scans
- @test_tracker_info(uuid='e9a36161-1d0c-4b9a-8bd8-80fef8cdfe28')
- def test_screen_ON_bt_ON_ble_ON_default_scan_balanced(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='5fa61bf4-5f04-40bf-af52-6644b534d02e')
- def test_screen_OFF_bt_ON_ble_ON_filter_scan_opportunistic(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='512b6cde-be83-43b0-b799-761380ba69ff')
- def test_screen_OFF_bt_ON_ble_ON_filter_scan_lowpower(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='3a526838-ae7b-4cdb-bc29-89a5503d2306')
- def test_screen_OFF_bt_ON_ble_ON_filter_scan_balanced(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='03a57cfd-4269-4a09-8544-84f878d2e801')
- def test_screen_OFF_bt_ON_ble_ON_filter_scan_lowlatency(self):
- self.ble_scan_base_func()
-
- # Test Cases: Background scans
- @test_tracker_info(uuid='20145317-e362-4bfd-9860-4ceddf764784')
- def test_screen_ON_bt_OFF_ble_ON_background_scan_lowlatency(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='00a53dc3-2c33-43c4-b356-dba93249b823')
- def test_screen_ON_bt_OFF_ble_ON_background_scan_lowpower(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='b7185d64-631f-4b18-8d0b-4e14b80db375')
- def test_screen_OFF_bt_OFF_ble_ON_background_scan_lowlatency(self):
- self.ble_scan_base_func()
-
- @test_tracker_info(uuid='93eb05da-a577-409c-8208-6af1899a10c2')
- def test_screen_OFF_bt_OFF_ble_ON_background_scan_lowpower(self):
- self.ble_scan_base_func()
diff --git a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
index 09b47a62aa..9c2ce9a2ef 100644
--- a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
+++ b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py
@@ -16,10 +16,12 @@
import acts.test_utils.power.PowerGnssBaseTest as GBT
from acts.test_utils.gnss import dut_log_test_utils as diaglog
+from acts.test_utils.gnss import gnss_test_utils as gutil
import time
import os
from acts import utils
-MDLOG_RUNNING_TIME = 120
+MDLOG_RUNNING_TIME = 300
+DUT_ACTION_WAIT_TIME = 2
class PowerGnssDpoSimTest(GBT.PowerGnssBaseTest):
"""Power baseline tests for rockbottom state.
@@ -33,27 +35,35 @@ class PowerGnssDpoSimTest(GBT.PowerGnssBaseTest):
Decode the test config from the test name, set device to desired state.
Measure power and plot results.
"""
- self.collect_power_data()
- self.pass_fail_check()
+ result = self.collect_power_data()
+ self.pass_fail_check(result.average_current)
# Test cases
def test_gnss_dpoOFF_measurement(self):
utils.set_location_service(self.dut, True)
+ time.sleep(DUT_ACTION_WAIT_TIME)
+ gutil.start_gnss_by_gtw_gpstool(self.dut, state=True, type="gnss", bgdisplay=True)
self.dut.send_keycode("SLEEP")
+ time.sleep(DUT_ACTION_WAIT_TIME)
self.measure_gnsspower_test_func()
diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile)
self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME)
qxdm_log_path = os.path.join(self.log_path, 'QXDM')
diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path)
+ gutil.start_gnss_by_gtw_gpstool(self.dut, state=False)
def test_gnss_dpoON_measurement(self):
utils.set_location_service(self.dut, True)
+ time.sleep(DUT_ACTION_WAIT_TIME)
+ gutil.start_gnss_by_gtw_gpstool(self.dut, state=True, type="gnss", bgdisplay=True)
self.dut.send_keycode("SLEEP")
+ time.sleep(DUT_ACTION_WAIT_TIME)
self.measure_gnsspower_test_func()
diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile)
self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME)
qxdm_log_path = os.path.join(self.log_path, 'QXDM')
diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path)
+ gutil.start_gnss_by_gtw_gpstool(self.dut, state=False)
def test_gnss_rockbottom(self):
self.dut.send_keycode("SLEEP")
diff --git a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
index 66aa45815c..e5aabb76f9 100644
--- a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
+++ b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py
@@ -119,7 +119,7 @@ class PowerTelHotspotTest(PowerTelTrafficTest):
iperf_helpers = self.start_tel_traffic(self.android_devices[1])
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
# Wait for iPerf to finish
time.sleep(self.IPERF_MARGIN + 2)
@@ -129,7 +129,7 @@ class PowerTelHotspotTest(PowerTelTrafficTest):
iperf_helpers)
# Checks if power is below the required threshold.
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
def setup_test(self):
""" Executed before every test case.
diff --git a/acts/tests/google/power/tel/lab/PowerTelIdleTest.py b/acts/tests/google/power/tel/lab/PowerTelIdleTest.py
index 934fe20dc0..f1197988c6 100644
--- a/acts/tests/google/power/tel/lab/PowerTelIdleTest.py
+++ b/acts/tests/google/power/tel/lab/PowerTelIdleTest.py
@@ -33,7 +33,7 @@ class PowerTelIdleTest(PWCEL.PowerCellularLabBaseTest):
self.cellular_simulator.wait_until_idle_state(idle_wait_time)
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
# Check if power measurement is below the required value
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
diff --git a/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py b/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py
index 0abee5050a..b373fdb3d0 100644
--- a/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py
+++ b/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py
@@ -158,7 +158,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest):
iperf_helpers = self.start_tel_traffic(self.dut)
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
# Wait for iPerf to finish
time.sleep(self.IPERF_MARGIN + 2)
@@ -167,9 +167,9 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest):
self.iperf_results = self.get_iperf_results(self.dut, iperf_helpers)
# Check if power measurement is below the required value
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
- return self.test_result, self.iperf_results
+ return result.average_current, self.iperf_results
def get_iperf_results(self, device, iperf_helpers):
""" Pulls iperf results from the device.
@@ -196,7 +196,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest):
return throughput
- def pass_fail_check(self):
+ def pass_fail_check(self, average_current=None):
""" Checks power consumption and throughput.
Uses the base class method to check power consumption. Also, compares
@@ -230,7 +230,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest):
direction, round(throughput, 3), round(expected_t, 3),
round(throughput / expected_t, 3)))
- super().pass_fail_check()
+ super().pass_fail_check(average_current)
def start_tel_traffic(self, client_host):
""" Starts iPerf in the indicated device and initiates traffic.
diff --git a/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py b/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py
index f0d0eda6ce..93b69c6e2b 100644
--- a/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py
+++ b/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py
@@ -70,10 +70,10 @@ class PowerTelVoiceCallTest(PWCEL.PowerCellularLabBaseTest):
self.dut.droid.goToSleepNow()
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
# End the call
hangup_call(self.log, self.dut)
# Check if power measurement is within the required values
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
diff --git a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
index f554d12bde..57e926376a 100644
--- a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
+++ b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py
@@ -158,7 +158,7 @@ class PowerWiFiHotspotTest(PWBT.PowerWiFiBaseTest):
time.sleep(2)
# Measure power
- self.collect_power_data()
+ result = self.collect_power_data()
if traffic:
# Wait for iperf to finish
@@ -168,7 +168,7 @@ class PowerWiFiHotspotTest(PWBT.PowerWiFiBaseTest):
self.client_iperf_helper.process_iperf_results(
self.dut, self.log, self.iperf_servers, self.test_name)
- self.pass_fail_check()
+ self.pass_fail_check(result.average_current)
def power_idle_tethering_test(self):
""" Start power test when Hotspot is idle
diff --git a/acts/tests/google/power/wifi/PowerWiFimulticastTest.py b/acts/tests/google/power/wifi/PowerWiFimulticastTest.py
index 5a6108cdfb..8832469980 100644
--- a/acts/tests/google/power/wifi/PowerWiFimulticastTest.py
+++ b/acts/tests/google/power/wifi/PowerWiFimulticastTest.py
@@ -27,6 +27,22 @@ DNS_SHORT_LIFETIME = 3
class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
+ def setup_class(self):
+ super().setup_class()
+ self.unpack_userparams(sub_mask="255.255.255.0",
+ mac_dst="get_from_dut",
+ mac_src="get_local",
+ ipv4_dst="get_from_dut",
+ ipv4_src="get_local",
+ ipv6_dst="get_from_dut",
+ ipv6_src="get_local",
+ ipv6_src_type="LINK_LOCAL",
+ ipv4_gwt="192.168.1.1",
+ mac_dst_fake="40:90:28:EF:4B:20",
+ ipv4_dst_fake="192.168.1.60",
+ ipv6_dst_fake="fe80::300f:40ee:ee0a:5000",
+ interval=1)
+
def set_connection(self):
"""Setup connection between AP and client.
@@ -38,8 +54,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
indices = [2, 4]
self.decode_test_configs(attrs, indices)
# Change DTIMx1 on the phone to receive all Multicast packets
- rebooted = wputils.change_dtim(
- self.dut, gEnableModulatedDTIM=1, gMaxLIModulatedDTIM=10)
+ rebooted = wputils.change_dtim(self.dut,
+ gEnableModulatedDTIM=1,
+ gMaxLIModulatedDTIM=10)
self.dut.log.info('DTIM value of the phone is now DTIMx1')
if rebooted:
self.dut_rockbottom()
@@ -87,8 +104,8 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- ip_dst='0.0.0.0', eth_dst=self.pkt_gen_config['dst_mac'])
+ packet = pkt_gen.generate(ip_dst='0.0.0.0',
+ eth_dst=self.pkt_gen_config['dst_mac'])
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='5dcb16f1-725c-45de-8103-340104d60a22')
@@ -96,8 +113,8 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- ip_dst=self.ipv4_dst_fake, eth_dst=self.pkt_gen_config['dst_mac'])
+ packet = pkt_gen.generate(ip_dst=self.ipv4_dst_fake,
+ eth_dst=self.pkt_gen_config['dst_mac'])
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='5ec4800f-a82e-4462-8b65-4fcd0b1940a2')
@@ -105,12 +122,11 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- op='is-at',
- ip_src='0.0.0.0',
- ip_dst=self.ipv4_dst_fake,
- hwdst=self.mac_dst_fake,
- eth_dst=self.pkt_gen_config['dst_mac'])
+ packet = pkt_gen.generate(op='is-at',
+ ip_src='0.0.0.0',
+ ip_dst=self.ipv4_dst_fake,
+ hwdst=self.mac_dst_fake,
+ eth_dst=self.pkt_gen_config['dst_mac'])
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='6c5c0e9e-7a00-43d0-a6e8-355141467703')
@@ -118,11 +134,10 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- op='is-at',
- ip_dst=self.ipv4_dst_fake,
- hwdst=self.mac_dst_fake,
- eth_dst=self.pkt_gen_config['dst_mac'])
+ packet = pkt_gen.generate(op='is-at',
+ ip_dst=self.ipv4_dst_fake,
+ hwdst=self.mac_dst_fake,
+ eth_dst=self.pkt_gen_config['dst_mac'])
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='8e534d3b-5a25-429a-a1bb-8119d7d28b5a')
@@ -178,8 +193,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME)
+ packet = pkt_gen.generate(RA_LONG_LIFETIME,
+ enableDNS=True,
+ dns_lifetime=DNS_SHORT_LIFETIME)
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='84d2f1ff-bd4f-46c6-9b06-826d9b14909c')
@@ -187,8 +203,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME)
+ packet = pkt_gen.generate(RA_LONG_LIFETIME,
+ enableDNS=True,
+ dns_lifetime=DNS_LONG_LIFETIME)
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='4a17e74f-3e7f-4e90-ac9e-884a7c13cede')
@@ -333,8 +350,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME)
+ packet = pkt_gen.generate(RA_LONG_LIFETIME,
+ enableDNS=True,
+ dns_lifetime=DNS_SHORT_LIFETIME)
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='62b99cd7-75bf-45be-b93f-bb037a13b3e2')
@@ -342,8 +360,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest):
self.set_connection()
self.pkt_gen_config = wputils.create_pkt_config(self)
pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config)
- packet = pkt_gen.generate(
- RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME)
+ packet = pkt_gen.generate(RA_LONG_LIFETIME,
+ enableDNS=True,
+ dns_lifetime=DNS_LONG_LIFETIME)
self.sendPacketAndMeasure(packet)
@test_tracker_info(uuid='4088af4c-a64b-4fc1-848c-688936cc6c12')
diff --git a/acts/tests/google/power/wifi/PowerWiFiroamingTest.py b/acts/tests/google/power/wifi/PowerWiFiroamingTest.py
index 98409895f4..66110a1f34 100644
--- a/acts/tests/google/power/wifi/PowerWiFiroamingTest.py
+++ b/acts/tests/google/power/wifi/PowerWiFiroamingTest.py
@@ -81,22 +81,30 @@ class PowerWiFiroamingTest(PWBT.PowerWiFiBaseTest):
time.sleep(5)
# Toggle between two networks
begin_time = utils.get_current_epoch_time()
+ results = []
for i in range(self.toggle_times):
self.dut.log.info('Connecting to %s' % network_main[wc.SSID])
self.dut.droid.wifiConnect(network_main)
- file_path, avg_current = self.monsoon_data_collect_save()
+ results.append(self.monsoon_data_collect_save())
self.dut.log.info('Connecting to %s' % network_aux[wc.SSID])
self.dut.droid.wifiConnect(network_aux)
- file_path, avg_current = self.monsoon_data_collect_save()
- [plot, dt] = wputils.monsoon_data_plot(self.mon_info, file_path)
- self.test_result = dt.source.data['y0'][0]
- self.power_result.metric_value = (
- self.test_result * PHONE_BATTERY_VOLTAGE)
+ results.append(self.monsoon_data_collect_save())
+ wputils.monsoon_data_plot(self.mon_info, results)
+
+ total_current = 0
+ total_samples = 0
+ for result in results:
+ total_current += result.average_current * result.num_samples
+ total_samples += result.num_samples
+ average_current = total_current / total_samples
+
+ self.power_result.metric_value = [result.total_power for result in
+ results]
# Take Bugreport
if self.bug_report:
self.dut.take_bug_report(self.test_name, begin_time)
# Path fail check
- self.pass_fail_check()
+ self.pass_fail_check(average_current)
@test_tracker_info(uuid='e5ff95c0-b17e-425c-a903-821ba555a9b9')
def test_screenon_toggle_between_AP(self):
@@ -119,22 +127,30 @@ class PowerWiFiroamingTest(PWBT.PowerWiFiBaseTest):
time.sleep(5)
# Toggle between two networks
begin_time = utils.get_current_epoch_time()
+ results = []
for i in range(self.toggle_times):
self.dut.log.info('Connecting to %s' % network_main[wc.SSID])
self.dut.droid.wifiConnect(network_main)
- file_path, avg_current = self.monsoon_data_collect_save()
+ results.append(self.monsoon_data_collect_save())
self.dut.log.info('Connecting to %s' % network_aux[wc.SSID])
self.dut.droid.wifiConnect(network_aux)
- file_path, avg_current = self.monsoon_data_collect_save()
- [plot, dt] = wputils.monsoon_data_plot(self.mon_info, file_path)
- self.test_result = dt.source.data['y0'][0]
- self.power_result.metric_value = (
- self.test_result * PHONE_BATTERY_VOLTAGE)
+ results.append(self.monsoon_data_collect_save())
+ wputils.monsoon_data_plot(self.mon_info, results)
+
+ total_current = 0
+ total_samples = 0
+ for result in results:
+ total_current += result.average_current * result.num_samples
+ total_samples += result.num_samples
+ average_current = total_current / total_samples
+
+ self.power_result.metric_value = [result.total_power for result in
+ results]
# Take Bugreport
if self.bug_report:
self.dut.take_bug_report(self.test_name, begin_time)
# Path fail check
- self.pass_fail_check()
+ self.pass_fail_check(average_current)
@test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4')
def test_screenoff_wifi_wedge(self):
diff --git a/acts/tests/google/tel/live/TelLiveStressDataTest.py b/acts/tests/google/tel/live/TelLiveStressDataTest.py
new file mode 100644
index 0000000000..b8012aeca2
--- /dev/null
+++ b/acts/tests/google/tel/live/TelLiveStressDataTest.py
@@ -0,0 +1,77 @@
+#!/usr/bin/env python3
+#
+# Copyright 2019 - Google
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+ Test Script for Telephony Stress data Test
+"""
+from acts.test_decorators import test_tracker_info
+from acts.test_utils.tel.TelephonyBaseTest import TelephonyBaseTest
+from acts.test_utils.tel.tel_test_utils import iperf_test_by_adb
+from acts.test_utils.tel.tel_test_utils import iperf_udp_test_by_adb
+
+
+class TelLiveStressDataTest(TelephonyBaseTest):
+ def setup_class(self):
+ super().setup_class()
+ self.ad = self.android_devices[0]
+ self.iperf_server_address = self.user_params.get("iperf_server",
+ '0.0.0.0')
+ self.iperf_srv_tcp_port = self.user_params.get("iperf_server_tcp_port",
+ 0)
+ self.iperf_srv_udp_port = self.user_params.get("iperf_server_udp_port",
+ 0)
+ self.test_duration = self.user_params.get("data_stress_duration", 60)
+
+ return True
+
+ @test_tracker_info(uuid="190fdeb1-541e-455f-9f37-762a8e55c07f")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_tcp_upload_stress(self):
+ return iperf_test_by_adb(self.log,
+ self.ad,
+ self.iperf_server_address,
+ self.iperf_srv_tcp_port,
+ False,
+ self.test_duration)
+
+ @test_tracker_info(uuid="af9805f8-6ed5-4e05-823e-d88dcef45637")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_tcp_download_stress(self):
+ return iperf_test_by_adb(self.log,
+ self.ad,
+ self.iperf_server_address,
+ self.iperf_srv_tcp_port,
+ True,
+ self.test_duration)
+
+ @test_tracker_info(uuid="55bf5e09-dc7b-40bc-843f-31fed076ffe4")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_udp_upload_stress(self):
+ return iperf_udp_test_by_adb(self.log,
+ self.ad,
+ self.iperf_server_address,
+ self.iperf_srv_udp_port,
+ False,
+ self.test_duration)
+
+ @test_tracker_info(uuid="02ae88b2-d597-45df-ab5a-d701d1125a0f")
+ @TelephonyBaseTest.tel_test_wrap
+ def test_udp_download_stress(self):
+ return iperf_udp_test_by_adb(self.log,
+ self.ad,
+ self.iperf_server_address,
+ self.iperf_srv_udp_port,
+ True,
+ self.test_duration)
diff --git a/acts/tests/google/tel/live/TelLiveVoiceTest.py b/acts/tests/google/tel/live/TelLiveVoiceTest.py
index fc6aabad2d..5114daa41c 100644
--- a/acts/tests/google/tel/live/TelLiveVoiceTest.py
+++ b/acts/tests/google/tel/live/TelLiveVoiceTest.py
@@ -56,6 +56,7 @@ from acts.test_utils.tel.tel_test_utils import get_mobile_data_usage
from acts.test_utils.tel.tel_test_utils import hangup_call
from acts.test_utils.tel.tel_test_utils import initiate_call
from acts.test_utils.tel.tel_test_utils import is_phone_in_call_active
+from acts.test_utils.tel.tel_test_utils import is_phone_in_call
from acts.test_utils.tel.tel_test_utils import multithread_func
from acts.test_utils.tel.tel_test_utils import num_active_calls
from acts.test_utils.tel.tel_test_utils import remove_mobile_data_usage_limit
@@ -68,6 +69,8 @@ from acts.test_utils.tel.tel_test_utils import wait_for_ringing_call
from acts.test_utils.tel.tel_test_utils import wait_for_state
from acts.test_utils.tel.tel_test_utils import start_youtube_video
from acts.test_utils.tel.tel_test_utils import set_wifi_to_default
+from acts.test_utils.tel.tel_test_utils import STORY_LINE
+from acts.test_utils.tel.tel_test_utils import wait_for_in_call_active
from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_1x
from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_2g
from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_3g
@@ -113,6 +116,45 @@ class TelLiveVoiceTest(TelephonyBaseTest):
""" Tests Begin """
@TelephonyBaseTest.tel_test_wrap
+ @test_tracker_info(uuid="c5009f8c-eb1d-4cd9-85ce-604298bbeb3e")
+ def test_call_to_answering_machine(self):
+ """ Voice call to an answering machine.
+
+ 1. Make Sure PhoneA attached to voice network.
+ 2. Call from PhoneA to Storyline
+ 3. Verify call is in ACTIVE state
+ 4. Hangup Call from PhoneA
+
+ Raises:
+ TestFailure if not success.
+ """
+ ad = self.android_devices[0]
+
+ if not phone_setup_voice_general(ad.log, ad):
+ ad.log.error("Phone Failed to Set Up Properly for Voice.")
+ return False
+ for iteration in range(3):
+ result = True
+ ad.log.info("Attempt %d", iteration + 1)
+ if not initiate_call(ad.log, ad, STORY_LINE) and \
+ wait_for_in_call_active(ad, 60, 3):
+ ad.log.error("Call Failed to Initiate")
+ result = False
+ time.sleep(WAIT_TIME_IN_CALL)
+ if not is_phone_in_call(ad.log, ad):
+ ad.log.error("Call Dropped")
+ result = False
+ if not hangup_call(ad.log, ad):
+ ad.log.error("Call Failed to Hangup")
+ result = False
+ if result:
+ ad.log.info("Call test PASS in iteration %d", iteration + 1)
+ return True
+ ad.log.info("Call test FAIL in all 3 iterations")
+ return False
+
+
+ @TelephonyBaseTest.tel_test_wrap
@test_tracker_info(uuid="fca3f9e1-447a-416f-9a9c-50b7161981bf")
def test_call_mo_voice_general(self):
""" General voice to voice call.
diff --git a/acts/tests/google/wifi/WifiSensitivityTest.py b/acts/tests/google/wifi/WifiSensitivityTest.py
index 19476843cd..a982cc6972 100644
--- a/acts/tests/google/wifi/WifiSensitivityTest.py
+++ b/acts/tests/google/wifi/WifiSensitivityTest.py
@@ -194,7 +194,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
Args:
result: dict containing attenuation, throughput and other meta
- data
+ data
"""
try:
golden_path = next(file_name
@@ -207,7 +207,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest):
except:
golden_sensitivity = float('nan')
- result_string = ('Througput = {}%, Sensitivity = {}.'
+ result_string = ('Throughput = {}%, Sensitivity = {}.'
'Target Sensitivity = {}'.format(
result['peak_throughput_pct'],
result['sensitivity'], golden_sensitivity))