diff options
author | Tom Turney <tturney@google.com> | 2017-05-31 02:06:51 +0000 |
---|---|---|
committer | Android (Google) Code Review <android-gerrit@google.com> | 2017-05-31 02:06:52 +0000 |
commit | 9d5ddce1d2044f3a8852ccfcaf5c4b38a50f5105 (patch) | |
tree | 15703004b9cfdead72acf548fb509d6743e12de4 | |
parent | c21561d7e106b1ca5e28010e16f4bac4f6ecffaa (diff) | |
parent | ddaa77174ea19dce8dd0be88d323fc86e71532b9 (diff) | |
download | platform_tools_test_connectivity-9d5ddce1d2044f3a8852ccfcaf5c4b38a50f5105.tar.gz platform_tools_test_connectivity-9d5ddce1d2044f3a8852ccfcaf5c4b38a50f5105.tar.bz2 platform_tools_test_connectivity-9d5ddce1d2044f3a8852ccfcaf5c4b38a50f5105.zip |
Merge "PTS Cmd Line Tool for GATT Client" into oc-dev
-rw-r--r-- | acts/framework/acts/test_utils/bt/GattEnum.py | 174 | ||||
-rw-r--r-- | acts/framework/acts/test_utils/bt/bt_gatt_utils.py | 168 | ||||
-rw-r--r-- | acts/tests/google/bt/pts/cmd_input.py | 272 | ||||
-rw-r--r-- | acts/tests/google/bt/pts/gattc_lib.py | 448 |
4 files changed, 953 insertions, 109 deletions
diff --git a/acts/framework/acts/test_utils/bt/GattEnum.py b/acts/framework/acts/test_utils/bt/GattEnum.py index ec353273d0..152a765f63 100644 --- a/acts/framework/acts/test_utils/bt/GattEnum.py +++ b/acts/framework/acts/test_utils/bt/GattEnum.py @@ -17,9 +17,9 @@ from enum import Enum from enum import IntEnum + class GattCbErr(Enum): CHAR_WRITE_REQ_ERR = "Characteristic Write Request event not found. Expected {}" - CHAR_EXEC_WRITE_ERR = "Characteristic Execute Write event not found. Expected {}" CHAR_WRITE_ERR = "Characteristic Write event not found. Expected {}" DESC_WRITE_REQ_ERR = "Descriptor Write Request event not found. Expected {}" DESC_WRITE_ERR = "Descriptor Write event not found. Expected {}" @@ -36,10 +36,12 @@ class GattCbErr(Enum): CHAR_CHANGE_ERR = "GATT Characteristic Changed event not fond. Expected {}" PHY_READ_ERR = "Phy Read event not fond. Expected {}" PHY_UPDATE_ERR = "Phy Update event not fond. Expected {}" + EXEC_WRITE_ERR = "GATT Execute Write event not found. Expected {}" + class GattCbStrings(Enum): CHAR_WRITE_REQ = "GattServer{}onCharacteristicWriteRequest" - CHAR_EXEC_WRITE = "GattServer{}onExecuteWrite" + EXEC_WRITE = "GattServer{}onExecuteWrite" CHAR_WRITE = "GattConnect{}onCharacteristicWrite" DESC_WRITE_REQ = "GattServer{}onDescriptorWriteRequest" DESC_WRITE = "GattConnect{}onDescriptorWrite" @@ -61,44 +63,82 @@ class GattCbStrings(Enum): class GattEvent(Enum): - CHAR_WRITE_REQ = {"evt": GattCbStrings.CHAR_WRITE_REQ.value, - "err": GattCbErr.CHAR_WRITE_REQ_ERR.value} - CHAR_EXEC_WRITE = {"evt": GattCbStrings.CHAR_EXEC_WRITE.value, - "err": GattCbErr.CHAR_EXEC_WRITE_ERR.value} - CHAR_WRITE = {"evt": GattCbStrings.CHAR_WRITE.value, - "err": GattCbErr.CHAR_WRITE_ERR.value} - DESC_WRITE_REQ = {"evt": GattCbStrings.DESC_WRITE_REQ.value, - "err": GattCbErr.DESC_WRITE_REQ_ERR.value} - DESC_WRITE = {"evt": GattCbStrings.DESC_WRITE.value, - "err": GattCbErr.DESC_WRITE_ERR.value} - CHAR_READ = {"evt": GattCbStrings.CHAR_READ.value, - "err": GattCbErr.CHAR_READ_ERR.value} - CHAR_READ_REQ = {"evt": GattCbStrings.CHAR_READ_REQ.value, - "err": GattCbErr.CHAR_READ_REQ_ERR.value} - DESC_READ = {"evt": GattCbStrings.DESC_READ.value, - "err": GattCbErr.DESC_READ_ERR.value} - DESC_READ_REQ = {"evt": GattCbStrings.DESC_READ_REQ.value, - "err": GattCbErr.DESC_READ_REQ_ERR.value} - RD_REMOTE_RSSI = {"evt": GattCbStrings.RD_REMOTE_RSSI.value, - "err": GattCbErr.RD_REMOTE_RSSI_ERR.value} - GATT_SERV_DISC = {"evt": GattCbStrings.GATT_SERV_DISC.value, - "err": GattCbErr.GATT_SERV_DISC_ERR.value} - SERV_ADDED = {"evt": GattCbStrings.SERV_ADDED.value, - "err": GattCbErr.SERV_ADDED_ERR.value} - MTU_CHANGED = {"evt": GattCbStrings.MTU_CHANGED.value, - "err": GattCbErr.MTU_CHANGED_ERR.value} - GATT_CONN_CHANGE = {"evt": GattCbStrings.GATT_CONN_CHANGE.value, - "err": GattCbErr.GATT_CONN_CHANGE_ERR.value} - CHAR_CHANGE = {"evt": GattCbStrings.CHAR_CHANGE.value, - "err": GattCbErr.CHAR_CHANGE_ERR.value} - PHY_READ = {"evt": GattCbStrings.PHY_READ.value, - "err": GattCbErr.PHY_READ_ERR.value} - PHY_UPDATE = {"evt": GattCbStrings.PHY_UPDATE.value, - "err": GattCbErr.PHY_UPDATE_ERR.value} - SERV_PHY_READ = {"evt": GattCbStrings.SERV_PHY_READ.value, - "err": GattCbErr.PHY_READ_ERR.value} - SERV_PHY_UPDATE = {"evt": GattCbStrings.SERV_PHY_UPDATE.value, - "err": GattCbErr.PHY_UPDATE_ERR.value} + CHAR_WRITE_REQ = { + "evt": GattCbStrings.CHAR_WRITE_REQ.value, + "err": GattCbErr.CHAR_WRITE_REQ_ERR.value + } + EXEC_WRITE = { + "evt": GattCbStrings.EXEC_WRITE.value, + "err": GattCbErr.EXEC_WRITE_ERR.value + } + CHAR_WRITE = { + "evt": GattCbStrings.CHAR_WRITE.value, + "err": GattCbErr.CHAR_WRITE_ERR.value + } + DESC_WRITE_REQ = { + "evt": GattCbStrings.DESC_WRITE_REQ.value, + "err": GattCbErr.DESC_WRITE_REQ_ERR.value + } + DESC_WRITE = { + "evt": GattCbStrings.DESC_WRITE.value, + "err": GattCbErr.DESC_WRITE_ERR.value + } + CHAR_READ = { + "evt": GattCbStrings.CHAR_READ.value, + "err": GattCbErr.CHAR_READ_ERR.value + } + CHAR_READ_REQ = { + "evt": GattCbStrings.CHAR_READ_REQ.value, + "err": GattCbErr.CHAR_READ_REQ_ERR.value + } + DESC_READ = { + "evt": GattCbStrings.DESC_READ.value, + "err": GattCbErr.DESC_READ_ERR.value + } + DESC_READ_REQ = { + "evt": GattCbStrings.DESC_READ_REQ.value, + "err": GattCbErr.DESC_READ_REQ_ERR.value + } + RD_REMOTE_RSSI = { + "evt": GattCbStrings.RD_REMOTE_RSSI.value, + "err": GattCbErr.RD_REMOTE_RSSI_ERR.value + } + GATT_SERV_DISC = { + "evt": GattCbStrings.GATT_SERV_DISC.value, + "err": GattCbErr.GATT_SERV_DISC_ERR.value + } + SERV_ADDED = { + "evt": GattCbStrings.SERV_ADDED.value, + "err": GattCbErr.SERV_ADDED_ERR.value + } + MTU_CHANGED = { + "evt": GattCbStrings.MTU_CHANGED.value, + "err": GattCbErr.MTU_CHANGED_ERR.value + } + GATT_CONN_CHANGE = { + "evt": GattCbStrings.GATT_CONN_CHANGE.value, + "err": GattCbErr.GATT_CONN_CHANGE_ERR.value + } + CHAR_CHANGE = { + "evt": GattCbStrings.CHAR_CHANGE.value, + "err": GattCbErr.CHAR_CHANGE_ERR.value + } + PHY_READ = { + "evt": GattCbStrings.PHY_READ.value, + "err": GattCbErr.PHY_READ_ERR.value + } + PHY_UPDATE = { + "evt": GattCbStrings.PHY_UPDATE.value, + "err": GattCbErr.PHY_UPDATE_ERR.value + } + SERV_PHY_READ = { + "evt": GattCbStrings.SERV_PHY_READ.value, + "err": GattCbErr.PHY_READ_ERR.value + } + SERV_PHY_UPDATE = { + "evt": GattCbStrings.SERV_PHY_UPDATE.value, + "err": GattCbErr.PHY_UPDATE_ERR.value + } class GattConnectionState(IntEnum): @@ -152,6 +192,52 @@ class GattDescriptor(Enum): PERMISSION_WRITE_SIGNED_MITM = 0x100 +class GattCharDesc(Enum): + GATT_CHARAC_EXT_PROPER_UUID = '00002900-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_USER_DESC_UUID = '00002901-0000-1000-8000-00805f9b34fb' + GATT_CLIENT_CHARAC_CFG_UUID = '00002902-0000-1000-8000-00805f9b34fb' + GATT_SERVER_CHARAC_CFG_UUID = '00002903-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_FMT_UUID = '00002904-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_AGREG_FMT_UUID = '00002905-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_VALID_RANGE_UUID = '00002906-0000-1000-8000-00805f9b34fb' + GATT_EXTERNAL_REPORT_REFERENCE = '00002907-0000-1000-8000-00805f9b34fb' + GATT_REPORT_REFERENCE = '00002908-0000-1000-8000-00805f9b34fb' + + +class GattCharTypes(Enum): + GATT_CHARAC_DEVICE_NAME = '00002a00-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_APPEARANCE = '00002a01-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_PERIPHERAL_PRIV_FLAG = '00002a02-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_RECONNECTION_ADDRESS = '00002a03-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_PERIPHERAL_PREF_CONN = '00002a04-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_SERVICE_CHANGED = '00002a05-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_SYSTEM_ID = '00002a23-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_MODEL_NUMBER_STRING = '00002a24-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_SERIAL_NUMBER_STRING = '00002a25-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_FIRMWARE_REVISION_STRING = '00002a26-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_HARDWARE_REVISION_STRING = '00002a27-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_SOFTWARE_REVISION_STRING = '00002a28-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_MANUFACTURER_NAME_STRING = '00002a29-0000-1000-8000-00805f9b34fb' + GATT_CHARAC_PNP_ID = '00002a50-0000-1000-8000-00805f9b34fb' + + +class GattCharacteristicAttrLength(Enum): + MTU_ATTR_1 = 1 + MTU_ATTR_2 = 3 + MTU_ATTR_3 = 15 + + +class CharacteristicValueFormat(Enum): + STRING = 0x1 + BYTE = 0x2 + FORMAT_SINT8 = 0x21 + FORMAT_UINT8 = 0x11 + FORMAT_SINT16 = 0x22 + FORMAT_UINT16 = 0x12 + FORMAT_SINT32 = 0x24 + FORMAT_UINT32 = 0x14 + + class GattService(IntEnum): SERVICE_TYPE_PRIMARY = 0 SERVICE_TYPE_SECONDARY = 1 @@ -169,9 +255,9 @@ class MtuSize(IntEnum): class GattCharacteristicAttrLength(IntEnum): - MTU_ATTR_1 = 1 - MTU_ATTR_2 = 3 - MTU_ATTR_3 = 15 + MTU_ATTR_1 = 1 + MTU_ATTR_2 = 3 + MTU_ATTR_3 = 15 class BluetoothGatt(Enum): @@ -194,4 +280,4 @@ class GattPhy(IntEnum): class GattPhyMask(IntEnum): PHY_LE_1M_MASK = 1 PHY_LE_2M_MASK = 2 - PHY_LE_CODED_MASK = 4
\ No newline at end of file + PHY_LE_CODED_MASK = 4 diff --git a/acts/framework/acts/test_utils/bt/bt_gatt_utils.py b/acts/framework/acts/test_utils/bt/bt_gatt_utils.py index 79bf2cb87c..da0eb6f328 100644 --- a/acts/framework/acts/test_utils/bt/bt_gatt_utils.py +++ b/acts/framework/acts/test_utils/bt/bt_gatt_utils.py @@ -54,17 +54,17 @@ def setup_gatt_connection(cen_ad, cen_ad.droid.gattClientClose(bluetooth_gatt) except Exception: self.log.debug("Failed to close gatt client.") - raise GattTestUtilsError("Could not establish a connection to " - "peripheral. Expected event:".format( - expected_event)) + raise GattTestUtilsError( + "Could not establish a connection to " + "peripheral. Expected event: {}".format(expected_event)) if event['data']['State'] != GattConnectionState.STATE_CONNECTED.value: try: cen_ad.droid.gattClientClose(bluetooth_gatt) except Exception: self.log.debug("Failed to close gatt client.") - raise GattTestUtilsError("Could not establish a connection to " - "peripheral. Event Details:".format( - pprint.pformat(event))) + raise GattTestUtilsError( + "Could not establish a connection to " + "peripheral. Event Details: {}".format(pprint.pformat(event))) return bluetooth_gatt, gatt_callback @@ -130,7 +130,8 @@ def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed, discovered_services_index, i, characteristic)) log.info(descriptor_uuids) for descriptor in descriptor_uuids: - log.info("descriptor to be written {}".format(descriptor)) + log.info( + "descriptor to be written {}".format(descriptor)) cen_droid.gattClientDescriptorSetValue( bluetooth_gatt, discovered_services_index, i, characteristic, descriptor, test_value) @@ -150,11 +151,12 @@ def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed, request_id = event['data']['requestId'] found_value = event['data']['value'] if found_value != test_value: - log.error("Values didn't match. Found: {}, Expected: " - "{}".format(found_value, test_value)) - per_droid.gattServerSendResponse(gatt_server, bt_device_id, - request_id, status, - offset, test_value_return) + log.error( + "Values didn't match. Found: {}, Expected: " + "{}".format(found_value, test_value)) + per_droid.gattServerSendResponse( + gatt_server, bt_device_id, request_id, status, + offset, test_value_return) expected_event = GattCbStrings.DESC_WRITE.value.format( bluetooth_gatt) try: @@ -171,35 +173,46 @@ def run_continuous_write_descriptor(cen_droid, cen_ed, per_droid, per_ed, def setup_characteristics_and_descriptors(droid): characteristic_input = [ { - 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", - 'property': GattCharacteristic.PROPERTY_WRITE.value | + 'uuid': + "aa7edd5a-4d1d-4f0e-883a-d145616a1630", + 'property': + GattCharacteristic.PROPERTY_WRITE.value | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, - 'permission': GattCharacteristic.PROPERTY_WRITE.value + 'permission': + GattCharacteristic.PROPERTY_WRITE.value }, { - 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", - 'property': GattCharacteristic.PROPERTY_NOTIFY.value | + 'uuid': + "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", + 'property': + GattCharacteristic.PROPERTY_NOTIFY.value | GattCharacteristic.PROPERTY_READ.value, - 'permission': GattCharacteristic.PERMISSION_READ.value + 'permission': + GattCharacteristic.PERMISSION_READ.value }, { - 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", - 'property': GattCharacteristic.PROPERTY_NOTIFY.value | + 'uuid': + "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", + 'property': + GattCharacteristic.PROPERTY_NOTIFY.value | GattCharacteristic.PROPERTY_READ.value, - 'permission': GattCharacteristic.PERMISSION_READ.value + 'permission': + GattCharacteristic.PERMISSION_READ.value }, ] - descriptor_input = [ - { - 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", - 'property': GattDescriptor.PERMISSION_READ.value | - GattDescriptor.PERMISSION_WRITE.value, - }, { - 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", - 'property': GattDescriptor.PERMISSION_READ.value | - GattCharacteristic.PERMISSION_WRITE.value, - } - ] + descriptor_input = [{ + 'uuid': + "aa7edd5a-4d1d-4f0e-883a-d145616a1630", + 'property': + GattDescriptor.PERMISSION_READ.value | + GattDescriptor.PERMISSION_WRITE.value, + }, { + 'uuid': + "76d5ed92-ca81-4edb-bb6b-9f019665fb32", + 'property': + GattDescriptor.PERMISSION_READ.value | + GattCharacteristic.PERMISSION_WRITE.value, + }] characteristic_list = setup_gatt_characteristics(droid, characteristic_input) descriptor_list = setup_gatt_descriptors(droid, descriptor_input) @@ -263,35 +276,46 @@ def setup_multiple_services(per_ad): def setup_characteristics_and_descriptors(droid): characteristic_input = [ { - 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", - 'property': GattCharacteristic.PROPERTY_WRITE.value | + 'uuid': + "aa7edd5a-4d1d-4f0e-883a-d145616a1630", + 'property': + GattCharacteristic.PROPERTY_WRITE.value | GattCharacteristic.PROPERTY_WRITE_NO_RESPONSE.value, - 'permission': GattCharacteristic.PROPERTY_WRITE.value + 'permission': + GattCharacteristic.PROPERTY_WRITE.value }, { - 'uuid': "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", - 'property': GattCharacteristic.PROPERTY_NOTIFY.value | + 'uuid': + "21c0a0bf-ad51-4a2d-8124-b74003e4e8c8", + 'property': + GattCharacteristic.PROPERTY_NOTIFY.value | GattCharacteristic.PROPERTY_READ.value, - 'permission': GattCharacteristic.PERMISSION_READ.value + 'permission': + GattCharacteristic.PERMISSION_READ.value }, { - 'uuid': "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", - 'property': GattCharacteristic.PROPERTY_NOTIFY.value | + 'uuid': + "6774191f-6ec3-4aa2-b8a8-cf830e41fda6", + 'property': + GattCharacteristic.PROPERTY_NOTIFY.value | GattCharacteristic.PROPERTY_READ.value, - 'permission': GattCharacteristic.PERMISSION_READ.value + 'permission': + GattCharacteristic.PERMISSION_READ.value }, ] - descriptor_input = [ - { - 'uuid': "aa7edd5a-4d1d-4f0e-883a-d145616a1630", - 'property': GattDescriptor.PERMISSION_READ.value | - GattDescriptor.PERMISSION_WRITE.value, - }, { - 'uuid': "76d5ed92-ca81-4edb-bb6b-9f019665fb32", - 'property': GattDescriptor.PERMISSION_READ.value | - GattCharacteristic.PERMISSION_WRITE.value, - } - ] + descriptor_input = [{ + 'uuid': + "aa7edd5a-4d1d-4f0e-883a-d145616a1630", + 'property': + GattDescriptor.PERMISSION_READ.value | + GattDescriptor.PERMISSION_WRITE.value, + }, { + 'uuid': + "76d5ed92-ca81-4edb-bb6b-9f019665fb32", + 'property': + GattDescriptor.PERMISSION_READ.value | + GattCharacteristic.PERMISSION_WRITE.value, + }] characteristic_list = setup_gatt_characteristics(droid, characteristic_input) descriptor_list = setup_gatt_descriptors(droid, descriptor_input) @@ -310,9 +334,9 @@ def setup_gatt_characteristics(droid, input): def setup_gatt_descriptors(droid, input): descriptor_list = [] for item in input: - index = droid.gattServerCreateBluetoothGattDescriptor(item['uuid'], - item['property'], - ) + index = droid.gattServerCreateBluetoothGattDescriptor( + item['uuid'], + item['property'], ) descriptor_list.append(index) log.info("setup descriptor list: {}".format(descriptor_list)) return descriptor_list @@ -340,8 +364,8 @@ def setup_gatt_mtu(cen_ad, bluetooth_gatt, gatt_callback, mtu): mtu_event = cen_ad.ed.pop_event(expected_event, default_timeout) mtu_size_found = mtu_event['data']['MTU'] if mtu_size_found != mtu: - log.error("MTU size found: {}, expected: {}".format(mtu_size_found, - mtu)) + log.error( + "MTU size found: {}, expected: {}".format(mtu_size_found, mtu)) return False except Empty: log.error(GattCbErr.MTU_CHANGED_ERR.value.format(expected_event)) @@ -349,7 +373,9 @@ def setup_gatt_mtu(cen_ad, bluetooth_gatt, gatt_callback, mtu): return True -def log_gatt_server_uuids(cen_ad, discovered_services_index): +def log_gatt_server_uuids(cen_ad, + discovered_services_index, + bluetooth_gatt=None): services_count = cen_ad.droid.gattClientGetDiscoveredServicesCount( discovered_services_index) for i in range(services_count): @@ -359,11 +385,23 @@ def log_gatt_server_uuids(cen_ad, discovered_services_index): characteristic_uuids = ( cen_ad.droid.gattClientGetDiscoveredCharacteristicUuids( discovered_services_index, i)) - for characteristic in characteristic_uuids: - log.info("Discovered characteristic uuid {}".format( - characteristic)) + for j in range(len(characteristic_uuids)): descriptor_uuids = ( - cen_ad.droid.gattClientGetDiscoveredDescriptorUuids( - discovered_services_index, i, characteristic)) - for descriptor in descriptor_uuids: - log.info("Discovered descriptor uuid {}".format(descriptor))
\ No newline at end of file + cen_ad.droid.gattClientGetDiscoveredDescriptorUuidsByIndex( + discovered_services_index, i, j)) + if bluetooth_gatt: + char_inst_id = cen_ad.droid.gattClientGetCharacteristicInstanceId( + bluetooth_gatt, discovered_services_index, i, j) + log.info("Discovered characteristic handle uuid: {} {}".format( + hex(char_inst_id), characteristic_uuids[j])) + for k in range(len(descriptor_uuids)): + desc_inst_id = cen_ad.droid.gattClientGetDescriptorInstanceId( + bluetooth_gatt, discovered_services_index, i, j, k) + log.info("Discovered descriptor handle uuid: {} {}".format( + hex(desc_inst_id), descriptor_uuids[k])) + else: + log.info("Discovered characteristic uuid: {}".format( + characteristic_uuids[j])) + for k in range(len(descriptor_uuids)): + log.info("Discovered descriptor uuid {}".format( + descriptor_uuids[k])) diff --git a/acts/tests/google/bt/pts/cmd_input.py b/acts/tests/google/bt/pts/cmd_input.py index 278c1087d4..6ce41e319e 100644 --- a/acts/tests/google/bt/pts/cmd_input.py +++ b/acts/tests/google/bt/pts/cmd_input.py @@ -17,6 +17,8 @@ Python script for wrappers to various libraries. """ +from gattc_lib import GattClientLib + import cmd """Various Global Strings""" CMD_LOG = "CMD {} result: {}" @@ -25,6 +27,7 @@ FAILURE = "CMD {} threw exception: {}" class CmdInput(cmd.Cmd): """Simple command processor for Bluetooth PTS Testing""" + gattc_lib = None def setup_vars(self, android_devices, mac_addr, log): self.pri_dut = android_devices[0] @@ -34,9 +37,278 @@ class CmdInput(cmd.Cmd): self.mac_addr = mac_addr self.log = log + # Initialize libraries + self.gattc_lib = GattClientLib(log, mac_addr, self.pri_dut) + def emptyline(self): pass def do_EOF(self, line): "End Script" return True + + """Begin GATT Client wrappers""" + + def do_gattc_connect_over_le(self, line): + """Perform GATT connection over LE""" + cmd = "Gatt connect over LE" + try: + autoconnect = False + if line: + autoconnect = bool(line) + self.gattc_lib.connect_over_le(autoconnect) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_connect_over_bredr(self, line): + """Perform GATT connection over BREDR""" + cmd = "Gatt connect over BR/EDR" + try: + self.gattc_lib.connect_over_bredr() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_disconnect(self, line): + """Perform GATT disconnect""" + cmd = "Gatt Disconnect" + try: + self.gattc_lib.disconnect() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_char_by_uuid(self, line): + """GATT client read Characteristic by UUID.""" + cmd = "GATT client read Characteristic by UUID." + try: + self.gattc_lib.read_char_by_uuid(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_request_mtu(self, line): + """Request MTU Change of input value""" + cmd = "Request MTU Value" + try: + self.gattc_lib.request_mtu(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_list_all_uuids(self, line): + """From the GATT Client, discover services and list all services, + chars and descriptors + """ + cmd = "Discovery Services and list all UUIDS" + try: + self.gattc_lib.list_all_uuids() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_discover_services(self, line): + """GATT Client discover services of GATT Server""" + cmd = "Discovery Services of GATT Server" + try: + self.gattc_lib.discover_services() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_refresh(self, line): + """Perform Gatt Client Refresh""" + cmd = "GATT Client Refresh" + try: + self.gattc_lib.refresh() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_char_by_instance_id(self, line): + """From the GATT Client, discover services and list all services, + chars and descriptors + """ + cmd = "GATT Client Read By Instance ID" + try: + self.gattc_lib.read_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_char_by_instance_id(self, line): + """GATT Client Write to Characteristic by instance ID""" + cmd = "GATT Client write to Characteristic by instance ID" + try: + self.gattc_lib.write_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_mod_write_char_by_instance_id(self, line): + """GATT Client Write to Char that doesn't have write permission""" + cmd = "GATT Client Write to Char that doesn't have write permission" + try: + self.gattc_lib.mod_write_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_invalid_char_by_instance_id(self, line): + """GATT Client Write to Char that doesn't exists""" + try: + self.gattc_lib.write_invalid_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_mod_read_char_by_instance_id(self, line): + """GATT Client Read Char that doesn't have write permission""" + cmd = "GATT Client Read Char that doesn't have write permission" + try: + self.gattc_lib.mod_read_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_invalid_char_by_instance_id(self, line): + """GATT Client Read Char that doesn't exists""" + cmd = "GATT Client Read Char that doesn't exists" + try: + self.gattc_lib.read_invalid_char_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_mod_write_desc_by_instance_id(self, line): + """GATT Client Write to Desc that doesn't have write permission""" + cmd = "GATT Client Write to Desc that doesn't have write permission" + try: + self.gattc_lib.mod_write_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_invalid_desc_by_instance_id(self, line): + """GATT Client Write to Desc that doesn't exists""" + cmd = "GATT Client Write to Desc that doesn't exists" + try: + self.gattc_lib.write_invalid_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_mod_read_desc_by_instance_id(self, line): + """GATT Client Read Desc that doesn't have write permission""" + cmd = "GATT Client Read Desc that doesn't have write permission" + try: + self.gattc_lib.mod_read_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_invalid_desc_by_instance_id(self, line): + """GATT Client Read Desc that doesn't exists""" + cmd = "GATT Client Read Desc that doesn't exists" + try: + self.gattc_lib.read_invalid_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_mod_read_char_by_uuid_and_instance_id(self, line): + """GATT Client Read Char that doesn't have write permission""" + cmd = "GATT Client Read Char that doesn't have write permission" + try: + self.gattc_lib.mod_read_char_by_uuid_and_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_invalid_char_by_uuid(self, line): + """GATT Client Read Char that doesn't exist""" + cmd = "GATT Client Read Char that doesn't exist" + try: + self.gattc_lib.read_invalid_char_by_uuid(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_desc_by_instance_id(self, line): + """GATT Client Write to Descriptor by instance ID""" + cmd = "GATT Client Write to Descriptor by instance ID" + try: + self.gattc_lib.write_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_enable_notification_desc_by_instance_id(self, line): + """GATT Client Enable Notification on Descriptor by instance ID""" + cmd = "GATT Client Enable Notification on Descriptor by instance ID" + try: + self.gattc_lib.enable_notification_desc_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_char_enable_all_notifications(self, line): + """GATT Client enable all notifications""" + cmd = "GATT Client enable all notifications" + try: + self.gattc_lib.char_enable_all_notifications() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_char_by_invalid_instance_id(self, line): + """GATT Client read char by non-existant instance id""" + cmd = "GATT Client read char by non-existant instance id" + try: + self.gattc_lib.read_char_by_invalid_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_begin_reliable_write(self, line): + """Begin a reliable write on the Bluetooth Gatt Client""" + cmd = "GATT Client Begin Reliable Write" + try: + self.gattc_lib.begin_reliable_write() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_abort_reliable_write(self, line): + """Abort a reliable write on the Bluetooth Gatt Client""" + cmd = "GATT Client Abort Reliable Write" + try: + self.gattc_lib.abort_reliable_write() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_execute_reliable_write(self, line): + """Execute a reliable write on the Bluetooth Gatt Client""" + cmd = "GATT Client Execute Reliable Write" + try: + self.gattc_lib.execute_reliable_write() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_read_all_char(self, line): + """GATT Client read all Characteristic values""" + cmd = "GATT Client read all Characteristic values" + try: + self.gattc_lib.read_all_char() + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_all_char(self, line): + """Write to every Characteristic on the GATT server""" + cmd = "GATT Client Write All Characteristics" + try: + self.gattc_lib.write_all_char(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_all_desc(self, line): + """ Write to every Descriptor on the GATT server """ + cmd = "GATT Client Write All Descriptors" + try: + self.gattc_lib.write_all_desc(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_write_desc_notification_by_instance_id(self, line): + """Write 0x00 or 0x02 to notification descriptor [instance_id]""" + cmd = "Write 0x00 0x02 to notification descriptor" + try: + self.gattc_lib.write_desc_notification_by_instance_id(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + def do_gattc_discover_service_by_uuid(self, line): + """Discover service by uuid""" + cmd = "Discover service by uuid" + try: + self.gattc_lib.discover_service_by_uuid(line) + except Exception as err: + self.log.info(FAILURE.format(cmd, err)) + + """End GATT Client wrappers""" diff --git a/acts/tests/google/bt/pts/gattc_lib.py b/acts/tests/google/bt/pts/gattc_lib.py new file mode 100644 index 0000000000..89718d6964 --- /dev/null +++ b/acts/tests/google/bt/pts/gattc_lib.py @@ -0,0 +1,448 @@ +#/usr/bin/env python3.4 +# +# Copyright (C) 2016 The Android Open Source Project +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. +""" +GATT Client Libraries +""" + +from acts.test_utils.bt.bt_gatt_utils import disconnect_gatt_connection +from acts.test_utils.bt.bt_gatt_utils import setup_gatt_connection +from acts.test_utils.bt.bt_gatt_utils import setup_gatt_mtu +from acts.test_utils.bt.GattEnum import GattCbStrings +from acts.test_utils.bt.GattEnum import GattDescriptor +from acts.test_utils.bt.GattEnum import GattTransport +from acts.test_utils.bt.bt_gatt_utils import log_gatt_server_uuids + +import time +import os + + +class GattClientLib(): + def __init__(self, log, mac_addr, dut): + self.dut = dut + self.log = log + self.mac_addr = mac_addr + self.gatt_callback = None + self.bluetooth_gatt = None + self.discovered_services_index = None + self.generic_uuid = "0000{}-0000-1000-8000-00805f9b34fb" + + def connect_over_le(self, autoconnect): + """Perform GATT connection over LE""" + self.bluetooth_gatt, self.gatt_callback = setup_gatt_connection( + self.dut, + self.mac_addr, + autoconnect, + transport=GattTransport.TRANSPORT_LE.value) + self.discovered_services_index = None + + def connect_over_bredr(self): + """Perform GATT connection over BREDR""" + self.bluetooth_gatt, self.gatt_callback = setup_gatt_connection( + self.dut, + self.mac_addr, + False, + transport=GattTransport.TRANSPORT_BREDR.value) + + def disconnect(self): + """Perform GATT disconnect""" + try: + disconnect_gatt_connection(self.dut, self.bluetooth_gatt, + self.gatt_callback) + except Exception as err: + self.log.info("Cmd {} failed with {}".format(cmd, err)) + try: + self.dut.droid.gattClientClose(self.bluetooth_gatt) + except Exception as err: + self.log.info("Cmd failed with {}".format(err)) + + def _setup_discovered_services_index(self): + if not self.discovered_services_index: + self.dut.droid.gattClientDiscoverServices(self.bluetooth_gatt) + expected_event = GattCbStrings.GATT_SERV_DISC.value.format( + self.gatt_callback) + event = self.dut.ed.pop_event(expected_event, 10) + self.discovered_services_index = event['data']['ServicesIndex'] + + def read_char_by_uuid(self, line): + """GATT client read Characteristic by UUID.""" + uuid = line + if len(line) == 4: + uuid = self.generic_uuid.format(line) + self.dut.droid.gattClientReadUsingCharacteristicUuid( + self.bluetooth_gatt, uuid, 0x0001, 0xFFFF) + + def request_mtu(self, mtu): + """Request MTU Change of input value""" + setup_gatt_mtu(self.dut, self.bluetooth_gatt, self.gatt_callback, + int(mtu)) + + def list_all_uuids(self): + """From the GATT Client, discover services and list all services, + chars and descriptors + """ + self._setup_discovered_services_index() + log_gatt_server_uuids(self.dut, self.discovered_services_index, + self.bluetooth_gatt) + + def discover_services(self): + """GATT Client discover services of GATT Server""" + self.dut.droid.gattClientDiscoverServices(self.bluetooth_gatt) + + def refresh(self): + """Perform Gatt Client Refresh""" + self.dut.droid.gattClientRefresh(self.bluetooth_gatt) + + def read_char_by_instance_id(self, id): + """From the GATT Client, discover services and list all services, + chars and descriptors + """ + if not id: + self.log.info("Invalid id") + return + self._setup_discovered_services_index() + self.dut.droid.gattClientReadCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, int(id, 16)) + + def write_char_by_instance_id(self, line): + """GATT Client Write to Characteristic by instance ID""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [InstanceId] [Size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientWriteCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def mod_write_char_by_instance_id(self, line): + """GATT Client Write to Char that doesn't have write permission""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [InstanceId] [Size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientModifyAccessAndWriteCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def write_invalid_char_by_instance_id(self, line): + """GATT Client Write to Char that doesn't exists""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [InstanceId] [Size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientWriteInvalidCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def mod_read_char_by_instance_id(self, line): + """GATT Client Read Char that doesn't have write permission""" + instance_id = line + self._setup_discovered_services_index() + self.dut.droid.gattClientModifyAccessAndReadCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16)) + + def read_invalid_char_by_instance_id(self, line): + """GATT Client Read Char that doesn't exists""" + instance_id = line + self.dut.droid.gattClientReadInvalidCharacteristicByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16)) + + def mod_write_desc_by_instance_id(self, line): + """GATT Client Write to Desc that doesn't have write permission""" + cmd = "" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [InstanceId] [Size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientModifyAccessAndWriteDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def write_invalid_desc_by_instance_id(self, line): + """GATT Client Write to Desc that doesn't exists""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [InstanceId] [Size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientWriteInvalidDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def mod_read_desc_by_instance_id(self, line): + """GATT Client Read Desc that doesn't have write permission""" + cmd = "" + instance_id = line + self._setup_discovered_services_index() + self.dut.droid.gattClientModifyAccessAndReadDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16)) + + def read_invalid_desc_by_instance_id(self, line): + """GATT Client Read Desc that doesn't exists""" + instance_id = line + self._setup_discovered_services_index() + self.dut.droid.gattClientReadInvalidDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16)) + + def mod_read_char_by_uuid_and_instance_id(self, line): + """GATT Client Read Char that doesn't have write permission""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [uuid] [instance_id]") + return + uuid = args[0] + instance_id = args[1] + self._setup_discovered_services_index() + self.dut.droid.gattClientModifyAccessAndReadCharacteristicByUuidAndInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), self.generic_uuid.format(uuid)) + + def read_invalid_char_by_uuid(self, line): + """GATT Client Read Char that doesn't exists""" + uuid = line + self._setup_discovered_services_index() + self.dut.droid.gattClientReadInvalidCharacteristicByUuid( + self.bluetooth_gatt, self.discovered_services_index, + self.generic_uuid.format(uuid)) + + def write_desc_by_instance_id(self, line): + """GATT Client Write to Descriptor by instance ID""" + args = line.split() + if len(args) != 2: + self.log.info("2 Arguments required: [instanceID] [size]") + return + instance_id = args[0] + size = args[1] + write_value = [] + for i in range(int(size)): + write_value.append(i % 256) + self._setup_discovered_services_index() + self.dut.droid.gattClientWriteDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def write_desc_notification_by_instance_id(self, line): + """GATT Client Write to Descriptor by instance ID""" + args = line.split() + instance_id = args[0] + switch = int(args[1]) + write_value = [0x00, 0x00] + if switch == 2: + write_value = [0x02, 0x00] + self._setup_discovered_services_index() + self.dut.droid.gattClientWriteDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), write_value) + + def enable_notification_desc_by_instance_id(self, line): + """GATT Client Enable Notification on Descriptor by instance ID""" + instance_id = line + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + """ + self.log.info( + CMD_LOG.format( + cmd, + self.dut.droid.gattClientWriteDescriptorByInstanceId( + self.bluetooth_gatt, self.discovered_services_index, + int(instance_id, 16), + GattDescriptor.ENABLE_NOTIFICATION_VALUE.value))) + """ + for i in range(services_count): + characteristic_uuids = ( + self.dut.droid.gattClientGetDiscoveredCharacteristicUuids( + self.discovered_services_index, i)) + for j in range(len(characteristic_uuids)): + descriptor_uuids = ( + self.dut.droid. + gattClientGetDiscoveredDescriptorUuidsByIndex( + self.discovered_services_index, i, j)) + for k in range(len(descriptor_uuids)): + desc_inst_id = self.dut.droid.gattClientGetDescriptorInstanceId( + self.bluetooth_gatt, self.discovered_services_index, i, + j, k) + if desc_inst_id == int(instance_id, 16): + self.dut.droid.gattClientDescriptorSetValueByIndex( + self.bluetooth_gatt, + self.discovered_services_index, i, j, k, + GattDescriptor.ENABLE_NOTIFICATION_VALUE.value) + time.sleep(2) #Necessary for PTS + self.dut.droid.gattClientWriteDescriptorByIndex( + self.bluetooth_gatt, + self.discovered_services_index, i, j, k) + time.sleep(2) #Necessary for PTS + self.dut.droid.gattClientSetCharacteristicNotificationByIndex( + self.bluetooth_gatt, + self.discovered_services_index, i, j, True) + + def char_enable_all_notifications(self): + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + for i in range(services_count): + characteristic_uuids = ( + self.dut.droid.gattClientGetDiscoveredCharacteristicUuids( + self.discovered_services_index, i)) + for j in range(len(characteristic_uuids)): + self.dut.droid.gattClientSetCharacteristicNotificationByIndex( + self.bluetooth_gatt, self.discovered_services_index, i, j, + True) + + def read_char_by_invalid_instance_id(self, line): + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + self.dut.droid.gattClientReadInvalidCharacteristicInstanceId( + self.bluetooth_gatt, self.discovered_services_index, 0, + int(line, 16)) + + def begin_reliable_write(self): + """Begin a reliable write on the Bluetooth Gatt Client""" + self.dut.droid.gattClientBeginReliableWrite(self.bluetooth_gatt) + + def abort_reliable_write(self): + """Abort a reliable write on the Bluetooth Gatt Client""" + self.dut.droid.gattClientAbortReliableWrite(self.bluetooth_gatt) + + def execute_reliable_write(self): + """Execute a reliable write on the Bluetooth Gatt Client""" + self.dut.droid.gattExecuteReliableWrite(self.bluetooth_gatt) + + def read_all_char(self): + """GATT Client read all Characteristic values""" + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + for i in range(services_count): + characteristic_uuids = ( + self.dut.droid.gattClientGetDiscoveredCharacteristicUuids( + self.discovered_services_index, i)) + for j in range(len(characteristic_uuids)): + char_inst_id = self.dut.droid.gattClientGetCharacteristicInstanceId( + self.bluetooth_gatt, self.discovered_services_index, i, j) + self.log.info("Reading characteristic {} {}".format( + hex(char_inst_id), characteristic_uuids[j])) + self.dut.droid.gattClientReadCharacteristicByIndex( + self.bluetooth_gatt, self.discovered_services_index, i, j) + time.sleep(1) # Necessary for PTS + + def write_all_char(self, line): + """Write to every Characteristic on the GATT server""" + args = line.split() + write_value = [] + for i in range(int(line)): + write_value.append(i % 256) + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + for i in range(services_count): + characteristic_uuids = ( + self.dut.droid.gattClientGetDiscoveredCharacteristicUuids( + self.discovered_services_index, i)) + for j in range(len(characteristic_uuids)): + time.sleep(1) + char_inst_id = self.dut.droid.gattClientGetCharacteristicInstanceId( + self.bluetooth_gatt, self.discovered_services_index, i, j) + self.log.info("Writing to {} {}".format( + hex(char_inst_id), characteristic_uuids[j])) + try: + self.dut.droid.gattClientCharacteristicSetValueByIndex( + self.bluetooth_gatt, self.discovered_services_index, i, + j, write_value) + self.dut.droid.gattClientWriteCharacteristicByIndex( + self.bluetooth_gatt, self.discovered_services_index, i, + j) + except Exception as err: + self.log.info("Failed to write to characteristic: {}". + format(characteristic_uuids[j])) + + def write_all_desc(self, line): + """ Write to every Descriptor on the GATT server """ + args = line.split() + write_value = [] + for i in range(int(line)): + write_value.append(i % 256) + self._setup_discovered_services_index() + services_count = self.dut.droid.gattClientGetDiscoveredServicesCount( + self.discovered_services_index) + for i in range(services_count): + characteristic_uuids = ( + self.dut.droid.gattClientGetDiscoveredCharacteristicUuids( + self.discovered_services_index, i)) + for j in range(len(characteristic_uuids)): + descriptor_uuids = ( + self.dut.droid. + gattClientGetDiscoveredDescriptorUuidsByIndex( + self.discovered_services_index, i, j)) + for k in range(len(descriptor_uuids)): + time.sleep(1) + desc_inst_id = self.dut.droid.gattClientGetDescriptorInstanceId( + self.bluetooth_gatt, self.discovered_services_index, i, + j, k) + self.log.info("Writing to {} {}".format( + hex(desc_inst_id), descriptor_uuids[k])) + try: + self.dut.droid.gattClientDescriptorSetValueByIndex( + self.bluetooth_gatt, + self.discovered_services_index, i, j, k, + write_value) + self.dut.droid.gattClientWriteDescriptorByIndex( + self.bluetooth_gatt, + self.discovered_services_index, i, j, k) + except Exception as err: + self.log.info("Failed to write to descriptor: {}". + format(descriptor_uuids[k])) + + def discover_service_by_uuid(self, line): + """ Discover service by UUID """ + uuid = line + if len(line) == 4: + uuid = self.generic_uuid.format(line) + self.dut.droid.gattClientDiscoverServiceByUuid(self.bluetooth_gatt, + uuid)
\ No newline at end of file |