diff options
47 files changed, 1596 insertions, 1787 deletions
diff --git a/acts/framework/acts/bin/monsoon.py b/acts/framework/acts/bin/monsoon.py index c43eca505e..a76b425dd9 100755 --- a/acts/framework/acts/bin/monsoon.py +++ b/acts/framework/acts/bin/monsoon.py @@ -1,6 +1,6 @@ #!/usr/bin/env python3 # -# Copyright 2016 - The Android Open Source Project +# Copyright 2019 - The Android Open Source Project # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,75 +13,100 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Interface for a USB-connected Monsoon power meter (http://msoon.com/LabEquipment/PowerMonitor/). """ import argparse -import sys -import time -import collections -from acts.controllers.monsoon import Monsoon +import acts.controllers.monsoon as monsoon_controller + -def main(FLAGS): +def main(args): """Simple command-line interface for Monsoon.""" - if FLAGS.avg and FLAGS.avg < 0: - print("--avg must be greater than 0") + if args.avg and args.avg < 0: + print('--avg must be greater than 0') return - mon = Monsoon(serial=int(FLAGS.serialno[0])) + mon = monsoon_controller.create([int(args.serialno[0])])[0] - if FLAGS.voltage is not None: - mon.set_voltage(FLAGS.voltage) + if args.voltage is not None: + mon.set_voltage(args.voltage) - if FLAGS.current is not None: - mon.set_max_current(FLAGS.current) + if args.current is not None: + mon.set_max_current(args.current) - if FLAGS.status: + if args.status: items = sorted(mon.status.items()) - print("\n".join(["%s: %s" % item for item in items])) + print('\n'.join(['%s: %s' % item for item in items])) - if FLAGS.usbpassthrough: - mon.usb(FLAGS.usbpassthrough) + if args.usbpassthrough: + mon.usb(args.usbpassthrough) - if FLAGS.startcurrent is not None: - mon.set_max_init_current(FLAGS.startcurrent) + if args.startcurrent is not None: + mon.set_max_initial_current(args.startcurrent) - if FLAGS.samples: - # Have to sleep a bit here for monsoon to be ready to lower the rate of - # socket read timeout. - time.sleep(1) - result = mon.take_samples(FLAGS.hz, FLAGS.samples, - sample_offset=FLAGS.offset, live=True) + if args.samples: + result = mon.measure_power( + args.samples / args.hz, + measure_after_seconds=args.offset, + hz=args.hz, + output_path='monsoon_output.txt') print(repr(result)) + if __name__ == '__main__': - parser = argparse.ArgumentParser(description=("This is a python utility " - "tool to control monsoon power measurement boxes.")) - parser.add_argument("--status", action="store_true", - help="Print power meter status.") - parser.add_argument("-avg", "--avg", type=int, default=0, - help="Also report average over last n data points.") - parser.add_argument("-v", "--voltage", type=float, - help="Set output voltage (0 for off)") - parser.add_argument("-c", "--current", type=float, - help="Set max output current.") - parser.add_argument("-sc", "--startcurrent", type=float, - help="Set max power-up/inital current.") - parser.add_argument("-usb", "--usbpassthrough", choices=("on", "off", - "auto"), help="USB control (on, off, auto).") - parser.add_argument("-sp", "--samples", type=int, - help="Collect and print this many samples") - parser.add_argument("-hz", "--hz", type=int, - help="Sample this many times per second.") - parser.add_argument("-d", "--device", help="Use this /dev/ttyACM... file.") - parser.add_argument("-sn", "--serialno", type=int, nargs=1, required=True, - help="The serial number of the Monsoon to use.") - parser.add_argument("--offset", type=int, nargs='?', default=0, - help="The number of samples to discard when calculating average.") - parser.add_argument("-r", "--ramp", action="store_true", help=("Gradually " - "increase voltage to prevent tripping Monsoon overvoltage")) - args = parser.parse_args() - main(args) + parser = argparse.ArgumentParser( + description='This is a python utility tool to control monsoon power ' + 'measurement boxes.') + parser.add_argument( + '--status', action='store_true', help='Print power meter status.') + parser.add_argument( + '-avg', + '--avg', + type=int, + default=0, + help='Also report average over last n data points.') + parser.add_argument( + '-v', '--voltage', type=float, help='Set output voltage (0 for off)') + parser.add_argument( + '-c', '--current', type=float, help='Set max output current.') + parser.add_argument( + '-sc', + '--startcurrent', + type=float, + help='Set max power-up/initial current.') + parser.add_argument( + '-usb', + '--usbpassthrough', + choices=('on', 'off', 'auto'), + help='USB control (on, off, auto).') + parser.add_argument( + '-sp', + '--samples', + type=int, + help='Collect and print this many samples') + parser.add_argument( + '-hz', '--hz', type=int, help='Sample this many times per second.') + parser.add_argument('-d', '--device', help='Use this /dev/ttyACM... file.') + parser.add_argument( + '-sn', + '--serialno', + type=int, + nargs=1, + required=True, + help='The serial number of the Monsoon to use.') + parser.add_argument( + '--offset', + type=int, + nargs='?', + default=0, + help='The number of samples to discard when calculating average.') + parser.add_argument( + '-r', + '--ramp', + action='store_true', + help='Gradually increase voltage to prevent tripping Monsoon ' + 'overvoltage.') + arguments = parser.parse_args() + main(arguments) diff --git a/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py b/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py index 23fee412cb..08db20b52f 100644 --- a/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py +++ b/acts/framework/acts/controllers/anritsu_lib/md8475_cellular_simulator.py @@ -177,6 +177,17 @@ class MD8475CellularSimulator(cc.AbstractCellularSimulator): else: self.bts[bts_index].tbs_pattern = 'OFF' + def set_lte_rrc_state_change_timer(self, enabled, time=10): + """ Configures the LTE RRC state change timer. + + Args: + enabled: a boolean indicating if the timer should be on or off. + time: time in seconds for the timer to expire + """ + self.anritsu.set_lte_rrc_status_change(enabled) + if enabled: + self.anritsu.set_lte_rrc_status_change_timer(time) + def set_band(self, bts_index, band): """ Sets the right duplex mode before switching to a new band. diff --git a/acts/framework/acts/controllers/cellular_simulator.py b/acts/framework/acts/controllers/cellular_simulator.py index a140c0f202..35f0885c46 100644 --- a/acts/framework/acts/controllers/cellular_simulator.py +++ b/acts/framework/acts/controllers/cellular_simulator.py @@ -133,6 +133,15 @@ class AbstractCellularSimulator: if config.tbs_pattern_on is not None: self.set_tbs_pattern_on(bts_index, config.tbs_pattern_on) + def set_lte_rrc_state_change_timer(self, enabled, time=10): + """ Configures the LTE RRC state change timer. + + Args: + enabled: a boolean indicating if the timer should be on or off. + time: time in seconds for the timer to expire + """ + raise NotImplementedError() + def set_band(self, bts_index, band): """ Sets the band for the indicated base station. diff --git a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py index 5df5b40127..f0b8dda4fb 100644 --- a/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py +++ b/acts/framework/acts/controllers/fuchsia_lib/utils_lib.py @@ -30,21 +30,24 @@ def get_private_key(ip_address, ssh_config): Returns: The ssh private key """ + exceptions = [] try: logging.debug('Trying to load SSH key type: ed25519') return paramiko.ed25519key.Ed25519Key( filename=get_ssh_key_for_host(ip_address, ssh_config)) - except paramiko.SSHException: + except paramiko.SSHException as e: + exceptions.append(e) logging.debug('Failed loading SSH key type: ed25519') try: logging.debug('Trying to load SSH key type: rsa') return paramiko.RSAKey.from_private_key_file( filename=get_ssh_key_for_host(ip_address, ssh_config)) - except paramiko.SSHException: + except paramiko.SSHException as e: + exceptions.append(e) logging.debug('Failed loading SSH key type: rsa') - raise paramiko.SSHException('No valid ssh key type found') + raise Exception('No valid ssh key type found', exceptions) def create_ssh_connection(ip_address, diff --git a/acts/framework/acts/controllers/iperf_server.py b/acts/framework/acts/controllers/iperf_server.py index bedc05a748..039f143470 100755 --- a/acts/framework/acts/controllers/iperf_server.py +++ b/acts/framework/acts/controllers/iperf_server.py @@ -81,20 +81,25 @@ class IPerfResult(object): will be loaded and this funtion is not intended to be used with files containing multiple iperf client runs. """ - try: - with open(result_path, 'r') as f: - iperf_output = f.readlines() - if '}\n' in iperf_output: - iperf_output = iperf_output[:iperf_output.index('}\n') + 1] - iperf_string = ''.join(iperf_output) - iperf_string = iperf_string.replace('nan', '0') - self.result = json.loads(iperf_string) - except ValueError: - with open(result_path, 'r') as f: - # Possibly a result from interrupted iperf run, skip first line - # and try again. - lines = f.readlines()[1:] - self.result = json.loads(''.join(lines)) + # if result_path isn't a path, treat it as JSON + if not os.path.exists(result_path): + self.result = json.loads(result_path) + else: + try: + with open(result_path, 'r') as f: + iperf_output = f.readlines() + if '}\n' in iperf_output: + iperf_output = iperf_output[:iperf_output.index('}\n') + + 1] + iperf_string = ''.join(iperf_output) + iperf_string = iperf_string.replace('nan', '0') + self.result = json.loads(iperf_string) + except ValueError: + with open(result_path, 'r') as f: + # Possibly a result from interrupted iperf run, + # skip first line and try again. + lines = f.readlines()[1:] + self.result = json.loads(''.join(lines)) def _has_data(self): """Checks if the iperf result has valid throughput data. @@ -197,7 +202,7 @@ class IPerfResult(object): instantaneous_rates = self.instantaneous_rates[iperf_ignored_interval: -1] avg_rate = math.fsum(instantaneous_rates) / len(instantaneous_rates) - sqd_deviations = [(rate - avg_rate)**2 for rate in instantaneous_rates] + sqd_deviations = [(rate - avg_rate) ** 2 for rate in instantaneous_rates] std_dev = math.sqrt( math.fsum(sqd_deviations) / (len(sqd_deviations) - 1)) return std_dev diff --git a/acts/framework/acts/controllers/monsoon.py b/acts/framework/acts/controllers/monsoon.py index d5b24a2703..9488837dbb 100644 --- a/acts/framework/acts/controllers/monsoon.py +++ b/acts/framework/acts/controllers/monsoon.py @@ -13,1006 +13,28 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -"""Interface for a USB-connected Monsoon power meter -(http://msoon.com/LabEquipment/PowerMonitor/). -Based on the original py2 script of kens@google.com -""" -import fcntl -import logging -import os -import select -import struct -import sys -import time -import collections +from acts.controllers.monsoon_lib.api.hvpm.monsoon import Monsoon as HvpmMonsoon +from acts.controllers.monsoon_lib.api.lvpm_stock.monsoon import Monsoon as LvpmStockMonsoon -# http://pyserial.sourceforge.net/ -# On ubuntu, apt-get install python3-pyserial -import serial - -import acts.signals - -from acts import utils -from acts.controllers import android_device - -ACTS_CONTROLLER_CONFIG_NAME = "Monsoon" -ACTS_CONTROLLER_REFERENCE_NAME = "monsoons" +ACTS_CONTROLLER_CONFIG_NAME = 'Monsoon' +ACTS_CONTROLLER_REFERENCE_NAME = 'monsoons' def create(configs): objs = [] - for c in configs: - objs.append(Monsoon(serial=int(c))) - return objs - - -def destroy(objs): - for obj in objs: - fcntl.flock(obj.mon._tempfile, fcntl.LOCK_UN) - obj.mon._tempfile.close() - - -class MonsoonError(acts.signals.ControllerError): - """Raised for exceptions encountered in monsoon lib.""" - - -class MonsoonProxy(object): - """Class that directly talks to monsoon over serial. - - Provides a simple class to use the power meter, e.g. - mon = monsoon.Monsoon() - mon.SetVoltage(3.7) - mon.StartDataCollection() - mydata = [] - while len(mydata) < 1000: - mydata.extend(mon.CollectData()) - mon.StopDataCollection() - - See http://wiki/Main/MonsoonProtocol for information on the protocol. - """ - - def __init__(self, device=None, serialno=None, wait=1): - """Establish a connection to a Monsoon. - - By default, opens the first available port, waiting if none are ready. - A particular port can be specified with "device", or a particular - Monsoon can be specified with "serialno" (using the number printed on - its back). With wait=0, IOError is thrown if a device is not - immediately available. - """ - self._coarse_ref = self._fine_ref = self._coarse_zero = 0 - self._fine_zero = self._coarse_scale = self._fine_scale = 0 - self._last_seq = 0 - self.start_voltage = 0 - self.serial = serialno - - if device: - self.ser = serial.Serial(device, timeout=1) - return - # Try all devices connected through USB virtual serial ports until we - # find one we can use. - while True: - for dev in os.listdir("/dev"): - prefix = "ttyACM" - # Prefix is different on Mac OS X. - if sys.platform == "darwin": - prefix = "tty.usbmodem" - if not dev.startswith(prefix): - continue - tmpname = "/tmp/monsoon.%s.%s" % (os.uname()[0], dev) - self._tempfile = open(tmpname, "w") - try: - os.chmod(tmpname, 0o666) - except OSError as e: - pass - - try: # use a lockfile to ensure exclusive access - fcntl.flock(self._tempfile, fcntl.LOCK_EX | fcntl.LOCK_NB) - except IOError as e: - logging.error("device %s is in use", dev) - continue - - try: # try to open the device - self.ser = serial.Serial("/dev/%s" % dev, timeout=1) - self.StopDataCollection() # just in case - self._FlushInput() # discard stale input - status = self.GetStatus() - except Exception as e: - logging.exception("Error opening device %s: %s", dev, e) - continue - - if not status: - logging.error("no response from device %s", dev) - elif serialno and status["serialNumber"] != serialno: - logging.error("Another device serial #%d seen on %s", - status["serialNumber"], dev) - else: - self.start_voltage = status["voltage1"] - return - - self._tempfile = None - if not wait: raise IOError("No device found") - logging.info("Waiting for device...") - time.sleep(1) - - def GetStatus(self): - """Requests and waits for status. - - Returns: - status dictionary. - """ - # status packet format - STATUS_FORMAT = ">BBBhhhHhhhHBBBxBbHBHHHHBbbHHBBBbbbbbbbbbBH" - STATUS_FIELDS = [ - "packetType", - "firmwareVersion", - "protocolVersion", - "mainFineCurrent", - "usbFineCurrent", - "auxFineCurrent", - "voltage1", - "mainCoarseCurrent", - "usbCoarseCurrent", - "auxCoarseCurrent", - "voltage2", - "outputVoltageSetting", - "temperature", - "status", - "leds", - "mainFineResistor", - "serialNumber", - "sampleRate", - "dacCalLow", - "dacCalHigh", - "powerUpCurrentLimit", - "runTimeCurrentLimit", - "powerUpTime", - "usbFineResistor", - "auxFineResistor", - "initialUsbVoltage", - "initialAuxVoltage", - "hardwareRevision", - "temperatureLimit", - "usbPassthroughMode", - "mainCoarseResistor", - "usbCoarseResistor", - "auxCoarseResistor", - "defMainFineResistor", - "defUsbFineResistor", - "defAuxFineResistor", - "defMainCoarseResistor", - "defUsbCoarseResistor", - "defAuxCoarseResistor", - "eventCode", - "eventData", - ] - - self._SendStruct("BBB", 0x01, 0x00, 0x00) - while 1: # Keep reading, discarding non-status packets - read_bytes = self._ReadPacket() - if not read_bytes: - raise MonsoonError("Failed to read Monsoon status") - calsize = struct.calcsize(STATUS_FORMAT) - if len(read_bytes) != calsize or read_bytes[0] != 0x10: - raise MonsoonError( - "Wanted status, dropped type=0x%02x, len=%d", - read_bytes[0], len(read_bytes)) - status = dict( - zip(STATUS_FIELDS, struct.unpack(STATUS_FORMAT, read_bytes))) - p_type = status["packetType"] - if p_type != 0x10: - raise MonsoonError("Package type %s is not 0x10." % p_type) - for k in status.keys(): - if k.endswith("VoltageSetting"): - status[k] = 2.0 + status[k] * 0.01 - elif k.endswith("FineCurrent"): - pass # needs calibration data - elif k.endswith("CoarseCurrent"): - pass # needs calibration data - elif k.startswith("voltage") or k.endswith("Voltage"): - status[k] = status[k] * 0.000125 - elif k.endswith("Resistor"): - status[k] = 0.05 + status[k] * 0.0001 - if k.startswith("aux") or k.startswith("defAux"): - status[k] += 0.05 - elif k.endswith("CurrentLimit"): - status[k] = 8 * (1023 - status[k]) / 1023.0 - return status - - def RampVoltage(self, start, end): - v = start - if v < 3.0: v = 3.0 # protocol doesn't support lower than this - while (v < end): - self.SetVoltage(v) - v += .1 - time.sleep(.1) - self.SetVoltage(end) - - def SetVoltage(self, v): - """Set the output voltage, 0 to disable. - """ - if v == 0: - self._SendStruct("BBB", 0x01, 0x01, 0x00) - else: - self._SendStruct("BBB", 0x01, 0x01, int((v - 2.0) * 100)) - - def GetVoltage(self): - """Get the output voltage. - - Returns: - Current Output Voltage (in unit of v). - """ - try: - return self.GetStatus()["outputVoltageSetting"] - # Catch potential errors such as struct.error, TypeError and other - # unknown errors which would bring down the whole test - except Exception as e: - raise MonsoonError("Error getting Monsoon voltage") - - def SetMaxCurrent(self, i): - """Set the max output current. - """ - if i < 0 or i > 8: - raise MonsoonError(("Target max current %sA, is out of acceptable " - "range [0, 8].") % i) - val = 1023 - int((i / 8) * 1023) - self._SendStruct("BBB", 0x01, 0x0a, val & 0xff) - self._SendStruct("BBB", 0x01, 0x0b, val >> 8) - - def SetMaxPowerUpCurrent(self, i): - """Set the max power up current. - """ - if i < 0 or i > 8: - raise MonsoonError(("Target max current %sA, is out of acceptable " - "range [0, 8].") % i) - val = 1023 - int((i / 8) * 1023) - self._SendStruct("BBB", 0x01, 0x08, val & 0xff) - self._SendStruct("BBB", 0x01, 0x09, val >> 8) - - def SetUsbPassthrough(self, val): - """Set the USB passthrough mode: 0 = off, 1 = on, 2 = auto. - """ - self._SendStruct("BBB", 0x01, 0x10, val) - - def GetUsbPassthrough(self): - """Get the USB passthrough mode: 0 = off, 1 = on, 2 = auto. - - Returns: - Current USB passthrough mode. - """ - try: - return self.GetStatus()["usbPassthroughMode"] - # Catch potential errors such as struct.error, TypeError and other - # unknown errors which would bring down the whole test - except Exception as e: - raise MonsoonError("Error reading Monsoon USB passthrough status") - - def StartDataCollection(self): - """Tell the device to start collecting and sending measurement data. - """ - self._SendStruct("BBB", 0x01, 0x1b, 0x01) # Mystery command - self._SendStruct("BBBBBBB", 0x02, 0xff, 0xff, 0xff, 0xff, 0x03, 0xe8) - - def StopDataCollection(self): - """Tell the device to stop collecting measurement data. - """ - self._SendStruct("BB", 0x03, 0x00) # stop - - def CollectData(self): - """Return some current samples. Call StartDataCollection() first. - """ - while 1: # loop until we get data or a timeout - _bytes = self._ReadPacket() - if not _bytes: - raise MonsoonError("Data collection failed due to empty data") - if len(_bytes) < 4 + 8 + 1 or _bytes[0] < 0x20 or _bytes[0] > 0x2F: - logging.warning("Wanted data, dropped type=0x%02x, len=%d", - _bytes[0], len(_bytes)) - continue - - seq, _type, x, y = struct.unpack("BBBB", _bytes[:4]) - data = [ - struct.unpack(">hhhh", _bytes[x:x + 8]) - for x in range(4, - len(_bytes) - 8, 8) - ] - - if self._last_seq and seq & 0xF != (self._last_seq + 1) & 0xF: - logging.warning("Data sequence skipped, lost packet?") - self._last_seq = seq - - if _type == 0: - if not self._coarse_scale or not self._fine_scale: - logging.warning( - "Waiting for calibration, dropped data packet.") - continue - out = [] - for main, usb, aux, voltage in data: - if main & 1: - coarse = ((main & ~1) - self._coarse_zero) - out.append(coarse * self._coarse_scale) - else: - out.append((main - self._fine_zero) * self._fine_scale) - return out - elif _type == 1: - self._fine_zero = data[0][0] - self._coarse_zero = data[1][0] - elif _type == 2: - self._fine_ref = data[0][0] - self._coarse_ref = data[1][0] - else: - logging.warning("Discarding data packet type=0x%02x", _type) - continue - - # See http://wiki/Main/MonsoonProtocol for details on these values. - if self._coarse_ref != self._coarse_zero: - self._coarse_scale = 2.88 / ( - self._coarse_ref - self._coarse_zero) - if self._fine_ref != self._fine_zero: - self._fine_scale = 0.0332 / (self._fine_ref - self._fine_zero) - - def _SendStruct(self, fmt, *args): - """Pack a struct (without length or checksum) and send it. - """ - # Flush out the input buffer before sending data - self._FlushInput() - data = struct.pack(fmt, *args) - data_len = len(data) + 1 - checksum = (data_len + sum(bytearray(data))) % 256 - out = struct.pack("B", data_len) + data + struct.pack("B", checksum) - self.ser.write(out) - - def _ReadPacket(self): - """Read a single data record as a string (without length or checksum). - """ - len_char = self.ser.read(1) - if not len_char: - raise MonsoonError("Reading from serial port timed out") - - data_len = ord(len_char) - if not data_len: - return "" - result = self.ser.read(int(data_len)) - result = bytearray(result) - if len(result) != data_len: - raise MonsoonError( - "Length mismatch, expected %d bytes, got %d bytes.", data_len, - len(result)) - body = result[:-1] - checksum = (sum(struct.unpack("B" * len(body), body)) + data_len) % 256 - if result[-1] != checksum: - raise MonsoonError( - "Invalid checksum from serial port! Expected %s, got %s", - hex(checksum), hex(result[-1])) - return result[:-1] - - def _FlushInput(self): - """ Flush all read data until no more available. """ - self.ser.reset_input_buffer() - flushed = 0 - while True: - ready_r, ready_w, ready_x = select.select([self.ser], [], - [self.ser], 0) - if len(ready_x) > 0: - raise MonsoonError("Exception from serial port.") - elif len(ready_r) > 0: - flushed += 1 - self.ser.read(1) # This may cause underlying buffering. - self.ser.reset_input_buffer( - ) # Flush the underlying buffer too. - else: - break - # if flushed > 0: - # logging.info("dropped >%d bytes" % flushed) - - -class MonsoonData(object): - """A class for reporting power measurement data from monsoon. - - Data means the measured current value in Amps. - """ - # Number of digits for long rounding. - lr = 8 - # Number of digits for short rounding - sr = 6 - # Delimiter for writing multiple MonsoonData objects to text file. - delimiter = "\n\n==========\n\n" - - def __init__(self, data_points, timestamps, hz, voltage, offset=0): - """Instantiates a MonsoonData object. - - Args: - data_points: A list of current values in Amp (float). - timestamps: A list of epoch timestamps (int). - hz: The hertz at which the data points are measured. - voltage: The voltage at which the data points are measured. - offset: The number of initial data points to discard - in calculations. - """ - self._data_points = data_points - self._timestamps = timestamps - self.offset = offset - num_of_data_pt = len(self._data_points) - if self.offset >= num_of_data_pt: - raise MonsoonError( - ("Offset number (%d) must be smaller than the " - "number of data points (%d).") % (offset, num_of_data_pt)) - self.data_points = self._data_points[self.offset:] - self.timestamps = self._timestamps[self.offset:] - self.hz = hz - self.voltage = voltage - self.tag = None - self._validate_data() - - @property - def average_current(self): - """Average current in the unit of mA. - """ - len_data_pt = len(self.data_points) - if len_data_pt == 0: - return 0 - cur = sum(self.data_points) * 1000 / len_data_pt - return round(cur, self.sr) - - @property - def total_charge(self): - """Total charged used in the unit of mAh. - """ - charge = (sum(self.data_points) / self.hz) * 1000 / 3600 - return round(charge, self.sr) - - @property - def total_power(self): - """Total power used. - """ - power = self.average_current * self.voltage - return round(power, self.sr) - - @staticmethod - def from_string(data_str): - """Creates a MonsoonData object from a string representation generated - by __str__. - - Args: - str: The string representation of a MonsoonData. - - Returns: - A MonsoonData object. - """ - lines = data_str.strip().split('\n') - err_msg = ("Invalid input string format. Is this string generated by " - "MonsoonData class?") - conditions = [ - len(lines) <= 4, "Average Current:" not in lines[1], - "Voltage: " not in lines[2], "Total Power: " not in lines[3], - "samples taken at " not in lines[4], - lines[5] != "Time" + ' ' * 7 + "Amp" - ] - if any(conditions): - raise MonsoonError(err_msg) - """Example string from Monsoon output file, first line is empty. - Line1: - Line2: test_2g_screenoff_dtimx2_marlin_OPD1.170706.006 - Line3: Average Current: 51.87984mA. - Line4: Voltage: 4.2V. - Line5: Total Power: 217.895328mW. - Line6: 150000 samples taken at 500Hz, with an offset of 0 samples. - """ - hz_str = lines[4].split()[4] - hz = int(hz_str[:-3]) - voltage_str = lines[2].split()[1] - voltage = float(voltage_str[:-2]) - lines = lines[6:] - t = [] - v = [] - for l in lines: - try: - timestamp, value = l.split(' ') - t.append(int(timestamp)) - v.append(float(value)) - except ValueError: - raise MonsoonError(err_msg) - return MonsoonData(v, t, hz, voltage) - - @staticmethod - def save_to_text_file(monsoon_data, file_path): - """Save multiple MonsoonData objects to a text file. - - Args: - monsoon_data: A list of MonsoonData objects to write to a text - file. - file_path: The full path of the file to save to, including the file - name. - """ - if not monsoon_data: - raise MonsoonError("Attempting to write empty Monsoon data to " - "file, abort") - utils.create_dir(os.path.dirname(file_path)) - with open(file_path, 'a') as f: - for md in monsoon_data: - f.write(str(md)) - f.write(MonsoonData.delimiter) - - @staticmethod - def from_text_file(file_path): - """Load MonsoonData objects from a text file generated by - MonsoonData.save_to_text_file. - - Args: - file_path: The full path of the file load from, including the file - name. - - Returns: - A list of MonsoonData objects. - """ - results = [] - with open(file_path, 'r') as f: - data_strs = f.read().split(MonsoonData.delimiter) - data_strs = data_strs[:-1] - for data_str in data_strs: - results.append(MonsoonData.from_string(data_str)) - return results - - def _validate_data(self): - """Verifies that the data points contained in the class are valid. - """ - msg = "Error! Expected {} timestamps, found {}.".format( - len(self._data_points), len(self._timestamps)) - if len(self._data_points) != len(self._timestamps): - raise MonsoonError(msg) - - def update_offset(self, new_offset): - """Updates how many data points to skip in caculations. - - Always use this function to update offset instead of directly setting - self.offset. - - Args: - new_offset: The new offset. - """ - self.offset = new_offset - self.data_points = self._data_points[self.offset:] - self.timestamps = self._timestamps[self.offset:] - - def get_data_with_timestamps(self): - """Returns the data points with timestamps. - - Returns: - A list of tuples in the format of (timestamp, data) - """ - result = [] - for t, d in zip(self.timestamps, self.data_points): - result.append(t, round(d, self.lr)) - return result - - def get_average_record(self, n): - """Returns a list of average current numbers, each representing the - average over the last n data points. - - Args: - n: Number of data points to average over. - - Returns: - A list of average current values. - """ - history_deque = collections.deque() - averages = [] - for d in self.data_points: - history_deque.appendleft(d) - if len(history_deque) > n: - history_deque.pop() - avg = sum(history_deque) / len(history_deque) - averages.append(round(avg, self.lr)) - return averages - - def _header(self): - strs = [""] - if self.tag: - strs.append(self.tag) + for serial in configs: + serial_number = int(serial) + if serial_number < 20000: + # This code assumes the LVPM has not been updated to have a + # non-stock firmware. If someone has updated the firmware, + # power measurement will fail. + objs.append(LvpmStockMonsoon(serial=serial_number)) else: - strs.append("Monsoon Measurement Data") - strs.append("Average Current: {}mA.".format(self.average_current)) - strs.append("Voltage: {}V.".format(self.voltage)) - strs.append("Total Power: {}mW.".format(self.total_power)) - strs.append( - ("{} samples taken at {}Hz, with an offset of {} samples.").format( - len(self._data_points), self.hz, self.offset)) - return "\n".join(strs) - - def __len__(self): - return len(self.data_points) - - def __str__(self): - strs = [] - strs.append(self._header()) - strs.append("Time" + ' ' * 7 + "Amp") - for t, d in zip(self.timestamps, self.data_points): - strs.append("{} {}".format(t, round(d, self.sr))) - return "\n".join(strs) - - def __repr__(self): - return self._header() - - -class Monsoon(object): - """The wrapper class for test scripts to interact with monsoon. - """ - - def __init__(self, *args, **kwargs): - serial = kwargs["serial"] - device = None - self.log = logging.getLogger() - if "device" in kwargs: - device = kwargs["device"] - self.mon = MonsoonProxy(serialno=serial, device=device) - self.dev = self.mon.ser.name - self.serial = serial - self.dut = None - - def attach_device(self, dut): - """Attach the controller object for the Device Under Test (DUT) - physically attached to the Monsoon box. - - Args: - dut: A controller object representing the device being powered by - this Monsoon box. - """ - self.dut = dut - - def set_voltage(self, volt, ramp=False): - """Sets the output voltage of monsoon. - - Args: - volt: Voltage to set the output to. - ramp: If true, the output voltage will be increased gradually to - prevent tripping Monsoon overvoltage. - """ - if ramp: - self.mon.RampVoltage(mon.start_voltage, volt) - else: - self.mon.SetVoltage(volt) - - def set_max_current(self, cur): - """Sets monsoon's max output current. - - Args: - cur: The max current in A. - """ - self.mon.SetMaxCurrent(cur) - - def set_max_init_current(self, cur): - """Sets the max power-up/inital current. - - Args: - cur: The max initial current allowed in mA. - """ - self.mon.SetMaxPowerUpCurrent(cur) - - @property - def status(self): - """Gets the status params of monsoon. - - Returns: - A dictionary where each key-value pair represents a monsoon status - param. - """ - return self.mon.GetStatus() - - def take_samples(self, sample_hz, sample_num, sample_offset=0, live=False): - """Take samples of the current value supplied by monsoon. - - This is the actual measurement for power consumption. This function - blocks until the number of samples requested has been fulfilled. - - Args: - hz: Number of points to take for every second. - sample_num: Number of samples to take. - offset: The number of initial data points to discard in MonsoonData - calculations. sample_num is extended by offset to compensate. - live: Print each sample in console as measurement goes on. - - Returns: - A MonsoonData object representing the data obtained in this - sampling. None if sampling is unsuccessful. - """ - sys.stdout.flush() - voltage = self.mon.GetVoltage() - self.log.info("Taking samples at %dhz for %ds, voltage %.2fv.", - sample_hz, (sample_num / sample_hz), voltage) - sample_num += sample_offset - # Make sure state is normal - self.mon.StopDataCollection() - status = self.mon.GetStatus() - native_hz = status["sampleRate"] * 1000 - - # Collect and average samples as specified - self.mon.StartDataCollection() - - # In case sample_hz doesn't divide native_hz exactly, use this - # invariant: 'offset' = (consumed samples) * sample_hz - - # (emitted samples) * native_hz - # This is the error accumulator in a variation of Bresenham's - # algorithm. - emitted = offset = 0 - collected = [] - # past n samples for rolling average - history_deque = collections.deque() - current_values = [] - timestamps = [] - - try: - last_flush = time.time() - while emitted < sample_num or sample_num == -1: - # The number of raw samples to consume before emitting the next - # output - need = int((native_hz - offset + sample_hz - 1) / sample_hz) - if need > len(collected): # still need more input samples - samples = self.mon.CollectData() - if not samples: - break - collected.extend(samples) - else: - # Have enough data, generate output samples. - # Adjust for consuming 'need' input samples. - offset += need * sample_hz - # maybe multiple, if sample_hz > native_hz - while offset >= native_hz: - # TODO(angli): Optimize "collected" operations. - this_sample = sum(collected[:need]) / need - this_time = int(time.time()) - timestamps.append(this_time) - if live: - self.log.info("%s %s", this_time, this_sample) - current_values.append(this_sample) - sys.stdout.flush() - offset -= native_hz - emitted += 1 # adjust for emitting 1 output sample - collected = collected[need:] - now = time.time() - if now - last_flush >= 0.99: # flush every second - sys.stdout.flush() - last_flush = now - except Exception as e: - pass - self.mon.StopDataCollection() - try: - return MonsoonData( - current_values, - timestamps, - sample_hz, - voltage, - offset=sample_offset) - except: - return None - - @utils.timeout(60) - def usb(self, state): - """Sets the monsoon's USB passthrough mode. This is specific to the - USB port in front of the monsoon box which connects to the powered - device, NOT the USB that is used to talk to the monsoon itself. - - "Off" means USB always off. - "On" means USB always on. - "Auto" means USB is automatically turned off when sampling is going on, - and turned back on when sampling finishes. - - Args: - stats: The state to set the USB passthrough to. - - Returns: - True if the state is legal and set. False otherwise. - """ - state_lookup = {"off": 0, "on": 1, "auto": 2} - state = state.lower() - if state in state_lookup: - current_state = self.mon.GetUsbPassthrough() - while (current_state != state_lookup[state]): - self.mon.SetUsbPassthrough(state_lookup[state]) - time.sleep(1) - current_state = self.mon.GetUsbPassthrough() - return True - return False - - def _check_dut(self): - """Verifies there is a DUT attached to the monsoon. - - This should be called in the functions that operate the DUT. - """ - if not self.dut: - raise MonsoonError("Need to attach the device before using it.") - - @utils.timeout(15) - def _wait_for_device(self, ad): - while ad.serial not in android_device.list_adb_devices(): - pass - ad.adb.wait_for_device() - - def execute_sequence_and_measure(self, - step_funcs, - hz, - duration, - offset_sec=20, - *args, - **kwargs): - """@Deprecated. - Executes a sequence of steps and take samples in-between. - - For each step function, the following steps are followed: - 1. The function is executed to put the android device in a state. - 2. If the function returns False, skip to next step function. - 3. If the function returns True, sl4a session is disconnected. - 4. Monsoon takes samples. - 5. Sl4a is reconnected. - - Because it takes some time for the device to calm down after the usb - connection is cut, an offset is set for each measurement. The default - is 20s. - - Args: - hz: Number of samples to take per second. - durations: Number(s) of minutes to take samples for in each step. - If this is an integer, all the steps will sample for the same - amount of time. If this is an iterable of the same length as - step_funcs, then each number represents the number of minutes - to take samples for after each step function. - e.g. If durations[0] is 10, we'll sample for 10 minutes after - step_funcs[0] is executed. - step_funcs: A list of funtions, whose first param is an android - device object. If a step function returns True, samples are - taken after this step, otherwise we move on to the next step - function. - ad: The android device object connected to this monsoon. - offset_sec: The number of seconds of initial data to discard. - *args, **kwargs: Extra args to be passed into each step functions. - - Returns: - The MonsoonData objects from samplings. - """ - self._check_dut() - sample_nums = [] - try: - if len(duration) != len(step_funcs): - raise MonsoonError(("The number of durations need to be the " - "same as the number of step functions.")) - for d in duration: - sample_nums.append(d * 60 * hz) - except TypeError: - num = duration * 60 * hz - sample_nums = [num] * len(step_funcs) - results = [] - oset = offset_sec * hz - for func, num in zip(step_funcs, sample_nums): - try: - self.usb("auto") - step_name = func.__name__ - self.log.info("Executing step function %s.", step_name) - take_sample = func(ad, *args, **kwargs) - if not take_sample: - self.log.info("Skip taking samples for %s", step_name) - continue - time.sleep(1) - self.dut.stop_services() - time.sleep(1) - self.log.info("Taking samples for %s.", step_name) - data = self.take_samples(hz, num, sample_offset=oset) - if not data: - raise MonsoonError("Sampling for %s failed." % step_name) - self.log.info("Sample summary: %s", repr(data)) - data.tag = step_name - results.append(data) - except Exception: - self.log.exception("Exception happened during step %s, abort!" - % func.__name__) - return results - finally: - self.mon.StopDataCollection() - self.usb("on") - self._wait_for_device(self.dut) - # Wait for device to come back online. - time.sleep(10) - self.dut.start_services() - # Release wake lock to put device into sleep. - self.dut.droid.goToSleepNow() - return results - - def disconnect_dut(self): - """Disconnect DUT from monsoon. - - Stop the sl4a service on the DUT and disconnect USB connection - raises: - MonsoonError: monsoon erro trying to disconnect usb - """ - try: - self.dut.stop_services() - time.sleep(1) - self.usb("off") - except Exception as e: - raise MonsoonError( - "Error happended trying to disconnect DUT from Monsoon") - - def monsoon_usb_auto(self): - """Set monsoon USB to auto to ready the device for power measurement. - - Stop the sl4a service on the DUT and disconnect USB connection - raises: - MonsoonError: monsoon erro trying to set usbpassthrough to auto - """ - try: - self.dut.stop_services() - time.sleep(1) - self.usb("auto") - except Exception as e: - raise MonsoonError( - "Error happended trying to set Monsoon usbpassthrough to auto") - - def reconnect_dut(self): - """Reconnect DUT to monsoon and start sl4a services. - - raises: - MonsoonError: monsoon erro trying to reconnect usb - Turn usbpassthrough on and start the sl4a services. - """ - self.log.info("Reconnecting dut.") - try: - # If wait for device failed, reset monsoon and try it again, if - # this still fails, then raise - try: - self._wait_for_device(self.dut) - except acts.utils.TimeoutError: - self.log.info('Retry-reset monsoon and connect again') - self.usb('off') - time.sleep(1) - self.usb('on') - self._wait_for_device(self.dut) - # Wait for device to come back online. - time.sleep(2) - self.dut.start_services() - # Release wake lock to put device into sleep. - self.dut.droid.goToSleepNow() - self.log.info("Dut reconnected.") - except Exception as e: - raise MonsoonError("Error happened trying to reconnect DUT") - - def measure_power(self, hz, duration, tag, offset=30): - """Measure power consumption of the attached device. - - Because it takes some time for the device to calm down after the usb - connection is cut, an offset is set for each measurement. The default - is 30s. The total time taken to measure will be (duration + offset). - - Args: - hz: Number of samples to take per second. - duration: Number of seconds to take samples for in each step. - offset: The number of seconds of initial data to discard. - tag: A string that's the name of the collected data group. - - Returns: - A MonsoonData object with the measured power data. - """ - num = duration * hz - oset = offset * hz - data = None - try: - data = self.take_samples(hz, num, sample_offset=oset) - if not data: - raise MonsoonError( - ("No data was collected in measurement %s.") % tag) - data.tag = tag - self.log.info("Measurement summary: %s", repr(data)) - finally: - self.mon.StopDataCollection() - return data + objs.append(HvpmMonsoon(serial=serial_number)) + return objs - def reconnect_monsoon(self): - """Reconnect Monsoon to serial port. - """ - logging.info("Close serial connection") - self.mon.ser.close() - logging.info("Reset serial port") - time.sleep(5) - logging.info("Open serial connection") - self.mon.ser.open() - self.mon.ser.reset_input_buffer() - self.mon.ser.reset_output_buffer() +def destroy(monsoons): + for monsoon in monsoons: + monsoon.release_monsoon_connection() diff --git a/acts/framework/acts/controllers/monsoon_lib/api/common.py b/acts/framework/acts/controllers/monsoon_lib/api/common.py index fffed4cbd3..f932535467 100644 --- a/acts/framework/acts/controllers/monsoon_lib/api/common.py +++ b/acts/framework/acts/controllers/monsoon_lib/api/common.py @@ -40,7 +40,37 @@ PASSTHROUGH_STATES = { } -class MonsoonData(object): +class MonsoonDataRecord(object): + """A data class for Monsoon data points.""" + def __init__(self, time, current): + """Creates a new MonsoonDataRecord. + + Args: + time: the string '{time}s', where time is measured in seconds since + the beginning of the data collection. + current: The current in Amperes as a string. + """ + self._time = float(time[:-1]) + self._current = float(current) + + @property + def time(self): + """The time the record was fetched.""" + return self._time + + @property + def current(self): + """The amount of current in Amperes measured for the given record.""" + return self._current + + @classmethod + def create_from_record_line(cls, line): + """Creates a data record from the line passed in from the output file. + """ + return cls(*line.split(' ')) + + +class MonsoonResult(object): """An object that contains aggregated data collected during sampling. Attributes: @@ -53,19 +83,48 @@ class MonsoonData(object): # The number of decimal places to round a value to. ROUND_TO = 6 - def __init__(self, num_samples, sum_currents, hz, voltage, tag=None): + def __init__(self, num_samples, sum_currents, hz, voltage, datafile_path): + """Creates a new MonsoonResult. + + Args: + num_samples: the number of samples collected. + sum_currents: the total summation of every current measurement. + hz: the number of samples per second. + voltage: the voltage used during the test. + datafile_path: the path to the monsoon data file. + """ self._num_samples = num_samples self._sum_currents = sum_currents self._hz = hz self._voltage = voltage - self.tag = tag + self.tag = datafile_path + + def get_data_points(self): + """Returns an iterator of MonsoonDataRecords.""" + class MonsoonDataIterator: + def __init__(self, file): + self.file = file + + def __iter__(self): + with open(self.file, 'r') as f: + for line in f: + # Remove the newline character. + line.strip() + yield MonsoonDataRecord.create_from_record_line(line) + + return MonsoonDataIterator(self.tag) + + @property + def num_samples(self): + """The number of samples recorded during the test.""" + return self._num_samples @property def average_current(self): """Average current in mA.""" - if self._num_samples == 0: + if self.num_samples == 0: return 0 - return round(self._sum_currents * 1000 / self._num_samples, + return round(self._sum_currents * 1000 / self.num_samples, self.ROUND_TO) @property @@ -79,8 +138,14 @@ class MonsoonData(object): """Total power used.""" return round(self.average_current * self._voltage, self.ROUND_TO) + @property + def voltage(self): + """The voltage during the measurement (in Volts).""" + return self._voltage + def __str__(self): return ('avg current: %s\n' 'total charge: %s\n' - 'total power: %s' % (self.average_current, self.total_charge, - self.total_power)) + 'total power: %s\n' + 'total samples: %s' % (self.average_current, self.total_charge, + self.total_power, self._num_samples)) diff --git a/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py index 6dc72c8f91..f1b03c9114 100644 --- a/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py +++ b/acts/framework/acts/controllers/monsoon_lib/api/hvpm/monsoon.py @@ -20,7 +20,7 @@ import time from Monsoon import HVPM from Monsoon import Operations as op -from acts.controllers.monsoon_lib.api.common import MonsoonData +from acts.controllers.monsoon_lib.api.common import MonsoonResult from acts.controllers.monsoon_lib.api.monsoon import BaseMonsoon from acts.controllers.monsoon_lib.sampling.engine.assembly_line import AssemblyLineBuilder from acts.controllers.monsoon_lib.sampling.engine.assembly_line import ThreadAssemblyLine @@ -137,8 +137,9 @@ class Monsoon(BaseMonsoon): manager.shutdown() self._mon.setup_usb(self.serial) - monsoon_data = MonsoonData(aggregator.num_samples, - aggregator.sum_currents, hz, voltage) + monsoon_data = MonsoonResult(aggregator.num_samples, + aggregator.sum_currents, hz, voltage, + output_path) self._log.info('Measurement summary:\n%s', str(monsoon_data)) return monsoon_data diff --git a/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py index b54b6794fc..e8d116d83f 100644 --- a/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py +++ b/acts/framework/acts/controllers/monsoon_lib/api/lvpm_stock/monsoon.py @@ -16,7 +16,7 @@ import multiprocessing import time -from acts.controllers.monsoon_lib.api.common import MonsoonData +from acts.controllers.monsoon_lib.api.common import MonsoonResult from acts.controllers.monsoon_lib.api.lvpm_stock.monsoon_proxy import MonsoonProxy from acts.controllers.monsoon_lib.api.monsoon import BaseMonsoon from acts.controllers.monsoon_lib.sampling.engine.assembly_line import AssemblyLineBuilder @@ -119,8 +119,9 @@ class Monsoon(BaseMonsoon): manager.shutdown() - monsoon_data = MonsoonData(aggregator.num_samples, - aggregator.sum_currents, hz, voltage) + monsoon_data = MonsoonResult(aggregator.num_samples, + aggregator.sum_currents, hz, voltage, + output_path) self._log.info('Measurement summary:\n%s', str(monsoon_data)) return monsoon_data diff --git a/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py b/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py index 4916da2f15..a55fc8a0e0 100644 --- a/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py +++ b/acts/framework/acts/controllers/monsoon_lib/api/monsoon.py @@ -160,6 +160,8 @@ class BaseMonsoon(object): """Deprecated. Use the connection callbacks instead.""" def on_reconnect(): + # Make sure the device is connected and available for commands. + android_device.wait_for_boot_completion() android_device.start_services() # Release wake lock to put device into sleep. android_device.droid.goToSleepNow() @@ -180,15 +182,6 @@ class BaseMonsoon(object): """Sets the callback to be called when Monsoon reconnects USB.""" self.on_reconnect = callback - def monsoon_usb_auto(self): - """Deprecated. Use usb('auto') or usb(PassthroughStates.AUTO) instead. - """ - self.usb(PassthroughStates.AUTO) - - def reconnect_dut(self): - """Deprecated. Use usb('on') or usb(PassthroughStates.ON) instead.""" - self.usb(PassthroughStates.ON) - def take_samples(self, assembly_line): """Runs the sampling procedure based on the given assembly line.""" # Sampling is always done in a separate process. Release the Monsoon diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py index 24c5a6ddbb..0163d16517 100644 --- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py +++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500.py @@ -73,7 +73,6 @@ class TransmissionModes(Enum): TM2 = 'TM2' TM3 = 'TM3' TM4 = 'TM4' - TM6 = 'TM6' TM7 = 'TM7' TM8 = 'TM8' TM9 = 'TM9' @@ -302,7 +301,7 @@ class Cmw500(abstract_inst.SocketInstrument): return self.send_and_recv('*IDN?') def disconnect(self): - """Detach controller from device and switch to local mode.""" + """Disconnect controller from device and switch to local mode.""" self.switch_lte_signalling(LteState.LTE_OFF) self.close_remote_mode() self._close_socket() @@ -311,6 +310,10 @@ class Cmw500(abstract_inst.SocketInstrument): """Exits remote mode to local mode.""" self.send_and_recv('>L') + def detach(self): + """Detach callbox and controller.""" + self.send_and_recv('CALL:LTE:SIGN:PSWitched:ACTion DETach') + @property def rrc_connection(self): """Gets the RRC connection state.""" @@ -747,6 +750,8 @@ class BaseStation(object): Args: num_antenna: Count of number of dl antennas to use. """ + if not isinstance(num_antenna, MimoModes): + raise ValueError('num_antenna should be an instance of MimoModes.') cmd = 'CONFigure:LTE:SIGN:CONNection:{}:NENBantennas {}'.format( self._bts, num_antenna) self._cmw.send_and_recv(cmd) diff --git a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py index 65e66f9f84..64d127b808 100644 --- a/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py +++ b/acts/framework/acts/controllers/rohdeschwarz_lib/cmw500_cellular_simulator.py @@ -17,6 +17,27 @@ import time from acts.controllers.rohdeschwarz_lib import cmw500 from acts.controllers import cellular_simulator as cc +from acts.test_utils.power.tel_simulations import LteSimulation + +CMW_TM_MAPPING = { + LteSimulation.TransmissionMode.TM1: cmw500.TransmissionModes.TM1, + LteSimulation.TransmissionMode.TM2: cmw500.TransmissionModes.TM2, + LteSimulation.TransmissionMode.TM3: cmw500.TransmissionModes.TM3, + LteSimulation.TransmissionMode.TM4: cmw500.TransmissionModes.TM4, + LteSimulation.TransmissionMode.TM7: cmw500.TransmissionModes.TM7, + LteSimulation.TransmissionMode.TM8: cmw500.TransmissionModes.TM8, + LteSimulation.TransmissionMode.TM9: cmw500.TransmissionModes.TM9 +} + +CMW_SCH_MAPPING = { + LteSimulation.SchedulingMode.STATIC: cmw500.SchedulingMode.USERDEFINEDCH +} + +CMW_MIMO_MAPPING = { + LteSimulation.MimoMode.MIMO_1x1: cmw500.MimoModes.MIMO1x1, + LteSimulation.MimoMode.MIMO_2x2: cmw500.MimoModes.MIMO2x2, + LteSimulation.MimoMode.MIMO_4x4: cmw500.MimoModes.MIMO4x4 +} class CMW500CellularSimulator(cc.AbstractCellularSimulator): @@ -57,7 +78,6 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): """ Sends finalization commands to the cellular equipment and closes the connection. """ self.cmw.disconnect() - self.cmw.close_remote_mode() def setup_lte_scenario(self): """ Configures the equipment for an LTE simulation. """ @@ -68,6 +88,18 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): """ Configures the equipment for an LTE with CA simulation. """ raise NotImplementedError() + def set_lte_rrc_state_change_timer(self, enabled, time=10): + """ Configures the LTE RRC state change timer. + + Args: + enabled: a boolean indicating if the timer should be on or off. + time: time in seconds for the timer to expire + """ + # Setting this method to pass instead of raising an exception as it + # it is required by LTE sims. + # TODO (b/141838145): Implement RRC status change timer for CMW500. + pass + def set_band(self, bts_index, band): """ Sets the band for the indicated base station. @@ -111,7 +143,8 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): bts_index: the base station number output_power: the new output power """ - raise NotImplementedError() + bts = self.bts[bts_index] + bts.downlink_power_level = output_power def set_tdd_config(self, bts_index, tdd_config): """ Sets the tdd configuration number for the indicated base station. @@ -166,7 +199,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): mimo_mode: the new mimo mode """ bts = self.bts[bts_index] - + mimo_mode = CMW_MIMO_MAPPING[mimo_mode] if mimo_mode == cmw500.MimoModes.MIMO1x1: self.cmw.configure_mimo_settings(cmw500.MimoScenario.SCEN1x1) bts.dl_antenna = cmw500.MimoModes.MIMO1x1 @@ -190,6 +223,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): """ bts = self.bts[bts_index] + tmode = CMW_TM_MAPPING[tmode] if (tmode in [ cmw500.TransmissionModes.TM1, cmw500.TransmissionModes.TM7 @@ -224,7 +258,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): nrb_ul: Number of RBs for uplink. """ bts = self.bts[bts_index] - bts.scheduling_mode = scheduling + bts.scheduling_mode = CMW_SCH_MAPPING[scheduling] if not self.ul_modulation and self.dl_modulation: raise ValueError('Modulation should be set prior to scheduling ' @@ -305,7 +339,8 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): bts_index: the base station number tbs_pattern_on: the new TBS pattern setting """ - raise NotImplementedError() + # TODO (b/143918664): CMW500 doesn't have an equivalent setting. + pass def lte_attach_secondary_carriers(self): """ Activates the secondary carriers for CA. Requires the DUT to be @@ -341,7 +376,7 @@ class CMW500CellularSimulator(cc.AbstractCellularSimulator): def detach(self): """ Turns off all the base stations so the DUT loose connection.""" - self.cmw.disconnect() + self.cmw.detach() def stop(self): """ Stops current simulation. After calling this method, the simulator diff --git a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py index 1f13d26499..bb63bd9934 100644 --- a/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py +++ b/acts/framework/acts/test_utils/abstract_devices/bluetooth_handsfree_abstract_device.py @@ -14,8 +14,11 @@ # License for the specific language governing permissions and limitations under # the License. import inspect - +import time +from acts import asserts from acts.controllers.buds_lib.dev_utils import apollo_sink_events +from acts.test_utils.bt.bt_constants import bt_default_timeout + def validate_controller(controller, abstract_device_class): @@ -257,7 +260,18 @@ class AndroidHeadsetBluetoothHandsfreeAbstractDevice( @property def mac_address(self): - return self.ad_controller.droid.bluetoothGetLocalAddress() + """Getting device mac with more stability ensurance. + + Sometime, getting mac address is flaky that it returns None. Adding a + loop to add more ensurance of getting correct mac address. + """ + device_mac = None + start_time = time.time() + end_time = start_time + bt_default_timeout + while not device_mac and time.time() < end_time: + device_mac = self.ad_controller.droid.bluetoothGetLocalAddress() + asserts.assert_true(device_mac, 'Can not get the MAC address') + return device_mac def accept_call(self): return self.ad_controller.droid.telecomAcceptRingingCall(None) diff --git a/acts/framework/acts/test_utils/bt/bt_power_test_utils.py b/acts/framework/acts/test_utils/bt/bt_power_test_utils.py index c9c60723e9..628c3375d1 100644 --- a/acts/framework/acts/test_utils/bt/bt_power_test_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_power_test_utils.py @@ -14,93 +14,144 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import time +import acts.test_utils.bt.BleEnum as bleenum +import acts.test_utils.instrumentation.instrumentation_command_builder as icb -from acts.test_utils.wifi import wifi_power_test_utils as wputils -from acts.test_utils.bt.bt_test_utils import enable_bluetooth -from acts.test_utils.bt.bt_test_utils import disable_bluetooth - -BT_BASE_UUID = '00000000-0000-1000-8000-00805F9B34FB' -BT_CLASSICAL_DATA = [1, 2, 3] BLE_LOCATION_SCAN_ENABLE = 'settings put global ble_scan_always_enabled 1' BLE_LOCATION_SCAN_DISABLE = 'settings put global ble_scan_always_enabled 0' -START_PMC_CMD = 'am start -n com.android.pmc/com.android.pmc.PMCMainActivity' -PMC_VERBOSE_CMD = 'setprop log.tag.PMC VERBOSE' -PMC_BASE_SCAN = 'am broadcast -a com.android.pmc.BLESCAN --es ScanMode ' +SCREEN_WAIT_TIME = 1 -def phone_setup_for_BT(dut, bt_on, ble_on, screen_status): - """Sets the phone and Bluetooth in the desired state +class MediaControl(object): + """Media control using adb shell for power testing. - Args: - dut: object of the android device under test - bt_on: Enable/Disable BT - ble_on: Enable/Disable BLE - screen_status: screen ON or OFF + Object to control media play status using adb. """ - # Initialize the dut to rock-bottom state - wputils.dut_rockbottom(dut) - time.sleep(2) - - # Check if we are enabling a background scan - # TODO: Turn OFF cellular wihtout having to turn ON airplane mode - if bt_on == 'OFF' and ble_on == 'ON': - dut.adb.shell(BLE_LOCATION_SCAN_ENABLE) - dut.droid.connectivityToggleAirplaneMode(False) - time.sleep(2) - - # Turn ON/OFF BT - if bt_on == 'ON': - enable_bluetooth(dut.droid, dut.ed) - dut.log.info('BT is ON') - else: - disable_bluetooth(dut.droid) - dut.droid.bluetoothDisableBLE() - dut.log.info('BT is OFF') - time.sleep(2) - - # Turn ON/OFF BLE - if ble_on == 'ON': - dut.droid.bluetoothEnableBLE() - dut.log.info('BLE is ON') - else: - dut.droid.bluetoothDisableBLE() - dut.log.info('BLE is OFF') - time.sleep(2) - - # Set the desired screen status - if screen_status == 'OFF': - dut.droid.goToSleepNow() - dut.log.info('Screen is OFF') - time.sleep(2) - - -def start_pmc_ble_scan(dut, - scan_mode, - offset_start, - scan_time, - idle_time=None, - num_reps=1): - """Starts a generic BLE scan via the PMC app + + def __init__(self, android_device, music_file): + """Initialize the media_control class. + + Args: + android_dut: android_device object + music_file: location of the music file + """ + self.android_device = android_device + self.music_file = music_file + + def player_on_foreground(self): + """Turn on screen and make sure media play is on foreground + + All media control keycode only works when screen is on and media player + is on the foreground. Turn off screen first and turn it on to make sure + all operation is based on the same screen status. Otherwise, 'MENU' key + would block command to be sent. + """ + self.android_device.droid.goToSleepNow() + time.sleep(SCREEN_WAIT_TIME) + self.android_device.droid.wakeUpNow() + time.sleep(SCREEN_WAIT_TIME) + self.android_device.send_keycode('MENU') + time.sleep(SCREEN_WAIT_TIME) + + def play(self): + """Start playing music. + + """ + self.player_on_foreground() + PLAY = 'am start -a android.intent.action.VIEW -d file://{} -t audio/wav'.format( + self.music_file) + self.android_device.adb.shell(PLAY) + + def pause(self): + """Pause music. + + """ + self.player_on_foreground() + self.android_device.send_keycode('MEDIA_PAUSE') + + def resume(self): + """Pause music. + + """ + self.player_on_foreground() + self.android_device.send_keycode('MEDIA_PLAY') + + def stop(self): + """Stop music and close media play. + + """ + self.player_on_foreground() + self.android_device.send_keycode('MEDIA_STOP') + + +def start_apk_ble_adv(dut, adv_mode, adv_power_level, adv_duration): + """Trigger BLE advertisement from power-test.apk. Args: - dut: object of the android device under test - scan mode: desired BLE scan type - offset_start: Time delay in seconds before scan starts - scan_time: active scan time - idle_time: iddle time (i.e., no scans occuring) - num_reps: Number of repetions of the ative+idle scan sequence + dut: Android device under test, type AndroidDevice obj + adv_mode: The BLE advertisement mode. + {0: 'LowPower', 1: 'Balanced', 2: 'LowLatency'} + adv_power_leve: The BLE advertisement TX power level. + {0: 'UltraLowTXPower', 1: 'LowTXPower', 2: 'MediumTXPower, + 3: HighTXPower} + adv_duration: duration of advertisement in seconds, type int """ - scan_dur = scan_time - if not idle_time: - idle_time = 0.2 * scan_time - scan_dur = 0.8 * scan_time - - first_part_msg = '%s%s --es StartTime %d --es ScanTime %d' % ( - PMC_BASE_SCAN, scan_mode, offset_start, scan_dur) - msg = '%s --es NoScanTime %d --es Repetitions %d' % (first_part_msg, - idle_time, num_reps) + adv_duration = str(adv_duration) + 's' + builder = icb.InstrumentationTestCommandBuilder.default() + builder.add_test_class( + "com.google.android.device.power.tests.ble.BleAdvertise") + builder.set_manifest_package("com.google.android.device.power") + builder.set_runner("androidx.test.runner.AndroidJUnitRunner") + builder.add_key_value_param("cool-off-duration", "0s") + builder.add_key_value_param("idle-duration", "0s") + builder.add_key_value_param( + "com.android.test.power.receiver.ADVERTISE_MODE", adv_mode) + builder.add_key_value_param("com.android.test.power.receiver.POWER_LEVEL", + adv_power_level) + builder.add_key_value_param( + "com.android.test.power.receiver.ADVERTISING_DURATION", adv_duration) + + adv_command = builder.build() + ' &' + logging.info('Start BLE {} at {} for {} seconds'.format( + bleenum.AdvertiseSettingsAdvertiseMode(adv_mode).name, + bleenum.AdvertiseSettingsAdvertiseTxPower(adv_power_level).name, + adv_duration)) + dut.adb.shell_nb(adv_command) + + +def start_apk_ble_scan(dut, scan_mode, scan_duration): + """Build the command to trigger BLE scan from power-test.apk. - dut.log.info('Sent BLE scan broadcast message: %s', msg) - dut.adb.shell(msg) + Args: + dut: Android device under test, type AndroidDevice obj + scan_mode: The BLE scan mode. + {0: 'LowPower', 1: 'Balanced', 2: 'LowLatency', -1: 'Opportunistic'} + scan_duration: duration of scan in seconds, type int + Returns: + adv_command: the command for BLE scan + """ + scan_duration = str(scan_duration) + 's' + builder = icb.InstrumentationTestCommandBuilder.default() + builder.add_test_class("com.google.android.device.power.tests.ble.BleScan") + builder.set_manifest_package("com.google.android.device.power") + builder.set_runner("androidx.test.runner.AndroidJUnitRunner") + builder.add_key_value_param("cool-off-duration", "0s") + builder.add_key_value_param("idle-duration", "0s") + builder.add_key_value_param("com.android.test.power.receiver.SCAN_MODE", + scan_mode) + builder.add_key_value_param("com.android.test.power.receiver.MATCH_MODE", + 2) + builder.add_key_value_param( + "com.android.test.power.receiver.SCAN_DURATION", scan_duration) + builder.add_key_value_param( + "com.android.test.power.receiver.CALLBACK_TYPE", 1) + builder.add_key_value_param("com.android.test.power.receiver.FILTER", + 'true') + + scan_command = builder.build() + ' &' + logging.info('Start BLE {} scans for {} seconds'.format( + bleenum.ScanSettingsScanMode(scan_mode).name, scan_duration)) + dut.adb.shell_nb(scan_command) diff --git a/acts/framework/acts/test_utils/bt/bt_test_utils.py b/acts/framework/acts/test_utils/bt/bt_test_utils.py index f13002e589..da4a88089f 100644 --- a/acts/framework/acts/test_utils/bt/bt_test_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_test_utils.py @@ -79,7 +79,8 @@ def _add_android_device_to_dictionary(android_device, profile_list, profile_list: The list of profiles the Android device supports. """ for profile in profile_list: - if profile in selector_dict and android_device not in selector_dict[profile]: + if profile in selector_dict and android_device not in selector_dict[ + profile]: selector_dict[profile].append(android_device) else: selector_dict[profile] = [android_device] @@ -164,7 +165,7 @@ def cleanup_scanners_and_advertisers(scn_android_device, scn_callback_list, except Exception as err: adv_android_device.log.debug( "Failed to stop LE advertisement... reseting Bluetooth. Error {}". - format(err)) + format(err)) reset_bluetooth([adv_android_device]) @@ -197,7 +198,9 @@ def clear_bonded_devices(ad): return True -def connect_phone_to_headset(android, headset, timeout=bt_default_timeout, +def connect_phone_to_headset(android, + headset, + timeout=bt_default_timeout, connection_check_period=10): """Connects android phone to bluetooth headset. Headset object must have methods power_on and enter_pairing_mode, @@ -214,7 +217,8 @@ def connect_phone_to_headset(android, headset, timeout=bt_default_timeout, connected (bool): True if devices are paired and connected by end of method. False otherwise. """ - connected = is_a2dp_src_device_connected(android, headset.mac_address) + headset_mac_address = headset.mac_address + connected = is_a2dp_src_device_connected(android, headset_mac_address) log.info('Devices connected before pair attempt: %s' % connected) if not connected: # Turn on headset and initiate pairing mode. @@ -224,16 +228,18 @@ def connect_phone_to_headset(android, headset, timeout=bt_default_timeout, # If already connected, skip pair and connect attempt. while not connected and (time.time() - start_time < timeout): bonded_info = android.droid.bluetoothGetBondedDevices() - if headset.mac_address not in [info["address"] for info in bonded_info]: + if headset.mac_address not in [ + info["address"] for info in bonded_info + ]: # Use SL4A to pair and connect with headset. headset.enter_pairing_mode() - android.droid.bluetoothDiscoverAndBond(headset.mac_address) + android.droid.bluetoothDiscoverAndBond(headset_mac_address) else: # Device is bonded but not connected - android.droid.bluetoothConnectBonded(headset.mac_address) + android.droid.bluetoothConnectBonded(headset_mac_address) log.info('Waiting for connection...') time.sleep(connection_check_period) # Check for connection. - connected = is_a2dp_src_device_connected(android, headset.mac_address) + connected = is_a2dp_src_device_connected(android, headset_mac_address) log.info('Devices connected after pair attempt: %s' % connected) return connected @@ -394,12 +400,13 @@ def determine_max_advertisements(android_device): advertise_callback = android_device.droid.bleGenBleAdvertiseCallback() advertise_callback_list.append(advertise_callback) - android_device.droid.bleStartBleAdvertising( - advertise_callback, advertise_data, advertise_settings) + android_device.droid.bleStartBleAdvertising(advertise_callback, + advertise_data, + advertise_settings) regex = "(" + adv_succ.format( advertise_callback) + "|" + adv_fail.format( - advertise_callback) + ")" + advertise_callback) + ")" # wait for either success or failure event evt = android_device.ed.pop_events(regex, bt_default_timeout, small_timeout) @@ -585,10 +592,9 @@ def generate_ble_scan_objects(droid): return filter_list, scan_settings, scan_callback -def generate_id_by_size( - size, - chars=( - string.ascii_lowercase + string.ascii_uppercase + string.digits)): +def generate_id_by_size(size, + chars=(string.ascii_lowercase + + string.ascii_uppercase + string.digits)): """Generate random ascii characters of input size and input char types Args: @@ -650,6 +656,122 @@ def get_bluetooth_crash_count(android_device): return int(re.search("crashed(.*\d)", out).group(1)) +def get_bt_metric(ad_list, duration=1, tag="bt_metric", processed=True): + """ Function to get the bt metric from logcat. + + Captures logcat for the specified duration and returns the bqr results. + Takes list of android objects as input. If a single android object is given, + converts it into a list. + + Args: + ad_list: list of android_device objects + duration: time duration (seconds) for which the logcat is parsed. + tag: tag to be appended to the logcat dump. + processed: flag to process bqr output. + + Returns: + metrics_dict: dict of metrics for each android device. + """ + + # Defining bqr quantitites and their regex to extract + regex_dict = {"pwlv": "PwLv:\s(\S+)", "rssi": "RSSI:\s[-](\d+)"} + metrics_dict = {"rssi": {}, "pwlv": {}} + + # Converting a single android device object to list + if not isinstance(ad_list, list): + ad_list = [ad_list] + + #Time sync with the test machine + for ad in ad_list: + ad.droid.setTime(int(round(time.time() * 1000))) + time.sleep(0.5) + + begin_time = utils.get_current_epoch_time() + time.sleep(duration) + end_time = utils.get_current_epoch_time() + + for ad in ad_list: + bt_rssi_log = ad.cat_adb_log(tag, begin_time, end_time) + bqr_tag = "Monitoring , Handle:" + + # Extracting supporting bqr quantities + for metric, regex in regex_dict.items(): + bqr_metric = [] + file_bt_log = open(bt_rssi_log, "r") + for line in file_bt_log: + if bqr_tag in line: + if re.findall(regex, line): + m = re.findall(regex, line)[0].strip(",") + bqr_metric.append(m) + metrics_dict[metric][ad.serial] = bqr_metric + + # Formatting the raw data + metrics_dict["rssi"][ad.serial] = [ + (-1) * int(x) for x in metrics_dict["rssi"][ad.serial] + ] + metrics_dict["pwlv"][ad.serial] = [ + int(x, 16) for x in metrics_dict["pwlv"][ad.serial] + ] + + # Processing formatted data if processing is required + if processed: + # Computes the average RSSI + metrics_dict["rssi"][ad.serial] = round( + sum(metrics_dict["rssi"][ad.serial]) / + len(metrics_dict["rssi"][ad.serial]), 2) + # Returns last noted value for power level + metrics_dict["pwlv"][ad.serial] = metrics_dict["pwlv"][ + ad.serial][-1] + + return metrics_dict + + +def get_bt_rssi(ad, duration=1, processed=True): + """Function to get average bt rssi from logcat. + + This function returns the average RSSI for the given duration. RSSI values are + extracted from BQR. + + Args: + ad: (list of) android_device object. + duration: time duration(seconds) for which logcat is parsed. + + Returns: + avg_rssi: average RSSI on each android device for the given duration. + """ + function_tag = "get_bt_rssi" + bqr_results = get_bt_metric(ad, + duration, + tag=function_tag, + processed=processed) + return bqr_results["rssi"] + + +def enable_bqr(ad_list, bqr_interval=10, bqr_event_mask=15,): + """Sets up BQR reporting. + + Sets up BQR to report BT metrics at the requested frequency and toggles + airplane mode for the bqr settings to take effect. + + Args: + ad_list: an android_device or list of android devices. + """ + # Converting a single android device object to list + if not isinstance(ad_list, list): + ad_list = [ad_list] + + for ad in ad_list: + #Setting BQR parameters + ad.adb.shell("setprop persist.bluetooth.bqr.event_mask {}".format( + bqr_event_mask)) + ad.adb.shell("setprop persist.bluetooth.bqr.min_interval_ms {}".format( + bqr_interval)) + + ## Toggle airplane mode + ad.droid.connectivityToggleAirplaneMode(True) + ad.droid.connectivityToggleAirplaneMode(False) + + def get_device_selector_dictionary(android_device_list): """Create a dictionary of Bluetooth features vs Android devices. @@ -733,8 +855,8 @@ def get_mac_address_of_generic_advertisement(scan_ad, adv_ad): adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) try: - adv_ad.ed.pop_event( - adv_succ.format(advertise_callback), bt_default_timeout) + adv_ad.ed.pop_event(adv_succ.format(advertise_callback), + bt_default_timeout) except Empty as err: raise BtTestUtilsError( "Advertiser did not start successfully {}".format(err)) @@ -973,8 +1095,8 @@ def orchestrate_bluetooth_socket_connection( client_ad.droid.bluetoothStartPairingHelper() server_ad.droid.bluetoothSocketConnBeginAcceptThreadUuid( - (bluetooth_socket_conn_test_uuid - if uuid is None else uuid), accept_timeout_ms) + (bluetooth_socket_conn_test_uuid if uuid is None else uuid), + accept_timeout_ms) client_ad.droid.bluetoothSocketConnBeginConnectThreadUuid( server_ad.droid.bluetoothGetLocalAddress(), (bluetooth_socket_conn_test_uuid if uuid is None else uuid)) @@ -1038,12 +1160,14 @@ def pair_pri_to_sec(pri_ad, sec_ad, attempts=2, auto_confirm=True): # Wait 2 seconds before unbound time.sleep(2) if not clear_bonded_devices(pri_ad): - log.error("Failed to clear bond for primary device at attempt {}" - .format(str(curr_attempts))) + log.error( + "Failed to clear bond for primary device at attempt {}".format( + str(curr_attempts))) return False if not clear_bonded_devices(sec_ad): - log.error("Failed to clear bond for secondary device at attempt {}" - .format(str(curr_attempts))) + log.error( + "Failed to clear bond for secondary device at attempt {}". + format(str(curr_attempts))) return False # Wait 2 seconds after unbound time.sleep(2) @@ -1141,8 +1265,8 @@ def scan_and_verify_n_advertisements(scn_ad, max_advertisements): while (start_time + bt_default_timeout) > time.time(): event = None try: - event = scn_ad.ed.pop_event( - scan_result.format(scan_callback), bt_default_timeout) + event = scn_ad.ed.pop_event(scan_result.format(scan_callback), + bt_default_timeout) except Empty as error: raise BtTestUtilsError( "Failed to find scan event: {}".format(error)) @@ -1156,13 +1280,12 @@ def scan_and_verify_n_advertisements(scn_ad, max_advertisements): return test_result -def set_bluetooth_codec( - android_device, - codec_type, - sample_rate, - bits_per_sample, - channel_mode, - codec_specific_1=0): +def set_bluetooth_codec(android_device, + codec_type, + sample_rate, + bits_per_sample, + channel_mode, + codec_specific_1=0): """Sets the A2DP codec configuration on the AndroidDevice. Args: @@ -1181,31 +1304,26 @@ def set_bluetooth_codec( bool: True if the codec config was successfully changed to the desired values. Else False. """ - message = ( - "Set Android Device A2DP Bluetooth codec configuration:\n" - "\tCodec: {codec_type}\n" - "\tSample Rate: {sample_rate}\n" - "\tBits per Sample: {bits_per_sample}\n" - "\tChannel Mode: {channel_mode}".format( - codec_type=codec_type, - sample_rate=sample_rate, - bits_per_sample=bits_per_sample, - channel_mode=channel_mode - ) - ) + message = ("Set Android Device A2DP Bluetooth codec configuration:\n" + "\tCodec: {codec_type}\n" + "\tSample Rate: {sample_rate}\n" + "\tBits per Sample: {bits_per_sample}\n" + "\tChannel Mode: {channel_mode}".format( + codec_type=codec_type, + sample_rate=sample_rate, + bits_per_sample=bits_per_sample, + channel_mode=channel_mode)) android_device.log.info(message) # Send SL4A command droid, ed = android_device.droid, android_device.ed if not droid.bluetoothA2dpSetCodecConfigPreference( - codec_types[codec_type], - sample_rates[str(sample_rate)], - bits_per_samples[str(bits_per_sample)], - channel_modes[channel_mode], - codec_specific_1 - ): - android_device.log.warning("SL4A command returned False. Codec was not " - "changed.") + codec_types[codec_type], sample_rates[str(sample_rate)], + bits_per_samples[str(bits_per_sample)], + channel_modes[channel_mode], codec_specific_1): + android_device.log.warning( + "SL4A command returned False. Codec was not " + "changed.") else: try: ed.pop_event(bluetooth_a2dp_codec_config_changed, @@ -1223,14 +1341,10 @@ def set_bluetooth_codec( android_device.log.warning("Could not verify codec config change " "through ADB.") elif split_out[1].strip().upper() != codec_type: - android_device.log.error( - "Codec config was not changed.\n" - "\tExpected codec: {exp}\n" - "\tActual codec: {act}".format( - exp=codec_type, - act=split_out[1].strip() - ) - ) + android_device.log.error("Codec config was not changed.\n" + "\tExpected codec: {exp}\n" + "\tActual codec: {act}".format( + exp=codec_type, act=split_out[1].strip())) return False android_device.log.info("Bluetooth codec successfully changed.") return True @@ -1336,8 +1450,8 @@ def setup_multiple_devices_for_bt_test(android_devices): threads = [] try: for a in android_devices: - thread = threading.Thread( - target=factory_reset_bluetooth, args=([[a]])) + thread = threading.Thread(target=factory_reset_bluetooth, + args=([[a]])) threads.append(thread) thread.start() for t in threads: @@ -1391,8 +1505,8 @@ def setup_n_advertisements(adv_ad, num_advertisements): adv_ad.droid.bleStartBleAdvertising(advertise_callback, advertise_data, advertise_settings) try: - adv_ad.ed.pop_event( - adv_succ.format(advertise_callback), bt_default_timeout) + adv_ad.ed.pop_event(adv_succ.format(advertise_callback), + bt_default_timeout) adv_ad.log.info("Advertisement {} started.".format(i + 1)) except Empty as error: adv_ad.log.error("Advertisement {} failed to start.".format(i + 1)) @@ -1543,8 +1657,9 @@ def _wait_for_passkey_match(pri_ad, sec_ad): timeout=bt_default_timeout) sec_variant = sec_pairing_req["data"]["PairingVariant"] sec_pin = sec_pairing_req["data"]["Pin"] - sec_ad.log.info("Secondary device received Pin: {}, Variant: {}" - .format(sec_pin, sec_variant)) + sec_ad.log.info( + "Secondary device received Pin: {}, Variant: {}".format( + sec_pin, sec_variant)) except Empty as err: log.error("Wait for pin error: {}".format(err)) log.error("Pairing request state, Primary: {}, Secondary: {}".format( diff --git a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py index dba308c5e2..641c8e5ad8 100644 --- a/acts/framework/acts/test_utils/gnss/gnss_test_utils.py +++ b/acts/framework/acts/test_utils/gnss/gnss_test_utils.py @@ -84,9 +84,6 @@ def reboot(ad): if not int(ad.adb.shell("settings get global mobile_data")) == 1: set_mobile_data(ad, True) utils.sync_device_time(ad) - if ad.model == "sailfish" or ad.model == "marlin": - remount_device(ad) - ad.adb.shell("echo at@test=8 >> /dev/at_mdm0") def enable_gnss_verbose_logging(ad): """Enable GNSS VERBOSE Logging and persistent logcat. @@ -505,22 +502,29 @@ def clear_aiding_data_by_gtw_gpstool(ad): ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode clear") time.sleep(10) -def start_gnss_by_gtw_gpstool(ad, state, type="gnss"): + +def start_gnss_by_gtw_gpstool(ad, state, type="gnss", bgdisplay=False): """Start or stop GNSS on GTW_GPSTool. Args: ad: An AndroidDevice object. state: True to start GNSS. False to Stop GNSS. type: Different API for location fix. Use gnss/flp/nmea + bgdisplay: true to run GTW when Display off. + false to not run GTW when Display off. """ - if state: + if state and not bgdisplay: ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool " "--es mode gps --es type %s" % type) + elif state and bgdisplay: + ad.adb.shell("am start -S -n com.android.gpstool/.GPSTool --es mode" + " gps --es type {} --ez BG {}".format(type, bgdisplay)) if not state: ad.log.info("Stop %s on GTW_GPSTool." % type) ad.adb.shell("am broadcast -a com.android.gpstool.stop_gps_action") time.sleep(3) + def process_gnss_by_gtw_gpstool(ad, criteria, type="gnss"): """Launch GTW GPSTool and Clear all GNSS aiding data Start GNSS tracking on GTW_GPSTool. @@ -1025,6 +1029,7 @@ def get_baseband_and_gms_version(ad, extra_msg=""): ad: An AndroidDevice object. """ try: + build_version = ad.adb.getprop("ro.build.id") baseband_version = ad.adb.getprop("gsm.version.baseband") gms_version = ad.adb.shell( "dumpsys package com.google.android.gms | grep versionName" @@ -1032,6 +1037,7 @@ def get_baseband_and_gms_version(ad, extra_msg=""): mpss_version = ad.adb.shell("cat /sys/devices/soc0/images | grep MPSS " "| cut -d ':' -f 3") if not extra_msg: + ad.log.info("TestResult Build_Version %s" % build_version) ad.log.info("TestResult Baseband_Version %s" % baseband_version) ad.log.info( "TestResult GMS_Version %s" % gms_version.replace(" ", "")) diff --git a/acts/framework/acts/test_utils/power/PowerBTBaseTest.py b/acts/framework/acts/test_utils/power/PowerBTBaseTest.py index 5dfda12c7a..e0fd37da2e 100644 --- a/acts/framework/acts/test_utils/power/PowerBTBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBTBaseTest.py @@ -14,18 +14,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -import time +import os +import acts.test_utils.bt.bt_power_test_utils as btputils +import acts.test_utils.bt.bt_test_utils as btutils import acts.test_utils.power.PowerBaseTest as PBT -from acts.test_utils.bt.bt_test_utils import enable_bluetooth -from acts.test_utils.bt.bt_test_utils import disable_bluetooth +from acts.test_utils.abstract_devices.bluetooth_handsfree_abstract_device import BluetoothHandsfreeAbstractDeviceFactory as bt_factory -BT_BASE_UUID = '00000000-0000-1000-8000-00805F9B34FB' -BT_CLASSICAL_DATA = [1, 2, 3] -BLE_LOCATION_SCAN_ENABLE = 'settings put global ble_scan_always_enabled 1' -BLE_LOCATION_SCAN_DISABLE = 'settings put global ble_scan_always_enabled 0' -START_PMC_CMD = 'am start -n com.android.pmc/com.android.pmc.PMCMainActivity' -PMC_VERBOSE_CMD = 'setprop log.tag.PMC VERBOSE' -PMC_BASE_SCAN = 'am broadcast -a com.android.pmc.BLESCAN --es ScanMode ' +BLE_LOCATION_SCAN_DISABLE = 'settings put secure location_mode 0' +PHONE_MUSIC_FILE_DIRECTORY = '/sdcard/Music' +INIT_ATTEN = [30] class PowerBTBaseTest(PBT.PowerBaseTest): @@ -34,16 +31,43 @@ class PowerBTBaseTest(PBT.PowerBaseTest): Inherited from the PowerBaseTest class """ + def setup_class(self): + + super().setup_class() + # Get music file and push it to the phone + music_files = self.user_params.get('music_files', []) + if music_files: + music_src = music_files[0] + music_dest = PHONE_MUSIC_FILE_DIRECTORY + success = self.dut.push_system_file(music_src, music_dest) + if success: + self.music_file = os.path.join(PHONE_MUSIC_FILE_DIRECTORY, + os.path.basename(music_src)) + # Initialize media_control class + self.media = btputils.MediaControl(self.dut, self.music_file) + # Set Attenuator to the initial attenuation + if hasattr(self, 'attenuators'): + self.set_attenuation(INIT_ATTEN) + # Create the BTOE(Bluetooth-Other-End) device object + bt_devices = self.user_params.get('bt_devices', []) + if bt_devices: + attr, idx = bt_devices.split(':') + self.bt_device_controller = getattr(self, attr)[int(idx)] + self.bt_device = bt_factory().generate(self.bt_device_controller) + else: + self.log.error('No BT devices config is provided!') + # Turn off screen as all tests will be screen off + self.dut.droid.goToSleepNow() + def setup_test(self): super().setup_test() + self.unpack_userparams(volume=0.9) # Reset BT to factory defaults self.dut.droid.bluetoothFactoryReset() - time.sleep(2) - # Start PMC app. - self.log.info('Start PMC app...') - self.dut.adb.shell(START_PMC_CMD) - self.dut.adb.shell(PMC_VERBOSE_CMD) + self.bt_device.reset() + self.bt_device.power_on() + btutils.enable_bluetooth(self.dut.droid, self.dut.ed) def teardown_test(self): """Tear down necessary objects after test case is finished. @@ -54,6 +78,11 @@ class PowerBTBaseTest(PBT.PowerBaseTest): super().teardown_test() self.dut.droid.bluetoothFactoryReset() self.dut.adb.shell(BLE_LOCATION_SCAN_DISABLE) + if hasattr(self, 'media'): + self.media.stop() + self.bt_device.reset() + self.bt_device.power_off() + btutils.disable_bluetooth(self.dut.droid) def teardown_class(self): """Clean up the test class after tests finish running @@ -61,75 +90,7 @@ class PowerBTBaseTest(PBT.PowerBaseTest): """ super().teardown_class() self.dut.droid.bluetoothFactoryReset() - - def phone_setup_for_BT(self, bt_on, ble_on, screen_status): - """Sets the phone and Bluetooth in the desired state - - Args: - bt_on: Enable/Disable BT - ble_on: Enable/Disable BLE - screen_status: screen ON or OFF - """ - - # Check if we are enabling a background scan - # TODO: Turn OFF cellular wihtout having to turn ON airplane mode - if bt_on == 'OFF' and ble_on == 'ON': - self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE) - self.dut.droid.connectivityToggleAirplaneMode(False) - time.sleep(2) - - # Turn ON/OFF BT - if bt_on == 'ON': - enable_bluetooth(self.dut.droid, self.dut.ed) - self.dut.log.info('BT is ON') - else: - disable_bluetooth(self.dut.droid) - self.dut.droid.bluetoothDisableBLE() - self.dut.log.info('BT is OFF') - time.sleep(2) - - # Turn ON/OFF BLE - if ble_on == 'ON': - self.dut.droid.bluetoothEnableBLE() - self.dut.log.info('BLE is ON') - else: - self.dut.droid.bluetoothDisableBLE() - self.dut.log.info('BLE is OFF') - time.sleep(2) - - # Set the desired screen status - if screen_status == 'OFF': - self.dut.droid.goToSleepNow() - self.dut.log.info('Screen is OFF') - time.sleep(2) - - def start_pmc_ble_scan(self, - scan_mode, - offset_start, - scan_time, - idle_time=None, - num_reps=1): - """Starts a generic BLE scan via the PMC app - - Args: - dut: object of the android device under test - scan mode: desired BLE scan type - offset_start: Time delay in seconds before scan starts - scan_time: active scan time - idle_time: iddle time (i.e., no scans occuring) - num_reps: Number of repetions of the ative+idle scan sequence - """ - scan_dur = scan_time - if not idle_time: - idle_time = 0.2 * scan_time - scan_dur = 0.8 * scan_time - - first_part_msg = '%s%s --es StartTime %d --es ScanTime %d' % ( - PMC_BASE_SCAN, scan_mode, offset_start, scan_dur) - - msg = '%s --es NoScanTime %d --es Repetitions %d' % (first_part_msg, - idle_time, - num_reps) - - self.dut.log.info('Sent BLE scan broadcast message: %s', msg) - self.dut.adb.shell(msg) + self.dut.adb.shell(BLE_LOCATION_SCAN_DISABLE) + self.bt_device.reset() + self.bt_device.power_off() + btutils.disable_bluetooth(self.dut.droid) diff --git a/acts/framework/acts/test_utils/power/PowerBaseTest.py b/acts/framework/acts/test_utils/power/PowerBaseTest.py index 71dbf7c2f3..2334702828 100644 --- a/acts/framework/acts/test_utils/power/PowerBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerBaseTest.py @@ -17,20 +17,23 @@ import json import logging import math import os +import re import time + import acts.controllers.iperf_server as ipf from acts import asserts from acts import base_test from acts import utils -from acts.controllers import monsoon +from acts.controllers.monsoon_lib.api.common import MonsoonError +from acts.controllers.monsoon_lib.api.common import PassthroughStates from acts.metrics.loggers.blackbox import BlackboxMetricLogger from acts.test_utils.power.loggers.power_metric_logger import PowerMetricLogger -from acts.test_utils.wifi import wifi_test_utils as wutils from acts.test_utils.wifi import wifi_power_test_utils as wputils +from acts.test_utils.wifi import wifi_test_utils as wutils RESET_BATTERY_STATS = 'dumpsys batterystats --reset' IPERF_TIMEOUT = 180 -THRESHOLD_TOLERANCE = 0.2 +THRESHOLD_TOLERANCE_DEFAULT = 0.2 GET_FROM_PHONE = 'get_from_dut' GET_FROM_AP = 'get_from_ap' PHONE_BATTERY_VOLTAGE_DEFAULT = 4.2 @@ -49,6 +52,7 @@ class ObjNew(): """Create a random obj with unknown attributes and value. """ + def __init__(self, **kwargs): self.__dict__.update(kwargs) @@ -67,6 +71,7 @@ class PowerBaseTest(base_test.BaseTestClass): """Base class for all wireless power related tests. """ + def __init__(self, controllers): base_test.BaseTestClass.__init__(self, controllers) @@ -102,11 +107,13 @@ class PowerBaseTest(base_test.BaseTestClass): bug_report=False, extra_wait=None, iperf_duration=None, + pass_fail_tolerance=THRESHOLD_TOLERANCE_DEFAULT, mon_voltage=PHONE_BATTERY_VOLTAGE_DEFAULT) # Setup the must have controllers, phone and monsoon self.dut = self.android_devices[0] self.mon_data_path = os.path.join(self.log_path, 'Monsoon') + os.makedirs(self.mon_data_path, exist_ok=True) self.mon = self.monsoons[0] self.mon.set_max_current(8.0) self.mon.set_voltage(self.mon_voltage) @@ -122,7 +129,7 @@ class PowerBaseTest(base_test.BaseTestClass): self.network_file = file elif 'rockbottom_' + self.dut.model in file: self.rockbottom_script = file - #Abort the class if threshold and rockbottom file is missing + # Abort the class if threshold and rockbottom file is missing asserts.abort_class_if( not self.threshold_file, 'Required test pass/fail threshold file is missing') @@ -137,7 +144,7 @@ class PowerBaseTest(base_test.BaseTestClass): self.mon_info = self.create_monsoon_info() # Sync device time, timezone and country code - utils.require_sl4a((self.dut, )) + utils.require_sl4a((self.dut,)) utils.sync_device_time(self.dut) self.dut.droid.wifiSetCountryCode('US') @@ -212,6 +219,8 @@ class PowerBaseTest(base_test.BaseTestClass): Args: file: the common file containing pass fail threshold. + test_specific: if True, returns the JSON element within the file + that starts with the test class name. """ with open(file, 'r') as f: params = json.load(f) @@ -259,20 +268,23 @@ class PowerBaseTest(base_test.BaseTestClass): """The actual test flow and result processing and validate. """ - self.collect_power_data() - self.pass_fail_check() + result = self.collect_power_data() + self.pass_fail_check(result.average_current) def collect_power_data(self): """Measure power, plot and take log if needed. + Returns: + A MonsoonResult object. """ - tag = '' # Collecting current measurement data and plot - self.file_path, self.test_result = self.monsoon_data_collect_save() - self.power_result.metric_value = self.test_result * self.mon_voltage - wputils.monsoon_data_plot(self.mon_info, self.file_path, tag=tag) + result = self.monsoon_data_collect_save() + self.power_result.metric_value = (result.average_current * + self.mon_voltage) + wputils.monsoon_data_plot(self.mon_info, result) + return result - def pass_fail_check(self): + def pass_fail_check(self, average_current=None): """Check the test result and decide if it passed or failed. The threshold is provided in the config file. In this class, result is @@ -285,18 +297,18 @@ class PowerBaseTest(base_test.BaseTestClass): return current_threshold = self.threshold[self.test_name] - if self.test_result: + if average_current: asserts.assert_true( - abs(self.test_result - current_threshold) / current_threshold < - THRESHOLD_TOLERANCE, + abs(average_current - current_threshold) / current_threshold < + self.pass_fail_tolerance, 'Measured average current in [{}]: {:.2f}mA, which is ' 'out of the acceptable range {:.2f}±{:.2f}mA'.format( - self.test_name, self.test_result, current_threshold, + self.test_name, average_current, current_threshold, self.pass_fail_tolerance * current_threshold)) asserts.explicit_pass( 'Measurement finished for [{}]: {:.2f}mA, which is ' 'within the acceptable range {:.2f}±{:.2f}'.format( - self.test_name, self.test_result, current_threshold, + self.test_name, average_current, current_threshold, self.pass_fail_tolerance * current_threshold)) else: asserts.fail( @@ -333,8 +345,12 @@ class PowerBaseTest(base_test.BaseTestClass): logging.info('Monsoon recovered from unexpected error') time.sleep(2) return True - except monsoon.MonsoonError: - logging.info(self.mon.mon.ser.in_waiting) + except MonsoonError: + try: + self.log.info(self.mon_info.dut._mon.ser.in_waiting) + except AttributeError: + # This attribute does not exist for HVPMs. + pass logging.warning('Unable to recover monsoon from unexpected error') return False @@ -345,94 +361,70 @@ class PowerBaseTest(base_test.BaseTestClass): log file. Take bug report if requested. Returns: - data_path: the absolute path to the log file of monsoon current - measurement - avg_current: the average current of the test + A MonsoonResult object containing information about the gathered + data. """ tag = '{}_{}_{}'.format(self.test_name, self.dut.model, self.dut.build_info['build_id']) + data_path = os.path.join(self.mon_info.data_path, '{}.txt'.format(tag)) - total_expected_samples = self.mon_info.freq * (self.mon_info.duration + - self.mon_info.offset) - min_required_samples = total_expected_samples * MIN_PERCENT_SAMPLE / 100 - # Retry counter for monsoon data aquisition - retry_measure = 1 - # Indicator that need to re-collect data - need_collect_data = 1 - result = None - while retry_measure <= MEASUREMENT_RETRY_COUNT: - try: - # If need to retake data - if need_collect_data == 1: - #Resets the battery status right before the test started - self.dut.adb.shell(RESET_BATTERY_STATS) - self.log.info( - 'Starting power measurement with monsoon box, try #{}'. - format(retry_measure)) - #Start the power measurement using monsoon - self.mon_info.dut.monsoon_usb_auto() - result = self.mon_info.dut.measure_power( - self.mon_info.freq, - self.mon_info.duration, - tag=tag, - offset=self.mon_info.offset) - self.mon_info.dut.reconnect_dut() - # Reconnect to dut - else: - self.mon_info.dut.reconnect_dut() - # Reconnect and return measurement results if no error happens - avg_current = result.average_current - monsoon.MonsoonData.save_to_text_file([result], data_path) - self.log.info('Power measurement done within {} try'.format( + + # If the specified Monsoon data file already exists (e.g., multiple + # measurements in a single test), write the results to a new file with + # the postfix "_#". + if os.path.exists(data_path): + highest_value = 1 + for filename in os.listdir(os.path.dirname(data_path)): + match = re.match(r'{}_(\d+).txt'.format(tag), filename) + if match: + highest_value = int(match.group(1)) + + data_path = os.path.join(self.mon_info.data_path, + '%s_%s.txt' % (tag, highest_value + 1)) + + total_expected_samples = self.mon_info.freq * self.mon_info.duration + min_required_samples = (total_expected_samples + * MIN_PERCENT_SAMPLE / 100) + for retry_measure in range(1, MEASUREMENT_RETRY_COUNT + 1): + # Resets the battery status right before the test starts. + self.dut.adb.shell(RESET_BATTERY_STATS) + self.log.info( + 'Starting power measurement, attempt #{}.'.format( retry_measure)) - return data_path, avg_current - # Catch monsoon errors during measurement - except monsoon.MonsoonError: - self.log.info(self.mon_info.dut.mon.ser.in_waiting) - # Break early if it's one count away from limit - if retry_measure == MEASUREMENT_RETRY_COUNT: - self.log.error( - 'Test failed after maximum measurement retry') - break - - self.log.warning('Monsoon error happened, now try to recover') - # Retry loop to recover monsoon from error - retry_monsoon = 1 - while retry_monsoon <= RECOVER_MONSOON_RETRY_COUNT: - mon_status = self.monsoon_recover() - if mon_status: - break - else: - retry_monsoon += 1 - self.log.warning( - 'Wait for {} second then try again'.format( - MONSOON_RETRY_INTERVAL)) - time.sleep(MONSOON_RETRY_INTERVAL) - - # Break the loop to end test if failed to recover monsoon - if not mon_status: - self.log.error( - 'Tried our best, still failed to recover monsoon') - break - else: - # If there is no data, or captured samples are less than min - # required, re-take - if not result: - self.log.warning('No data taken, need to remeasure') - elif len(result._data_points) <= min_required_samples: - self.log.warning( - 'More than {} percent of samples are missing due to monsoon error. Need to remeasure' - .format(100 - MIN_PERCENT_SAMPLE)) - else: - need_collect_data = 0 - self.log.warning( - 'Data collected is valid, try reconnect to DUT to finish test' - ) - retry_measure += 1 - - if retry_measure > MEASUREMENT_RETRY_COUNT: - self.log.error('Test failed after maximum measurement retry') + # Start the power measurement using monsoon. + self.mon_info.dut.usb(PassthroughStates.AUTO) + result = self.mon_info.dut.measure_power( + self.mon_info.duration, + measure_after_seconds=self.mon_info.offset, + hz=self.mon_info.freq, + output_path=data_path) + self.mon_info.dut.usb(PassthroughStates.ON) + + self.log.debug(result) + self.log.debug('Samples Gathered: %s. Max Samples: %s ' + 'Min Samples Required: %s.' % + (result.num_samples, total_expected_samples, + min_required_samples)) + + if result.num_samples <= min_required_samples: + retry_measure += 1 + self.log.warning( + 'More than {} percent of samples are missing due to ' + 'dropped packets. Need to remeasure.'.format( + 100 - MIN_PERCENT_SAMPLE)) + continue + + self.log.info('Measurement successful after {} attempt(s).'.format( + retry_measure)) + return result + else: + try: + self.log.info(self.mon_info.dut._mon.ser.in_waiting) + except AttributeError: + # This attribute does not exist for HVPMs. + pass + self.log.error('Unable to gather enough samples to run validation.') def process_iperf_results(self): """Get the iperf results and process. @@ -457,7 +449,7 @@ class PowerBaseTest(base_test.BaseTestClass): throughput = (math.fsum( iperf_result.instantaneous_rates[self.start_meas_time:-1] ) / len(iperf_result.instantaneous_rates[self.start_meas_time:-1]) - ) * 8 * (1.024**2) + ) * 8 * (1.024 ** 2) self.log.info('The average throughput is {}'.format(throughput)) except ValueError: diff --git a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py index 14c95975c9..5f38aec086 100644 --- a/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerGnssBaseTest.py @@ -17,6 +17,8 @@ import logging import time +import os + import acts.test_utils.power.PowerBaseTest as PBT from acts import base_test @@ -38,40 +40,50 @@ class PowerGnssBaseTest(PBT.PowerBaseTest): """ def collect_power_data(self): - """Measure power and plot. - - """ - tag = '_Power' - super().collect_power_data() - self.monsoon_data_plot_power(self.mon_info, self.file_path, tag=tag) + """Measure power and plot.""" + result = super().collect_power_data() + self.monsoon_data_plot_power(self.mon_info, result, tag='_Power') + return result - def monsoon_data_plot_power(self, mon_info, file_path, tag=""): + def monsoon_data_plot_power(self, mon_info, monsoon_results, tag=''): """Plot the monsoon power data using bokeh interactive plotting tool. Args: - mon_info: Dictionary with the monsoon packet config - file_path: the path to the monsoon log file with current data + mon_info: Dictionary with the monsoon packet config. + monsoon_results: a MonsoonResult or list of MonsoonResult objects to + to plot. + tag: an extra tag to append to the resulting filename. """ - self.log.info("Plot the power measurement data") - # Get results as monsoon data object from the input file - results = monsoon.MonsoonData.from_text_file(file_path) - # Decouple current and timestamp data from the monsoon object - current_data = [] - timestamps = [] - voltage = results[0].voltage - for result in results: - current_data.extend(result.data_points) - timestamps.extend(result.timestamps) - period = 1 / mon_info.freq - time_relative = [x * period for x in range(len(current_data))] - # Calculate the average current for the test - - current_data = [x * 1000 for x in current_data] - power_data = [x * voltage for x in current_data] - avg_current = sum(current_data) / len(current_data) - color = ['navy'] * len(current_data) + if not isinstance(monsoon_results, list): + monsoon_results = [monsoon_results] + logging.info('Plotting the power measurement data.') + + voltage = monsoon_results[0].voltage + + total_current = 0 + total_samples = 0 + for result in monsoon_results: + total_current += result.average_current * result.num_samples + total_samples += result.num_samples + avg_current = total_current / total_samples + + time_relative = [ + data_point.time + for monsoon_result in monsoon_results + for data_point in monsoon_result.get_data_points() + ] + + power_data = [ + data_point.current * voltage + for monsoon_result in monsoon_results + for data_point in monsoon_result.get_data_points() + ] + + total_data_points = sum( + result.num_samples for result in monsoon_results) + color = ['navy'] * total_data_points # Preparing the data and source link for bokehn java callback source = ColumnDataSource( @@ -94,18 +106,20 @@ class PowerGnssBaseTest(PBT.PowerBaseTest): dt = DataTable( source=s2, columns=columns, width=1300, height=60, editable=True) - plot_title = file_path[file_path.rfind('/') + 1:-4] + tag - output_file("%s/%s.html" % (mon_info.data_path, plot_title)) - TOOLS = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save' + plot_title = (os.path.basename( + os.path.splitext(monsoon_results[0].tag)[0]) + + tag) + output_file(os.path.join(mon_info.data_path, plot_title + '.html')) + tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save' # Create a new plot with the datatable above plot = figure( plot_width=1300, plot_height=700, title=plot_title, - tools=TOOLS, - output_backend="webgl") - plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width")) - plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height")) + tools=tools, + output_backend='webgl') + plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='width')) + plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='height')) plot.line('x0', 'y0', source=source, line_width=2) plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color') plot.xaxis.axis_label = 'Time (s)' @@ -119,8 +133,7 @@ class PowerGnssBaseTest(PBT.PowerBaseTest): code=customjsscript) # Layout the plot and the datatable bar - l = layout([[dt], [plot]]) - save(l) + save(layout([[dt], [plot]])) def disconnect_usb(self, ad, sleeptime): """Disconnect usb while device is on sleep and diff --git a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py index 6af9a491b3..937656e352 100644 --- a/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py +++ b/acts/framework/acts/test_utils/power/PowerWiFiBaseTest.py @@ -46,7 +46,7 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest): self.pkt_sender = self.packet_senders[0] if hasattr(self, 'iperf_servers'): self.iperf_server = self.iperf_servers[0] - if hasattr(self, 'iperf_duration'): + if self.iperf_duration: self.mon_duration = self.iperf_duration - 10 self.create_monsoon_info() @@ -115,10 +115,11 @@ class PowerWiFiBaseTest(PBT.PowerBaseTest): If IPERF is run, need to pull iperf results and attach it to the plot. """ - super().collect_power_data() + result = super().collect_power_data() tag = '' - if hasattr(self, IPERF_DURATION): + if self.iperf_duration: throughput = self.process_iperf_results() tag = '_RSSI_{0:d}dBm_Throughput_{1:.2f}Mbps'.format( self.RSSI, throughput) - wputils.monsoon_data_plot(self.mon_info, self.file_path, tag=tag) + wputils.monsoon_data_plot(self.mon_info, result, tag=tag) + return result diff --git a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py index baa7d33179..8637b2dcee 100644 --- a/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py +++ b/acts/framework/acts/test_utils/power/tel_simulations/LteSimulation.py @@ -645,12 +645,10 @@ class LteSimulation(BaseSimulation): self.log.info( "The test name does not include the '{}' parameter. Disabled " "by default.".format(self.PARAM_RRC_STATUS_CHANGE_TIMER)) - self.simulator.anritsu.set_lte_rrc_status_change(False) + self.simulator.set_lte_rrc_state_change_timer(False) else: - self.rrc_sc_timer = int(values[1]) - self.simulator.anritsu.set_lte_rrc_status_change(True) - self.simulator.anritsu.set_lte_rrc_status_change_timer( - self.rrc_sc_timer) + timer = int(values[1]) + self.simulator.anritsu.set_lte_rrc_status_change(True, timer) # Get uplink power diff --git a/acts/framework/acts/test_utils/tel/tel_test_utils.py b/acts/framework/acts/test_utils/tel/tel_test_utils.py index fb4780095a..323e1bc1e0 100644 --- a/acts/framework/acts/test_utils/tel/tel_test_utils.py +++ b/acts/framework/acts/test_utils/tel/tel_test_utils.py @@ -24,6 +24,7 @@ import re import os import urllib.parse import time +import acts.controllers.iperf_server as ipf from acts import signals from acts import utils @@ -801,6 +802,15 @@ def get_service_state_by_adb(log, ad): ad.log.info("mVoiceRegState is %s %s", result.group(1), result.group(2)) return result.group(2) + else: + if getattr(ad, "sdm_log", False): + #look for all occurrence in string + result2 = re.findall(r"mVoiceRegState=(\S+)\((\S+)\)", output) + for voice_state in result2: + if voice_state[0] == 0: + ad.log.info("mVoiceRegState is 0 %s", voice_state[1]) + return voice_state[1] + return result2[1][1] else: result = re.search(r"mServiceState=(\S+)", output) if result: @@ -2805,38 +2815,33 @@ def verify_internet_connection(log, ad, retries=3, expected_state=True): return False -def iperf_test_by_adb(log, - ad, - iperf_server, - port_num=None, - reverse=False, - timeout=180, - limit_rate=None, - omit=10, - ipv6=False, - rate_dict=None, - blocking=True, - log_file_path=None): - """Iperf test by adb. +def iperf_test_with_options(log, + ad, + iperf_server, + iperf_option, + timeout=180, + rate_dict=None, + blocking=True, + log_file_path=None): + """Iperf adb run helper. Args: log: log object ad: Android Device Object. - iperf_Server: The iperf host url". - port_num: TCP/UDP server port + iperf_server: The iperf host url". + iperf_option: The options to pass to iperf client timeout: timeout for file download to complete. - limit_rate: iperf bandwidth option. None by default - omit: the omit option provided in iperf command. + rate_dict: dictionary that can be passed in to save data + blocking: run iperf in blocking mode if True + log_file_path: location to save logs + Returns: + True if IPerf runs without throwing an exception """ - iperf_option = "-t %s -O %s -J" % (timeout, omit) - if limit_rate: iperf_option += " -b %s" % limit_rate - if port_num: iperf_option += " -p %s" % port_num - if ipv6: iperf_option += " -6" - if reverse: iperf_option += " -R" try: if log_file_path: ad.adb.shell("rm %s" % log_file_path, ignore_status=True) ad.log.info("Running adb iperf test with server %s", iperf_server) + ad.log.info("IPerf options are %s", iperf_option) if not blocking: ad.run_iperf_client_nb( iperf_server, @@ -2846,21 +2851,133 @@ def iperf_test_by_adb(log, return True result, data = ad.run_iperf_client( iperf_server, iperf_option, timeout=timeout + 60) - ad.log.info("Iperf test result with server %s is %s", iperf_server, + ad.log.info("IPerf test result with server %s is %s", iperf_server, result) if result: - data_json = json.loads(''.join(data)) - tx_rate = data_json['end']['sum_sent']['bits_per_second'] - rx_rate = data_json['end']['sum_received']['bits_per_second'] - ad.log.info( - 'iPerf3 upload speed is %sbps, download speed is %sbps', - tx_rate, rx_rate) + iperf_str = ''.join(data) + iperf_result = ipf.IPerfResult(iperf_str) + if "-u" in iperf_option: + udp_rate = iperf_result.avg_rate + if udp_rate is None: + ad.log.warning( + "UDP rate is none, IPerf server returned error: %s", + iperf_result.error) + ad.log.info("IPerf3 udp speed is %sbps", udp_rate) + else: + tx_rate = iperf_result.avg_send_rate + rx_rate = iperf_result.avg_receive_rate + if (tx_rate or rx_rate) is None: + ad.log.warning( + "A TCP rate is none, IPerf server returned error: %s", + iperf_result.error) + ad.log.info( + "IPerf3 upload speed is %sbps, download speed is %sbps", + tx_rate, rx_rate) if rate_dict is not None: rate_dict["Uplink"] = tx_rate rate_dict["Downlink"] = rx_rate return result - except Exception as e: + except AdbError as e: ad.log.warning("Fail to run iperf test with exception %s", e) + raise + + +def iperf_udp_test_by_adb(log, + ad, + iperf_server, + port_num=None, + reverse=False, + timeout=180, + limit_rate=None, + omit=10, + ipv6=False, + rate_dict=None, + blocking=True, + log_file_path=None): + """Iperf test by adb using UDP. + + Args: + log: log object + ad: Android Device Object. + iperf_Server: The iperf host url". + port_num: TCP/UDP server port + reverse: whether to test download instead of upload + timeout: timeout for file download to complete. + limit_rate: iperf bandwidth option. None by default + omit: the omit option provided in iperf command. + ipv6: whether to run the test as ipv6 + rate_dict: dictionary that can be passed in to save data + blocking: run iperf in blocking mode if True + log_file_path: location to save logs + """ + iperf_option = "-u -i 1 -t %s -O %s -J" % (timeout, omit) + if limit_rate: + iperf_option += " -b %s" % limit_rate + if port_num: + iperf_option += " -p %s" % port_num + if ipv6: + iperf_option += " -6" + if reverse: + iperf_option += " -R" + try: + return iperf_test_with_options(log, + ad, + iperf_server, + iperf_option, + timeout, + rate_dict, + blocking, + log_file_path) + except AdbError: + return False + +def iperf_test_by_adb(log, + ad, + iperf_server, + port_num=None, + reverse=False, + timeout=180, + limit_rate=None, + omit=10, + ipv6=False, + rate_dict=None, + blocking=True, + log_file_path=None): + """Iperf test by adb using TCP. + + Args: + log: log object + ad: Android Device Object. + iperf_server: The iperf host url". + port_num: TCP/UDP server port + reverse: whether to test download instead of upload + timeout: timeout for file download to complete. + limit_rate: iperf bandwidth option. None by default + omit: the omit option provided in iperf command. + ipv6: whether to run the test as ipv6 + rate_dict: dictionary that can be passed in to save data + blocking: run iperf in blocking mode if True + log_file_path: location to save logs + """ + iperf_option = "-t %s -O %s -J" % (timeout, omit) + if limit_rate: + iperf_option += " -b %s" % limit_rate + if port_num: + iperf_option += " -p %s" % port_num + if ipv6: + iperf_option += " -6" + if reverse: + iperf_option += " -R" + try: + return iperf_test_with_options(log, + ad, + iperf_server, + iperf_option, + timeout, + rate_dict, + blocking, + log_file_path) + except AdbError: return False diff --git a/acts/framework/acts/test_utils/wifi/ota_chamber.py b/acts/framework/acts/test_utils/wifi/ota_chamber.py index 26abbe5f8b..53494dad52 100644 --- a/acts/framework/acts/test_utils/wifi/ota_chamber.py +++ b/acts/framework/acts/test_utils/wifi/ota_chamber.py @@ -14,6 +14,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import contextlib +import io import time from acts import logger from acts import utils @@ -48,7 +50,6 @@ class OtaChamber(object): Base class provides functions whose implementation is shared by all chambers. """ - def reset_chamber(self): """Resets the chamber to its zero/home state.""" raise NotImplementedError @@ -80,7 +81,6 @@ class OtaChamber(object): class MockChamber(OtaChamber): """Class that implements mock chamber for test development and debug.""" - def __init__(self, config): self.config = config.copy() self.device_id = self.config['device_id'] @@ -117,7 +117,6 @@ class MockChamber(OtaChamber): class OctoboxChamber(OtaChamber): """Class that implements Octobox chamber.""" - def __init__(self, config): self.config = config.copy() self.device_id = self.config['device_id'] @@ -130,8 +129,9 @@ class OctoboxChamber(OtaChamber): def set_orientation(self, orientation): self.log.info('Setting orientation to {} degrees.'.format(orientation)) - utils.exe_cmd('sudo {} -d {} -p {}'.format( - self.TURNTABLE_FILE_PATH, self.device_id, orientation)) + utils.exe_cmd('sudo {} -d {} -p {}'.format(self.TURNTABLE_FILE_PATH, + self.device_id, + orientation)) def reset_chamber(self): self.log.info('Resetting chamber to home state') @@ -155,7 +155,6 @@ class ChamberAutoConnect(object): class BluetestChamber(OtaChamber): """Class that implements Octobox chamber.""" - def __init__(self, config): import flow self.config = config.copy() @@ -165,6 +164,16 @@ class BluetestChamber(OtaChamber): self.stirrer_ids = [0, 1, 2] self.current_mode = None + # Capture print output decorator + @staticmethod + def _capture_output(func, *args, **kwargs): + """Creates a decorator to capture stdout from bluetest module""" + f = io.StringIO() + with contextlib.redirect_stdout(f): + func(*args, **kwargs) + output = f.getvalue() + return output + def _connect(self): self.chamber.connect(self.config['ip_address'], self.config['username'], self.config['password']) @@ -172,12 +181,15 @@ class BluetestChamber(OtaChamber): def _init_manual_mode(self): self.current_mode = 'manual' for stirrer_id in self.stirrer_ids: - self.chamber.chamber_stirring_manual_init(stirrer_id) + out = self._capture_output( + self.chamber.chamber_stirring_manual_init, stirrer_id) + if "failed" in out: + self.log.warning("Initialization error: {}".format(out)) time.sleep(CHAMBER_SLEEP) def _init_continuous_mode(self): self.current_mode = 'continuous' - self.chamber.chamber_stirring_continous_init() + self.chamber.chamber_stirring_continuous_init() def _init_stepped_mode(self, steps): self.current_mode = 'stepped' @@ -188,7 +200,17 @@ class BluetestChamber(OtaChamber): if self.current_mode != 'manual': self._init_manual_mode() self.log.info('Setting stirrer {} to {}.'.format(stirrer_id, position)) - self.chamber.chamber_stirring_manual_set_pos(stirrer_id, position) + out = self._capture_output( + self.chamber.chamber_stirring_manual_set_pos, stirrer_id, position) + if "failed" in out: + self.log.warning("Bluetest error: {}".format(out)) + self.log.warning("Set position failed. Retrying.") + self.current_mode = None + self.set_stirrer_pos(stirrer_id, position) + else: + self._capture_output(self.chamber.chamber_stirring_manual_wait, + CHAMBER_SLEEP) + self.log.warning('Stirrer {} at {}.'.format(stirrer_id, position)) def set_orientation(self, orientation): self.set_stirrer_pos(2, orientation * 100 / 360) @@ -196,10 +218,10 @@ class BluetestChamber(OtaChamber): def start_continuous_stirrers(self): if self.current_mode != 'continuous': self._init_continuous_mode() - self.chamber.chamber_stirring_continous_start() + self.chamber.chamber_stirring_continuous_start() def stop_continuous_stirrers(self): - self.chamber.chamber_stirring_continous_stop() + self.chamber.chamber_stirring_continuous_stop() def step_stirrers(self, steps): if self.current_mode != 'stepped': diff --git a/acts/framework/acts/test_utils/wifi/ota_sniffer.py b/acts/framework/acts/test_utils/wifi/ota_sniffer.py index 60168bb458..884f8f12ca 100644 --- a/acts/framework/acts/test_utils/wifi/ota_sniffer.py +++ b/acts/framework/acts/test_utils/wifi/ota_sniffer.py @@ -114,7 +114,6 @@ class OtaSnifferBase(object): class MockSniffer(OtaSnifferBase): """Class that implements mock sniffer for test development and debug.""" - def __init__(self, config): self.log = logger.create_tagged_trace_logger("Mock Sniffer") @@ -354,12 +353,12 @@ class TsharkSnifferBase(OtaSnifferBase): log_file = self._get_full_file_path(tag) with open(temp_dump_file, "r") as input_csv, open(log_file, "w") as output_csv: - reader = csv.DictReader( - input_csv, fieldnames=self.TSHARK_COLUMNS, delimiter="^") - writer = csv.DictWriter( - output_csv, - fieldnames=self.TSHARK_OUTPUT_COLUMNS, - delimiter="\t") + reader = csv.DictReader(input_csv, + fieldnames=self.TSHARK_COLUMNS, + delimiter="^") + writer = csv.DictWriter(output_csv, + fieldnames=self.TSHARK_OUTPUT_COLUMNS, + delimiter="\t") writer.writeheader() for row in reader: if row["subtype"] in self.TYPE_SUBTYPE_DICT.keys(): @@ -427,7 +426,6 @@ class TsharkSnifferBase(OtaSnifferBase): class TsharkSnifferOnUnix(TsharkSnifferBase): """Class that implements Tshark based sniffer controller on Unix systems.""" - def _scan_for_networks(self): """Scans the wireless networks on the sniffer. @@ -457,7 +455,6 @@ class TsharkSnifferOnUnix(TsharkSnifferBase): class TsharkSnifferOnLinux(TsharkSnifferBase): """Class that implements Tshark based sniffer controller on Linux systems.""" - def _scan_for_networks(self): """Scans the wireless networks on the sniffer. @@ -480,8 +477,8 @@ class TsharkSnifferOnLinux(TsharkSnifferBase): password: password of the wireless network to connect to. """ if password != "": - connect_command = "sudo nmcli device wifi connect {} password {} ".format( - ssid, password), + connect_command = "sudo nmcli device wifi connect {} password {}".format( + ssid, password) else: connect_command = "sudo nmcli device wifi connect {}".format(ssid) self._sniffer_server.run(connect_command) diff --git a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py index 34a820b934..ac18a8f3e8 100644 --- a/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py +++ b/acts/framework/acts/test_utils/wifi/wifi_power_test_utils.py @@ -16,8 +16,8 @@ import logging import time +import os from acts import utils -from acts.controllers import monsoon from acts.libs.proc import job from acts.controllers.ap_lib import bridge_interface as bi from acts.test_utils.wifi import wifi_test_utils as wutils @@ -39,7 +39,7 @@ ENABLED_MODULATED_DTIM = 'gEnableModulatedDTIM=' MAX_MODULATED_DTIM = 'gMaxLIModulatedDTIM=' -def monsoon_data_plot(mon_info, file_path, tag=""): +def monsoon_data_plot(mon_info, monsoon_results, tag=''): """Plot the monsoon current data using bokeh interactive plotting tool. Plotting power measurement data with bokeh to generate interactive plots. @@ -50,9 +50,10 @@ def monsoon_data_plot(mon_info, file_path, tag=""): Args: mon_info: obj with information of monsoon measurement, including - monsoon device object, measurement frequency, duration and - offset etc. - file_path: the path to the monsoon log file with current data + monsoon device object, measurement frequency, duration, etc. + monsoon_results: a MonsoonResult or list of MonsoonResult objects to + to plot. + tag: an extra tag to append to the resulting filename. Returns: plot: the plotting object of bokeh, optional, will be needed if multiple @@ -60,25 +61,35 @@ def monsoon_data_plot(mon_info, file_path, tag=""): dt: the datatable object of bokeh, optional, will be needed if multiple datatables will be combined to one html file. """ + if not isinstance(monsoon_results, list): + monsoon_results = [monsoon_results] + logging.info('Plotting the power measurement data.') + + voltage = monsoon_results[0].voltage + + total_current = 0 + total_samples = 0 + for result in monsoon_results: + total_current += result.average_current * result.num_samples + total_samples += result.num_samples + avg_current = total_current / total_samples + + time_relative = [ + data_point.time + for monsoon_result in monsoon_results + for data_point in monsoon_result.get_data_points() + ] - log = logging.getLogger() - log.info("Plot the power measurement data") - #Get results as monsoon data object from the input file - results = monsoon.MonsoonData.from_text_file(file_path) - #Decouple current and timestamp data from the monsoon object - current_data = [] - timestamps = [] - voltage = results[0].voltage - [current_data.extend(x.data_points) for x in results] - [timestamps.extend(x.timestamps) for x in results] - period = 1 / float(mon_info.freq) - time_relative = [x * period for x in range(len(current_data))] - #Calculate the average current for the test - current_data = [x * 1000 for x in current_data] - avg_current = sum(current_data) / len(current_data) - color = ['navy'] * len(current_data) - - #Preparing the data and source link for bokehn java callback + current_data = [ + data_point.current * 1000 + for monsoon_result in monsoon_results + for data_point in monsoon_result.get_data_points() + ] + + total_data_points = sum(result.num_samples for result in monsoon_results) + color = ['navy'] * total_data_points + + # Preparing the data and source link for bokehn java callback source = ColumnDataSource( data=dict(x0=time_relative, y0=current_data, color=color)) s2 = ColumnDataSource( @@ -88,7 +99,7 @@ def monsoon_data_plot(mon_info, file_path, tag=""): x0=[round(avg_current * voltage, 2)], z1=[round(avg_current * voltage * mon_info.duration, 2)], z2=[round(avg_current * mon_info.duration, 2)])) - #Setting up data table for the output + # Setting up data table for the output columns = [ TableColumn(field='z0', title='Total Duration (s)'), TableColumn(field='y0', title='Average Current (mA)'), @@ -99,26 +110,29 @@ def monsoon_data_plot(mon_info, file_path, tag=""): dt = DataTable( source=s2, columns=columns, width=1300, height=60, editable=True) - plot_title = file_path[file_path.rfind('/') + 1:-4] + tag - output_file("%s/%s.html" % (mon_info.data_path, plot_title)) - TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save') + plot_title = (os.path.basename(os.path.splitext(monsoon_results[0].tag)[0]) + + tag) + output_file(os.path.join(mon_info.data_path, plot_title + '.html')) + tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save' # Create a new plot with the datatable above plot = figure( - plot_width=1300, plot_height=700, title=plot_title, tools=TOOLS) - plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width")) - plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height")) + plot_width=1300, + plot_height=700, + title=plot_title, + tools=tools, + output_backend='webgl') + plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='width')) + plot.add_tools(bokeh_tools.WheelZoomTool(dimensions='height')) plot.line('x0', 'y0', source=source, line_width=2) plot.circle('x0', 'y0', source=source, size=0.5, fill_color='color') plot.xaxis.axis_label = 'Time (s)' plot.yaxis.axis_label = 'Current (mA)' plot.title.text_font_size = {'value': '15pt'} - #Callback Java scripting + # Callback JavaScript source.selected.js_on_change( "indices", - CustomJS( - args=dict(source=source, mytable=dt), - code=""" + CustomJS(args=dict(source=source, mytable=dt), code=""" var inds = cb_obj.indices; var d1 = source.data; var d2 = mytable.source.data; @@ -154,10 +168,9 @@ def monsoon_data_plot(mon_info, file_path, tag=""): mytable.change.emit(); """)) - #Layout the plot and the datatable bar - l = layout([[dt], [plot]]) - save(l) - return [plot, dt] + # Layout the plot and the datatable bar + save(layout([[dt], [plot]])) + return plot, dt def change_dtim(ad, gEnableModulatedDTIM, gMaxLIModulatedDTIM=10): @@ -287,12 +300,12 @@ def bokeh_plot(data_sets, Returns: plot: bokeh plot figure object """ - TOOLS = ('box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save') + tools = 'box_zoom,box_select,pan,crosshair,redo,undo,reset,hover,save' plot = figure( plot_width=1300, plot_height=700, title=fig_property['title'], - tools=TOOLS, + tools=tools, output_backend="webgl") plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="width")) plot.add_tools(bokeh_tools.WheelZoomTool(dimensions="height")) @@ -324,7 +337,7 @@ def bokeh_plot(data_sets, legend=str(legend), fill_color=color) - #Plot properties + # Plot properties plot.xaxis.axis_label = fig_property['x_label'] plot.yaxis.axis_label = fig_property['y_label'] plot.legend.location = "top_right" diff --git a/acts/framework/setup.py b/acts/framework/setup.py index b5e4467bb6..32b8a93cad 100755 --- a/acts/framework/setup.py +++ b/acts/framework/setup.py @@ -39,6 +39,7 @@ install_requires = [ 'xlsxwriter', 'mobly', 'grpcio', + 'Monsoon', # paramiko-ng is needed vs paramiko as currently paramiko does not support # ed25519 ssh keys, which is what Fuchsia uses. 'paramiko-ng', diff --git a/acts/framework/tests/acts_import_unit_test.py b/acts/framework/tests/acts_import_unit_test.py index 3dd4857fd0..38dc3bf103 100755 --- a/acts/framework/tests/acts_import_unit_test.py +++ b/acts/framework/tests/acts_import_unit_test.py @@ -48,6 +48,12 @@ else: PY_FILE_REGEX = re.compile('.+\.py$') BLACKLIST = [ + # TODO(markdr): Remove these after BT team evaluates these tests. + 'acts/test_utils/bt/PowerBaseTest.py', + 'tests/google/ble/power/GattPowerTest.py', + 'tests/google/bt/power/A2dpPowerTest.py', + 'tests/google/ble/power/BleScanPowerTest.py', + 'acts/controllers/rohdeschwarz_lib/contest.py', 'acts/controllers/native.py', 'acts/controllers/native_android_device.py', diff --git a/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py b/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py index ec274b956e..9d628959d9 100755 --- a/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py +++ b/acts/framework/tests/controllers/monsoon_lib/api/monsoon_test.py @@ -209,15 +209,6 @@ class BaseMonsoonTest(unittest.TestCase): 'usbPassthroughMode should not be called when the ' 'state does not change.') - def test_monsoon_usb_auto_sets_usb_state_to_auto(self): - monsoon = MonsoonImpl() - - monsoon.monsoon_usb_auto() - - self.assertEqual(monsoon.status.usbPassthroughMode, - PassthroughStates.AUTO, - 'monsoon_usb_auto() did not disconnect USB.') - def take_samples_always_reestablishes_the_monsoon_connection(self): monsoon = MonsoonImpl() assembly_line = mock.Mock() diff --git a/acts/tests/google/power/PowerBaselineTest.py b/acts/tests/google/power/PowerBaselineTest.py index b75514423d..d8ef277b2e 100644 --- a/acts/tests/google/power/PowerBaselineTest.py +++ b/acts/tests/google/power/PowerBaselineTest.py @@ -31,7 +31,7 @@ class PowerBaselineTest(PowerBaseTest): self.dut.droid.goToSleepNow() # Measure power - self.collect_power_data() + result = self.collect_power_data() # Check if power measurement is below the required value - self.pass_fail_check() + self.pass_fail_check(result.average_current) diff --git a/acts/tests/google/power/bt/PowerBLEadvertiseTest.py b/acts/tests/google/power/bt/PowerBLEadvertiseTest.py new file mode 100644 index 0000000000..c12d2293e1 --- /dev/null +++ b/acts/tests/google/power/bt/PowerBLEadvertiseTest.py @@ -0,0 +1,63 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.test_utils.bt.BleEnum as bleenum +import acts.test_utils.bt.bt_power_test_utils as btputils +import acts.test_utils.power.PowerBTBaseTest as PBtBT + +BLE_LOCATION_SCAN_ENABLE = 'settings put secure location_mode 3' +EXTRA_ADV_TIME = 10 + + +class PowerBLEadvertiseTest(PBtBT.PowerBTBaseTest): + def __init__(self, configs): + super().__init__(configs) + req_params = ['adv_modes', 'adv_power_levels', 'adv_duration'] + self.unpack_userparams(req_params) + # Loop all advertise modes and power levels + for adv_mode in self.adv_modes: + for adv_power_level in self.adv_power_levels: + self.generate_test_case(adv_mode, adv_power_level, + self.adv_duration) + + def setup_class(self): + + super().setup_class() + self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE) + # Make sure during power measurement, advertisement is always on + self.mon_info.duration = ( + self.adv_duration - self.mon_offset - EXTRA_ADV_TIME) + + def generate_test_case(self, adv_mode, adv_power_level, adv_duration): + def test_case_fn(): + + self.measure_ble_advertise_power(adv_mode, adv_power_level, + adv_duration) + + adv_mode_str = bleenum.AdvertiseSettingsAdvertiseMode(adv_mode).name + adv_txpl_str = bleenum.AdvertiseSettingsAdvertiseTxPower( + adv_power_level).name.strip('ADVERTISE').strip('_') + test_case_name = ('test_BLE_{}_{}'.format(adv_mode_str, adv_txpl_str)) + setattr(self, test_case_name, test_case_fn) + + def measure_ble_advertise_power(self, adv_mode, adv_power_level, + adv_duration): + + btputils.start_apk_ble_adv(self.dut, adv_mode, adv_power_level, + adv_duration) + time.sleep(EXTRA_ADV_TIME) + self.measure_power_and_validate() diff --git a/acts/tests/google/power/bt/PowerBLEscanTest.py b/acts/tests/google/power/bt/PowerBLEscanTest.py new file mode 100644 index 0000000000..8ed77b5a94 --- /dev/null +++ b/acts/tests/google/power/bt/PowerBLEscanTest.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.test_utils.bt.BleEnum as bleenum +import acts.test_utils.bt.bt_power_test_utils as btputils +import acts.test_utils.power.PowerBTBaseTest as PBtBT + +BLE_LOCATION_SCAN_ENABLE = 'settings put secure location_mode 3' +EXTRA_SCAN_TIME = 10 + + +class PowerBLEscanTest(PBtBT.PowerBTBaseTest): + def __init__(self, configs): + super().__init__(configs) + req_params = ['scan_modes', 'scan_duration'] + self.unpack_userparams(req_params) + + for scan_mode in self.scan_modes: + self.generate_test_case_no_devices_around(scan_mode, + self.scan_duration) + + def setup_class(self): + + super().setup_class() + self.dut.adb.shell(BLE_LOCATION_SCAN_ENABLE) + # Make sure during power measurement, scan is always on + self.mon_info.duration = ( + self.scan_duration - self.mon_offset - EXTRA_SCAN_TIME) + + def generate_test_case_no_devices_around(self, scan_mode, scan_duration): + def test_case_fn(): + + self.measure_ble_scan_power(scan_mode, scan_duration) + + test_case_name = ('test_BLE_{}_no_advertisers'.format( + bleenum.ScanSettingsScanMode(scan_mode).name)) + setattr(self, test_case_name, test_case_fn) + + def measure_ble_scan_power(self, scan_mode, scan_duration): + + btputils.start_apk_ble_scan(self.dut, scan_mode, scan_duration) + time.sleep(EXTRA_SCAN_TIME) + self.measure_power_and_validate() diff --git a/acts/tests/google/power/bt/PowerBTa2dpTest.py b/acts/tests/google/power/bt/PowerBTa2dpTest.py new file mode 100644 index 0000000000..8122fc0748 --- /dev/null +++ b/acts/tests/google/power/bt/PowerBTa2dpTest.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.test_utils.bt.bt_test_utils as btutils +import acts.test_utils.power.PowerBTBaseTest as PBtBT +from acts import asserts +from acts.test_utils.bt import BtEnum + +EXTRA_PLAY_TIME = 10 + + +class PowerBTa2dpTest(PBtBT.PowerBTBaseTest): + def __init__(self, configs): + super().__init__(configs) + req_params = ['codecs', 'tx_power_levels', 'atten_pl_settings'] + self.unpack_userparams(req_params) + # Loop all codecs and tx power levels + for codec_config in self.codecs: + for tpl in self.tx_power_levels: + self.generate_test_case(codec_config, tpl) + + def setup_test(self): + super().setup_test() + btutils.connect_phone_to_headset(self.dut, self.bt_device, 60) + vol = self.dut.droid.getMaxMediaVolume() * self.volume + self.dut.droid.setMediaVolume(0) + time.sleep(1) + self.dut.droid.setMediaVolume(int(vol)) + + + def generate_test_case(self, codec_config, tpl): + def test_case_fn(): + self.measure_a2dp_power(codec_config, tpl) + + test_case_name = ('test_BTa2dp_{}_codec_at_PL{}'.format( + codec_config['codec_type'], tpl)) + setattr(self, test_case_name, test_case_fn) + + def measure_a2dp_power(self, codec_config, tpl): + + current_codec = self.dut.droid.bluetoothA2dpGetCurrentCodecConfig() + current_codec_type = BtEnum.BluetoothA2dpCodecType( + current_codec['codecType']).name + if current_codec_type != codec_config['codec_type']: + codec_set = btutils.set_bluetooth_codec(self.dut, **codec_config) + asserts.assert_true(codec_set, 'Codec configuration failed.') + else: + self.log.info('Current Codec is {}, no need to change'.format( + current_codec_type)) + + # Set attenuation so BT tx at desired power level + tpl = 'PL' + str(tpl) + self.set_attenuation(self.atten_pl_settings[tpl]) + self.log.info('Setting Attenuator to {} dB'.format(self.atten_pl_settings[tpl])) + + self.media.play() + self.log.info('Running A2DP with codec {} at {}'.format( + codec_config['codec_type'], tpl)) + self.dut.droid.goToSleepNow() + time.sleep(EXTRA_PLAY_TIME) + self.measure_power_and_validate()
\ No newline at end of file diff --git a/acts/tests/google/power/bt/PowerBTbaselineTest.py b/acts/tests/google/power/bt/PowerBTbaselineTest.py deleted file mode 100644 index f4050c8de5..0000000000 --- a/acts/tests/google/power/bt/PowerBTbaselineTest.py +++ /dev/null @@ -1,62 +0,0 @@ -#!/usr/bin/env python3.4 -# -# Copyright 2018 - The Android Open Source Project -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from acts.test_decorators import test_tracker_info -import acts.test_utils.power.PowerBTBaseTest as PBtBT - - -class PowerBTbaselineTest(PBtBT.PowerBTBaseTest): - def bt_baseline_test_func(self): - """Base function for BT baseline measurement. - - Steps: - 1. Sets the phone in airplane mode, disables gestures and location - 2. Turns ON/OFF BT, BLE and screen according to test conditions - 3. Measures the power consumption - 4. Asserts pass/fail criteria based on measured power - """ - - # Decode the test params from test name - attrs = ['screen_status', 'bt_status', 'ble_status', 'scan_status'] - indices = [2, 4, 6, 7] - self.decode_test_configs(attrs, indices) - # Setup the phoen at desired state - self.phone_setup_for_BT(self.test_configs.bt_status, - self.test_configs.ble_status, - self.test_configs.screen_status) - if self.test_configs.scan_status == 'connectable': - self.dut.droid.bluetoothMakeConnectable() - elif self.test_configs.scan_status == 'discoverable': - self.dut.droid.bluetoothMakeDiscoverable( - self.mon_info.duration + self.mon_info.offset) - self.measure_power_and_validate() - - # Test cases- Baseline - @test_tracker_info(uuid='3f8ac0cb-f20d-4569-a58e-6009c89ea049') - def test_screen_OFF_bt_ON_ble_ON_connectable(self): - self.bt_baseline_test_func() - - @test_tracker_info(uuid='d54a992e-37ed-460a-ada7-2c51941557fd') - def test_screen_OFF_bt_ON_ble_ON_discoverable(self): - self.bt_baseline_test_func() - - @test_tracker_info(uuid='8f4c36b5-b18e-4aa5-9fe5-aafb729c1034') - def test_screen_ON_bt_ON_ble_ON_connectable(self): - self.bt_baseline_test_func() - - @test_tracker_info(uuid='7128356f-67d8-46b3-9d6b-1a4c9a7a1745') - def test_screen_ON_bt_ON_ble_ON_discoverable(self): - self.bt_baseline_test_func() diff --git a/acts/tests/google/power/bt/PowerBTcalibrationTest.py b/acts/tests/google/power/bt/PowerBTcalibrationTest.py new file mode 100644 index 0000000000..dffcc67231 --- /dev/null +++ b/acts/tests/google/power/bt/PowerBTcalibrationTest.py @@ -0,0 +1,65 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import csv +import os +import time +import acts.test_utils.bt.bt_test_utils as btutils +import acts.test_utils.power.PowerBTBaseTest as PBtBT +from acts import utils + +EXTRA_PLAY_TIME = 10 + + +class PowerBTcalibrationTest(PBtBT.PowerBTBaseTest): + def setup_test(self): + + super().setup_test() + self.attenuator = self.attenuators[0] + btutils.enable_bqr(self.dut) + btutils.enable_bluetooth(self.dut.droid, self.dut.ed) + btutils.connect_phone_to_headset(self.dut, self.bt_device, 60) + vol = self.dut.droid.getMaxMediaVolume() * self.volume + self.dut.droid.setMediaVolume(int(vol)) + + self.cal_data_path = os.path.join(self.log_path, 'Calibration') + self.log_file = os.path.join(self.cal_data_path, 'Cal_data.csv') + utils.create_dir(os.path.dirname(self.log_file)) + + + def test_calibrate(self): + """Run calibration to get attenuation value at each power level + + """ + + self.cal_matrix = [] + self.media.play() + time.sleep(EXTRA_PLAY_TIME) + + # Loop through attenuation in 1 dB step until reaching at PL10 + for i in range(int(self.attenuator.get_max_atten())): + + self.attenuator.set_atten(i) + bt_metrics_dict = btutils.get_bt_metric(self.dut) + pwl = int(bt_metrics_dict['pwlv'][self.dut.serial]) + self.cal_matrix.append([i, pwl]) + if pwl == 10: + break + + # Write cal results to csv + with open(self.log_file, 'w', newline='') as f: + writer = csv.writer(f) + writer.writerows(self.cal_matrix) diff --git a/acts/tests/google/power/bt/PowerBTidleTest.py b/acts/tests/google/power/bt/PowerBTidleTest.py new file mode 100644 index 0000000000..bab79d0836 --- /dev/null +++ b/acts/tests/google/power/bt/PowerBTidleTest.py @@ -0,0 +1,59 @@ +#!/usr/bin/env python3.4 +# +# Copyright 2018 - The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time +import acts.test_utils.power.PowerBTBaseTest as PBtBT +import acts.test_utils.bt.bt_test_utils as btutils + +SCREEN_OFF_WAIT_TIME = 2 + + +class PowerBTidleTest(PBtBT.PowerBTBaseTest): + def setup_class(self): + + super().setup_class() + btutils.enable_bluetooth(self.dut.droid, self.dut.ed) + + # Test cases- Baseline + def test_bt_on_unconnected_connectable(self): + """BT turned on connectable mode. + + Page scan only. + """ + self.dut.droid.bluetoothMakeConnectable() + self.dut.droid.goToSleepNow() + time.sleep(SCREEN_OFF_WAIT_TIME) + self.measure_power_and_validate() + + def test_bt_on_unconnected_discoverable(self): + """BT turned on discoverable mode. + + Page and inquiry scan. + """ + self.dut.droid.bluetoothMakeConnectable() + self.dut.droid.bluetoothMakeDiscoverable() + self.dut.droid.goToSleepNow() + time.sleep(SCREEN_OFF_WAIT_TIME) + self.measure_power_and_validate() + + def test_bt_connected_idle(self): + """BT idle after connecting to headset. + + """ + btutils.connect_phone_to_headset(self.dut, self.bt_device, 60) + self.dut.droid.goToSleepNow() + time.sleep(SCREEN_OFF_WAIT_TIME) + self.measure_power_and_validate() diff --git a/acts/tests/google/power/bt/PowerBTscanTest.py b/acts/tests/google/power/bt/PowerBTscanTest.py deleted file mode 100644 index 0ef622c648..0000000000 --- a/acts/tests/google/power/bt/PowerBTscanTest.py +++ /dev/null @@ -1,85 +0,0 @@ -#!/usr/bin/env python3.4 -# -# Copyright 2018 - The Android Open Source Project -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import acts.test_utils.power.PowerBTBaseTest as PBtBT -from acts.test_decorators import test_tracker_info - - -class PowerBTscanTest(PBtBT.PowerBTBaseTest): - def ble_scan_base_func(self): - """Base function to start a generic BLE scan and measures the power - - Steps: - 1. Sets the phone in airplane mode, disables gestures and location - 2. Turns ON/OFF BT, BLE and screen according to test conditions - 3. Sends the adb shell command to PMC to start scan - 4. Measures the power consumption - 5. Asserts pass/fail criteria based on measured power - """ - # Decode the test params from test name - attrs = ['screen_status', 'bt_status', 'ble_status', 'scan_mode'] - indices = [2, 4, 6, -1] - self.decode_test_configs(attrs, indices) - if self.test_configs.scan_mode == 'lowpower': - scan_mode = 'low_power' - elif self.test_configs.scan_mode == 'lowlatency': - scan_mode = 'low_latency' - else: - scan_mode = self.test_configs.scan_mode - self.phone_setup_for_BT(self.test_configs.bt_status, - self.test_configs.ble_status, - self.test_configs.screen_status) - self.start_pmc_ble_scan(scan_mode, self.mon_info.offset, - self.mon_info.duration) - self.measure_power_and_validate() - - # Test Cases: BLE Scans + Filtered scans - @test_tracker_info(uuid='e9a36161-1d0c-4b9a-8bd8-80fef8cdfe28') - def test_screen_ON_bt_ON_ble_ON_default_scan_balanced(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='5fa61bf4-5f04-40bf-af52-6644b534d02e') - def test_screen_OFF_bt_ON_ble_ON_filter_scan_opportunistic(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='512b6cde-be83-43b0-b799-761380ba69ff') - def test_screen_OFF_bt_ON_ble_ON_filter_scan_lowpower(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='3a526838-ae7b-4cdb-bc29-89a5503d2306') - def test_screen_OFF_bt_ON_ble_ON_filter_scan_balanced(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='03a57cfd-4269-4a09-8544-84f878d2e801') - def test_screen_OFF_bt_ON_ble_ON_filter_scan_lowlatency(self): - self.ble_scan_base_func() - - # Test Cases: Background scans - @test_tracker_info(uuid='20145317-e362-4bfd-9860-4ceddf764784') - def test_screen_ON_bt_OFF_ble_ON_background_scan_lowlatency(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='00a53dc3-2c33-43c4-b356-dba93249b823') - def test_screen_ON_bt_OFF_ble_ON_background_scan_lowpower(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='b7185d64-631f-4b18-8d0b-4e14b80db375') - def test_screen_OFF_bt_OFF_ble_ON_background_scan_lowlatency(self): - self.ble_scan_base_func() - - @test_tracker_info(uuid='93eb05da-a577-409c-8208-6af1899a10c2') - def test_screen_OFF_bt_OFF_ble_ON_background_scan_lowpower(self): - self.ble_scan_base_func() diff --git a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py index 09b47a62aa..9c2ce9a2ef 100644 --- a/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py +++ b/acts/tests/google/power/gnss/PowerGnssDpoSimTest.py @@ -16,10 +16,12 @@ import acts.test_utils.power.PowerGnssBaseTest as GBT from acts.test_utils.gnss import dut_log_test_utils as diaglog +from acts.test_utils.gnss import gnss_test_utils as gutil import time import os from acts import utils -MDLOG_RUNNING_TIME = 120 +MDLOG_RUNNING_TIME = 300 +DUT_ACTION_WAIT_TIME = 2 class PowerGnssDpoSimTest(GBT.PowerGnssBaseTest): """Power baseline tests for rockbottom state. @@ -33,27 +35,35 @@ class PowerGnssDpoSimTest(GBT.PowerGnssBaseTest): Decode the test config from the test name, set device to desired state. Measure power and plot results. """ - self.collect_power_data() - self.pass_fail_check() + result = self.collect_power_data() + self.pass_fail_check(result.average_current) # Test cases def test_gnss_dpoOFF_measurement(self): utils.set_location_service(self.dut, True) + time.sleep(DUT_ACTION_WAIT_TIME) + gutil.start_gnss_by_gtw_gpstool(self.dut, state=True, type="gnss", bgdisplay=True) self.dut.send_keycode("SLEEP") + time.sleep(DUT_ACTION_WAIT_TIME) self.measure_gnsspower_test_func() diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile) self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME) qxdm_log_path = os.path.join(self.log_path, 'QXDM') diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path) + gutil.start_gnss_by_gtw_gpstool(self.dut, state=False) def test_gnss_dpoON_measurement(self): utils.set_location_service(self.dut, True) + time.sleep(DUT_ACTION_WAIT_TIME) + gutil.start_gnss_by_gtw_gpstool(self.dut, state=True, type="gnss", bgdisplay=True) self.dut.send_keycode("SLEEP") + time.sleep(DUT_ACTION_WAIT_TIME) self.measure_gnsspower_test_func() diaglog.start_diagmdlog_background(self.dut, maskfile=self.maskfile) self.disconnect_usb(self.dut, MDLOG_RUNNING_TIME) qxdm_log_path = os.path.join(self.log_path, 'QXDM') diaglog.stop_background_diagmdlog(self.dut, qxdm_log_path) + gutil.start_gnss_by_gtw_gpstool(self.dut, state=False) def test_gnss_rockbottom(self): self.dut.send_keycode("SLEEP") diff --git a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py index 66aa45815c..e5aabb76f9 100644 --- a/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py +++ b/acts/tests/google/power/tel/lab/PowerTelHotspotTest.py @@ -119,7 +119,7 @@ class PowerTelHotspotTest(PowerTelTrafficTest): iperf_helpers = self.start_tel_traffic(self.android_devices[1]) # Measure power - self.collect_power_data() + result = self.collect_power_data() # Wait for iPerf to finish time.sleep(self.IPERF_MARGIN + 2) @@ -129,7 +129,7 @@ class PowerTelHotspotTest(PowerTelTrafficTest): iperf_helpers) # Checks if power is below the required threshold. - self.pass_fail_check() + self.pass_fail_check(result.average_current) def setup_test(self): """ Executed before every test case. diff --git a/acts/tests/google/power/tel/lab/PowerTelIdleTest.py b/acts/tests/google/power/tel/lab/PowerTelIdleTest.py index 934fe20dc0..f1197988c6 100644 --- a/acts/tests/google/power/tel/lab/PowerTelIdleTest.py +++ b/acts/tests/google/power/tel/lab/PowerTelIdleTest.py @@ -33,7 +33,7 @@ class PowerTelIdleTest(PWCEL.PowerCellularLabBaseTest): self.cellular_simulator.wait_until_idle_state(idle_wait_time) # Measure power - self.collect_power_data() + result = self.collect_power_data() # Check if power measurement is below the required value - self.pass_fail_check() + self.pass_fail_check(result.average_current) diff --git a/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py b/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py index 0abee5050a..b373fdb3d0 100644 --- a/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py +++ b/acts/tests/google/power/tel/lab/PowerTelTrafficTest.py @@ -158,7 +158,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest): iperf_helpers = self.start_tel_traffic(self.dut) # Measure power - self.collect_power_data() + result = self.collect_power_data() # Wait for iPerf to finish time.sleep(self.IPERF_MARGIN + 2) @@ -167,9 +167,9 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest): self.iperf_results = self.get_iperf_results(self.dut, iperf_helpers) # Check if power measurement is below the required value - self.pass_fail_check() + self.pass_fail_check(result.average_current) - return self.test_result, self.iperf_results + return result.average_current, self.iperf_results def get_iperf_results(self, device, iperf_helpers): """ Pulls iperf results from the device. @@ -196,7 +196,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest): return throughput - def pass_fail_check(self): + def pass_fail_check(self, average_current=None): """ Checks power consumption and throughput. Uses the base class method to check power consumption. Also, compares @@ -230,7 +230,7 @@ class PowerTelTrafficTest(PWCEL.PowerCellularLabBaseTest): direction, round(throughput, 3), round(expected_t, 3), round(throughput / expected_t, 3))) - super().pass_fail_check() + super().pass_fail_check(average_current) def start_tel_traffic(self, client_host): """ Starts iPerf in the indicated device and initiates traffic. diff --git a/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py b/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py index f0d0eda6ce..93b69c6e2b 100644 --- a/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py +++ b/acts/tests/google/power/tel/lab/PowerTelVoiceCallTest.py @@ -70,10 +70,10 @@ class PowerTelVoiceCallTest(PWCEL.PowerCellularLabBaseTest): self.dut.droid.goToSleepNow() # Measure power - self.collect_power_data() + result = self.collect_power_data() # End the call hangup_call(self.log, self.dut) # Check if power measurement is within the required values - self.pass_fail_check() + self.pass_fail_check(result.average_current) diff --git a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py index f554d12bde..57e926376a 100644 --- a/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py +++ b/acts/tests/google/power/wifi/PowerWiFiHotspotTest.py @@ -158,7 +158,7 @@ class PowerWiFiHotspotTest(PWBT.PowerWiFiBaseTest): time.sleep(2) # Measure power - self.collect_power_data() + result = self.collect_power_data() if traffic: # Wait for iperf to finish @@ -168,7 +168,7 @@ class PowerWiFiHotspotTest(PWBT.PowerWiFiBaseTest): self.client_iperf_helper.process_iperf_results( self.dut, self.log, self.iperf_servers, self.test_name) - self.pass_fail_check() + self.pass_fail_check(result.average_current) def power_idle_tethering_test(self): """ Start power test when Hotspot is idle diff --git a/acts/tests/google/power/wifi/PowerWiFimulticastTest.py b/acts/tests/google/power/wifi/PowerWiFimulticastTest.py index 5a6108cdfb..8832469980 100644 --- a/acts/tests/google/power/wifi/PowerWiFimulticastTest.py +++ b/acts/tests/google/power/wifi/PowerWiFimulticastTest.py @@ -27,6 +27,22 @@ DNS_SHORT_LIFETIME = 3 class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): + def setup_class(self): + super().setup_class() + self.unpack_userparams(sub_mask="255.255.255.0", + mac_dst="get_from_dut", + mac_src="get_local", + ipv4_dst="get_from_dut", + ipv4_src="get_local", + ipv6_dst="get_from_dut", + ipv6_src="get_local", + ipv6_src_type="LINK_LOCAL", + ipv4_gwt="192.168.1.1", + mac_dst_fake="40:90:28:EF:4B:20", + ipv4_dst_fake="192.168.1.60", + ipv6_dst_fake="fe80::300f:40ee:ee0a:5000", + interval=1) + def set_connection(self): """Setup connection between AP and client. @@ -38,8 +54,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): indices = [2, 4] self.decode_test_configs(attrs, indices) # Change DTIMx1 on the phone to receive all Multicast packets - rebooted = wputils.change_dtim( - self.dut, gEnableModulatedDTIM=1, gMaxLIModulatedDTIM=10) + rebooted = wputils.change_dtim(self.dut, + gEnableModulatedDTIM=1, + gMaxLIModulatedDTIM=10) self.dut.log.info('DTIM value of the phone is now DTIMx1') if rebooted: self.dut_rockbottom() @@ -87,8 +104,8 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - ip_dst='0.0.0.0', eth_dst=self.pkt_gen_config['dst_mac']) + packet = pkt_gen.generate(ip_dst='0.0.0.0', + eth_dst=self.pkt_gen_config['dst_mac']) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='5dcb16f1-725c-45de-8103-340104d60a22') @@ -96,8 +113,8 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - ip_dst=self.ipv4_dst_fake, eth_dst=self.pkt_gen_config['dst_mac']) + packet = pkt_gen.generate(ip_dst=self.ipv4_dst_fake, + eth_dst=self.pkt_gen_config['dst_mac']) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='5ec4800f-a82e-4462-8b65-4fcd0b1940a2') @@ -105,12 +122,11 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - op='is-at', - ip_src='0.0.0.0', - ip_dst=self.ipv4_dst_fake, - hwdst=self.mac_dst_fake, - eth_dst=self.pkt_gen_config['dst_mac']) + packet = pkt_gen.generate(op='is-at', + ip_src='0.0.0.0', + ip_dst=self.ipv4_dst_fake, + hwdst=self.mac_dst_fake, + eth_dst=self.pkt_gen_config['dst_mac']) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='6c5c0e9e-7a00-43d0-a6e8-355141467703') @@ -118,11 +134,10 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.ArpGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - op='is-at', - ip_dst=self.ipv4_dst_fake, - hwdst=self.mac_dst_fake, - eth_dst=self.pkt_gen_config['dst_mac']) + packet = pkt_gen.generate(op='is-at', + ip_dst=self.ipv4_dst_fake, + hwdst=self.mac_dst_fake, + eth_dst=self.pkt_gen_config['dst_mac']) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='8e534d3b-5a25-429a-a1bb-8119d7d28b5a') @@ -178,8 +193,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME) + packet = pkt_gen.generate(RA_LONG_LIFETIME, + enableDNS=True, + dns_lifetime=DNS_SHORT_LIFETIME) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='84d2f1ff-bd4f-46c6-9b06-826d9b14909c') @@ -187,8 +203,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME) + packet = pkt_gen.generate(RA_LONG_LIFETIME, + enableDNS=True, + dns_lifetime=DNS_LONG_LIFETIME) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='4a17e74f-3e7f-4e90-ac9e-884a7c13cede') @@ -333,8 +350,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_SHORT_LIFETIME) + packet = pkt_gen.generate(RA_LONG_LIFETIME, + enableDNS=True, + dns_lifetime=DNS_SHORT_LIFETIME) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='62b99cd7-75bf-45be-b93f-bb037a13b3e2') @@ -342,8 +360,9 @@ class PowerWiFimulticastTest(PWBT.PowerWiFiBaseTest): self.set_connection() self.pkt_gen_config = wputils.create_pkt_config(self) pkt_gen = pkt_utils.RaGenerator(**self.pkt_gen_config) - packet = pkt_gen.generate( - RA_LONG_LIFETIME, enableDNS=True, dns_lifetime=DNS_LONG_LIFETIME) + packet = pkt_gen.generate(RA_LONG_LIFETIME, + enableDNS=True, + dns_lifetime=DNS_LONG_LIFETIME) self.sendPacketAndMeasure(packet) @test_tracker_info(uuid='4088af4c-a64b-4fc1-848c-688936cc6c12') diff --git a/acts/tests/google/power/wifi/PowerWiFiroamingTest.py b/acts/tests/google/power/wifi/PowerWiFiroamingTest.py index 98409895f4..66110a1f34 100644 --- a/acts/tests/google/power/wifi/PowerWiFiroamingTest.py +++ b/acts/tests/google/power/wifi/PowerWiFiroamingTest.py @@ -81,22 +81,30 @@ class PowerWiFiroamingTest(PWBT.PowerWiFiBaseTest): time.sleep(5) # Toggle between two networks begin_time = utils.get_current_epoch_time() + results = [] for i in range(self.toggle_times): self.dut.log.info('Connecting to %s' % network_main[wc.SSID]) self.dut.droid.wifiConnect(network_main) - file_path, avg_current = self.monsoon_data_collect_save() + results.append(self.monsoon_data_collect_save()) self.dut.log.info('Connecting to %s' % network_aux[wc.SSID]) self.dut.droid.wifiConnect(network_aux) - file_path, avg_current = self.monsoon_data_collect_save() - [plot, dt] = wputils.monsoon_data_plot(self.mon_info, file_path) - self.test_result = dt.source.data['y0'][0] - self.power_result.metric_value = ( - self.test_result * PHONE_BATTERY_VOLTAGE) + results.append(self.monsoon_data_collect_save()) + wputils.monsoon_data_plot(self.mon_info, results) + + total_current = 0 + total_samples = 0 + for result in results: + total_current += result.average_current * result.num_samples + total_samples += result.num_samples + average_current = total_current / total_samples + + self.power_result.metric_value = [result.total_power for result in + results] # Take Bugreport if self.bug_report: self.dut.take_bug_report(self.test_name, begin_time) # Path fail check - self.pass_fail_check() + self.pass_fail_check(average_current) @test_tracker_info(uuid='e5ff95c0-b17e-425c-a903-821ba555a9b9') def test_screenon_toggle_between_AP(self): @@ -119,22 +127,30 @@ class PowerWiFiroamingTest(PWBT.PowerWiFiBaseTest): time.sleep(5) # Toggle between two networks begin_time = utils.get_current_epoch_time() + results = [] for i in range(self.toggle_times): self.dut.log.info('Connecting to %s' % network_main[wc.SSID]) self.dut.droid.wifiConnect(network_main) - file_path, avg_current = self.monsoon_data_collect_save() + results.append(self.monsoon_data_collect_save()) self.dut.log.info('Connecting to %s' % network_aux[wc.SSID]) self.dut.droid.wifiConnect(network_aux) - file_path, avg_current = self.monsoon_data_collect_save() - [plot, dt] = wputils.monsoon_data_plot(self.mon_info, file_path) - self.test_result = dt.source.data['y0'][0] - self.power_result.metric_value = ( - self.test_result * PHONE_BATTERY_VOLTAGE) + results.append(self.monsoon_data_collect_save()) + wputils.monsoon_data_plot(self.mon_info, results) + + total_current = 0 + total_samples = 0 + for result in results: + total_current += result.average_current * result.num_samples + total_samples += result.num_samples + average_current = total_current / total_samples + + self.power_result.metric_value = [result.total_power for result in + results] # Take Bugreport if self.bug_report: self.dut.take_bug_report(self.test_name, begin_time) # Path fail check - self.pass_fail_check() + self.pass_fail_check(average_current) @test_tracker_info(uuid='a16ae337-326f-4d09-990f-42232c3c0dc4') def test_screenoff_wifi_wedge(self): diff --git a/acts/tests/google/tel/live/TelLiveStressDataTest.py b/acts/tests/google/tel/live/TelLiveStressDataTest.py new file mode 100644 index 0000000000..b8012aeca2 --- /dev/null +++ b/acts/tests/google/tel/live/TelLiveStressDataTest.py @@ -0,0 +1,77 @@ +#!/usr/bin/env python3 +# +# Copyright 2019 - Google +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" + Test Script for Telephony Stress data Test +""" +from acts.test_decorators import test_tracker_info +from acts.test_utils.tel.TelephonyBaseTest import TelephonyBaseTest +from acts.test_utils.tel.tel_test_utils import iperf_test_by_adb +from acts.test_utils.tel.tel_test_utils import iperf_udp_test_by_adb + + +class TelLiveStressDataTest(TelephonyBaseTest): + def setup_class(self): + super().setup_class() + self.ad = self.android_devices[0] + self.iperf_server_address = self.user_params.get("iperf_server", + '0.0.0.0') + self.iperf_srv_tcp_port = self.user_params.get("iperf_server_tcp_port", + 0) + self.iperf_srv_udp_port = self.user_params.get("iperf_server_udp_port", + 0) + self.test_duration = self.user_params.get("data_stress_duration", 60) + + return True + + @test_tracker_info(uuid="190fdeb1-541e-455f-9f37-762a8e55c07f") + @TelephonyBaseTest.tel_test_wrap + def test_tcp_upload_stress(self): + return iperf_test_by_adb(self.log, + self.ad, + self.iperf_server_address, + self.iperf_srv_tcp_port, + False, + self.test_duration) + + @test_tracker_info(uuid="af9805f8-6ed5-4e05-823e-d88dcef45637") + @TelephonyBaseTest.tel_test_wrap + def test_tcp_download_stress(self): + return iperf_test_by_adb(self.log, + self.ad, + self.iperf_server_address, + self.iperf_srv_tcp_port, + True, + self.test_duration) + + @test_tracker_info(uuid="55bf5e09-dc7b-40bc-843f-31fed076ffe4") + @TelephonyBaseTest.tel_test_wrap + def test_udp_upload_stress(self): + return iperf_udp_test_by_adb(self.log, + self.ad, + self.iperf_server_address, + self.iperf_srv_udp_port, + False, + self.test_duration) + + @test_tracker_info(uuid="02ae88b2-d597-45df-ab5a-d701d1125a0f") + @TelephonyBaseTest.tel_test_wrap + def test_udp_download_stress(self): + return iperf_udp_test_by_adb(self.log, + self.ad, + self.iperf_server_address, + self.iperf_srv_udp_port, + True, + self.test_duration) diff --git a/acts/tests/google/tel/live/TelLiveVoiceTest.py b/acts/tests/google/tel/live/TelLiveVoiceTest.py index fc6aabad2d..5114daa41c 100644 --- a/acts/tests/google/tel/live/TelLiveVoiceTest.py +++ b/acts/tests/google/tel/live/TelLiveVoiceTest.py @@ -56,6 +56,7 @@ from acts.test_utils.tel.tel_test_utils import get_mobile_data_usage from acts.test_utils.tel.tel_test_utils import hangup_call from acts.test_utils.tel.tel_test_utils import initiate_call from acts.test_utils.tel.tel_test_utils import is_phone_in_call_active +from acts.test_utils.tel.tel_test_utils import is_phone_in_call from acts.test_utils.tel.tel_test_utils import multithread_func from acts.test_utils.tel.tel_test_utils import num_active_calls from acts.test_utils.tel.tel_test_utils import remove_mobile_data_usage_limit @@ -68,6 +69,8 @@ from acts.test_utils.tel.tel_test_utils import wait_for_ringing_call from acts.test_utils.tel.tel_test_utils import wait_for_state from acts.test_utils.tel.tel_test_utils import start_youtube_video from acts.test_utils.tel.tel_test_utils import set_wifi_to_default +from acts.test_utils.tel.tel_test_utils import STORY_LINE +from acts.test_utils.tel.tel_test_utils import wait_for_in_call_active from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_1x from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_2g from acts.test_utils.tel.tel_voice_utils import is_phone_in_call_3g @@ -113,6 +116,45 @@ class TelLiveVoiceTest(TelephonyBaseTest): """ Tests Begin """ @TelephonyBaseTest.tel_test_wrap + @test_tracker_info(uuid="c5009f8c-eb1d-4cd9-85ce-604298bbeb3e") + def test_call_to_answering_machine(self): + """ Voice call to an answering machine. + + 1. Make Sure PhoneA attached to voice network. + 2. Call from PhoneA to Storyline + 3. Verify call is in ACTIVE state + 4. Hangup Call from PhoneA + + Raises: + TestFailure if not success. + """ + ad = self.android_devices[0] + + if not phone_setup_voice_general(ad.log, ad): + ad.log.error("Phone Failed to Set Up Properly for Voice.") + return False + for iteration in range(3): + result = True + ad.log.info("Attempt %d", iteration + 1) + if not initiate_call(ad.log, ad, STORY_LINE) and \ + wait_for_in_call_active(ad, 60, 3): + ad.log.error("Call Failed to Initiate") + result = False + time.sleep(WAIT_TIME_IN_CALL) + if not is_phone_in_call(ad.log, ad): + ad.log.error("Call Dropped") + result = False + if not hangup_call(ad.log, ad): + ad.log.error("Call Failed to Hangup") + result = False + if result: + ad.log.info("Call test PASS in iteration %d", iteration + 1) + return True + ad.log.info("Call test FAIL in all 3 iterations") + return False + + + @TelephonyBaseTest.tel_test_wrap @test_tracker_info(uuid="fca3f9e1-447a-416f-9a9c-50b7161981bf") def test_call_mo_voice_general(self): """ General voice to voice call. diff --git a/acts/tests/google/wifi/WifiSensitivityTest.py b/acts/tests/google/wifi/WifiSensitivityTest.py index 19476843cd..a982cc6972 100644 --- a/acts/tests/google/wifi/WifiSensitivityTest.py +++ b/acts/tests/google/wifi/WifiSensitivityTest.py @@ -194,7 +194,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest): Args: result: dict containing attenuation, throughput and other meta - data + data """ try: golden_path = next(file_name @@ -207,7 +207,7 @@ class WifiSensitivityTest(WifiRvrTest, WifiPingTest): except: golden_sensitivity = float('nan') - result_string = ('Througput = {}%, Sensitivity = {}.' + result_string = ('Throughput = {}%, Sensitivity = {}.' 'Target Sensitivity = {}'.format( result['peak_throughput_pct'], result['sensitivity'], golden_sensitivity)) |