diff options
author | android-build-team Robot <android-build-team-robot@google.com> | 2018-08-10 03:10:48 +0000 |
---|---|---|
committer | android-build-team Robot <android-build-team-robot@google.com> | 2018-08-10 03:10:48 +0000 |
commit | 152700a24286b8e098760726ff8aad8356e51758 (patch) | |
tree | 1ab6f1c060d261b55dd1b25d77bd9ea37d54df92 | |
parent | 0fd37483a2dde4803cdba97afb1bb5dd273102cc (diff) | |
parent | 7b00a3a220e4a4c257216503812a3ebd92b4b582 (diff) | |
download | platform_tools_test_connectivity-pie-qpr1-s3-release.tar.gz platform_tools_test_connectivity-pie-qpr1-s3-release.tar.bz2 platform_tools_test_connectivity-pie-qpr1-s3-release.zip |
Snap for 4945494 from 7b00a3a220e4a4c257216503812a3ebd92b4b582 to pi-qpr1-releaseandroid-9.0.0_r30android-9.0.0_r22android-9.0.0_r21android-9.0.0_r20android-9.0.0_r19android-9.0.0_r16pie-qpr1-s3-releasepie-qpr1-s2-releasepie-qpr1-s1-releasepie-qpr1-release
Change-Id: I8db466ac2800311fd3d07a3535155f22f8dc7e05
-rwxr-xr-x | acts/framework/tests/acts_android_device_test.py | 5 | ||||
-rw-r--r-- | acts/tests/google/net/DhcpServerTest.py | 794 |
2 files changed, 699 insertions, 100 deletions
diff --git a/acts/framework/tests/acts_android_device_test.py b/acts/framework/tests/acts_android_device_test.py index b2cb0b5fae..6c664f5e10 100755 --- a/acts/framework/tests/acts_android_device_test.py +++ b/acts/framework/tests/acts_android_device_test.py @@ -28,9 +28,10 @@ from acts.controllers import android_device MOCK_LOG_PATH = "/tmp/logs/MockTest/xx-xx-xx_xx-xx-xx/" # Mock start and end time of the adb cat. -MOCK_ADB_LOGCAT_BEGIN_TIME = "1970-01-02 21:03:20.123" -MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_ADB_EPOCH_BEGIN_TIME = 191000123 +MOCK_ADB_LOGCAT_BEGIN_TIME = logger.normalize_log_line_timestamp( + logger.epoch_to_log_line_timestamp(MOCK_ADB_EPOCH_BEGIN_TIME)) +MOCK_ADB_LOGCAT_END_TIME = "1970-01-02 21:22:02.000" MOCK_SERIAL = 1 MOCK_RELEASE_BUILD_ID = "ABC1.123456.007" diff --git a/acts/tests/google/net/DhcpServerTest.py b/acts/tests/google/net/DhcpServerTest.py index 4cc2b91da3..1f33d63422 100644 --- a/acts/tests/google/net/DhcpServerTest.py +++ b/acts/tests/google/net/DhcpServerTest.py @@ -6,12 +6,15 @@ from acts.test_decorators import test_tracker_info from scapy.all import * from threading import Event from threading import Thread +import random import time import warnings +CLIENT_PORT = 68 SERVER_PORT = 67 BROADCAST_MAC = 'ff:ff:ff:ff:ff:ff' +INET4_ANY = '0.0.0.0' NETADDR_PREFIX = '192.168.42.' OTHER_NETADDR_PREFIX = '192.168.43.' NETADDR_BROADCAST = '255.255.255.255' @@ -23,6 +26,7 @@ REQUEST = 3 ACK = 5 NAK = 6 + pmc_base_cmd = ( "am broadcast -a com.android.pmc.action.AUTOPOWER --es PowerAction ") start_pmc_cmd = ( @@ -41,7 +45,7 @@ class DhcpServerTest(base_test.BaseTestClass): conf.checkIPaddr = 0 conf.checkIPsrc = 0 # Allow using non-67 server ports as long as client uses 68 - bind_layers(UDP, BOOTP, dport=68) + bind_layers(UDP, BOOTP, dport=CLIENT_PORT) self.dut.adb.shell(start_pmc_cmd) self.dut.adb.shell("setprop log.tag.PMC VERBOSE") @@ -54,12 +58,13 @@ class DhcpServerTest(base_test.BaseTestClass): thread = Thread(target=self._sniff_arp, args=(self.stop_arp,)) thread.start() - # Discover server IP + # Discover server IP and device hwaddr hwaddr = self._next_hwaddr() - resp = self._get_response(make_discover(hwaddr)) + resp = self._get_response(self._make_discover(hwaddr)) asserts.assert_false(None == resp, "Device did not reply to first DHCP discover") self.server_addr = getopt(resp, 'server_id') + self.dut_hwaddr = resp.getlayer(Ether).src asserts.assert_false(None == self.server_addr, "DHCP server did not specify server identifier") # Ensure that we don't depend on assigned route/gateway on the host @@ -94,6 +99,7 @@ class DhcpServerTest(base_test.BaseTestClass): self.log.info("Starting USB Tethering") dut.stop_services() dut.adb.shell(pmc_start_usb_tethering_cmd) + self._wait_for_device(self.dut) self.USB_TETHERED = True def _stop_usb_tethering(self, dut): @@ -151,88 +157,301 @@ class DhcpServerTest(base_test.BaseTestClass): @test_tracker_info(uuid="a8712769-977a-4ee1-902f-90b3ba30b40c") def test_config_assumptions(self): - resp = self._get_response(make_discover(self.hwaddr)) + resp = self._get_response(self._make_discover(self.hwaddr)) asserts.assert_false(None == resp, "Device did not reply to discover") asserts.assert_true(get_yiaddr(resp).startswith(NETADDR_PREFIX), "Server does not use expected prefix") + @test_tracker_info(uuid="e3761689-7d64-46b1-97ce-15f315eaf568") + def test_discover_broadcastbit(self): + resp = self._get_response( + self._make_discover(self.hwaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="30a7ea7c-c20f-4c46-aaf2-96f19d8f8191") + def test_discover_bootpfields(self): + discover = self._make_discover(self.hwaddr) + resp = self._get_response(discover) + self._assert_offer(resp) + self._assert_unicast(resp) + bootp = assert_bootp_response(resp, discover) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) + + @test_tracker_info(uuid="593f4051-516d-44fa-8834-7d485362f182") + def test_discover_relayed_broadcastbit(self): + giaddr = NETADDR_PREFIX + '123' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr, bcastbit=True)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp) + + def _run_discover_paramrequestlist(self, params, unwanted_params): + params_opt = make_paramrequestlist_opt(params) + resp = self._get_response( + self._make_discover(self.hwaddr, options=[params_opt])) + + self._assert_offer(resp) + # List of requested params in response order + resp_opts = get_opt_labels(resp) + resp_requested_opts = [opt for opt in resp_opts if opt in params] + # All above params should be supported, order should be conserved + asserts.assert_equal(params, resp_requested_opts) + asserts.assert_equal(0, len(set(resp_opts) & set(unwanted_params))) + return resp + + @test_tracker_info(uuid="00a8a3f6-f143-47ff-a79b-482c607fb5b8") + def test_discover_paramrequestlist(self): + resp = self._run_discover_paramrequestlist( + ['subnet_mask', 'broadcast_address', 'router', 'name_server'], + unwanted_params=[]) + for opt in ['broadcast_address', 'router', 'name_server']: + asserts.assert_true(getopt(resp, opt).startswith(NETADDR_PREFIX), + opt + ' does not start with ' + NETADDR_PREFIX) + + subnet_mask = getopt(resp, 'subnet_mask') + asserts.assert_true(subnet_mask.startswith('255.255.'), + 'Unexpected subnet mask for /16+: ' + subnet_mask) + + @test_tracker_info(uuid="d1aad4a3-9eab-4900-aa6a-5b82a4a64f46") + def test_discover_paramrequestlist_rev(self): + # RFC2132 #9.8: "The DHCP server is not required to return the options + # in the requested order, but MUST try to insert the requested options + # in the order requested" + asserts.skip('legacy behavior not compliant: fixed order used') + self._run_discover_paramrequestlist( + ['name_server', 'router', 'broadcast_address', 'subnet_mask'], + unwanted_params=[]) + + @test_tracker_info(uuid="e3ae6335-8cc7-4bf1-bb58-67646b727f2b") + def test_discover_paramrequestlist_unwanted(self): + asserts.skip('legacy behavior always sends all parameters') + self._run_discover_paramrequestlist(['router', 'name_server'], + unwanted_params=['broadcast_address', 'subnet_mask']) + + def _assert_renews(self, request, addr, exp_time, resp_type=ACK): + # Sleep to test lease time renewal + time.sleep(3) + resp = self._get_response(request) + self._assert_type(resp, resp_type) + asserts.assert_equal(addr, get_yiaddr(resp)) + remaining_lease = getopt(resp, 'lease_time') + # Lease renewed: waited for 3s, lease time not decreased by more than 2 + asserts.assert_true(remaining_lease >= exp_time - 2, + 'Lease not renewed') + # Lease times should be consistent across offers/renewals + asserts.assert_true(remaining_lease <= exp_time + 2, + 'Lease time inconsistent') + return resp + @test_tracker_info(uuid="d6b598b7-f443-4b5a-ba80-4af5d211cade") def test_discover_assigned_ownaddress(self): addr, siaddr, resp = self._request_address(self.hwaddr) lease_time = getopt(resp, 'lease_time') server_id = getopt(resp, 'server_id') - asserts.assert_true(lease_time > 10, "Lease time is unreasonably short") - asserts.assert_false(addr == '0.0.0.0', "Assigned address is empty") + asserts.assert_true(lease_time >= 60, "Lease time is too short") + asserts.assert_false(addr == INET4_ANY, "Assigned address is empty") # Wait to test lease expiration time change - time.sleep(2) + time.sleep(3) # New discover, same address - resp = self._get_response(make_discover(self.hwaddr)) - # Lease time renewed: exptime not decreased - asserts.assert_equal(lease_time, getopt(resp, 'lease_time')) - asserts.assert_equal(addr, get_yiaddr(resp)) + resp = self._assert_renews(self._make_discover(self.hwaddr), + addr, lease_time, resp_type=OFFER) + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="cbb07d77-912b-4269-bbbc-adba99779587") def test_discover_assigned_otherhost(self): addr, siaddr, _ = self._request_address(self.hwaddr) # New discover, same address, different client - resp = self._get_response(make_discover(self.other_hwaddr, + resp = self._get_response(self._make_discover(self.other_hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_false(get_yiaddr(resp) == addr, "Already assigned address offered") + self._assert_unicast(resp, get_yiaddr(resp)) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="3d2b3d2f-eb5f-498f-b887-3b4638cebf14") def test_discover_requestaddress(self): addr = NETADDR_PREFIX + '200' - resp = self._get_response(make_discover(self.hwaddr, + resp = self._get_response(self._make_discover(self.hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_equal(get_yiaddr(resp), addr) # Lease not committed: can request again - resp = self._get_response(make_discover(self.other_hwaddr, + resp = self._get_response(self._make_discover(self.other_hwaddr, [('requested_addr', addr)])) self._assert_offer(resp) asserts.assert_equal(get_yiaddr(resp), addr) - def _assert_renews(self, request, addr, expTime): - time.sleep(2) - resp = self._get_response(request) - self._assert_ack(resp) - asserts.assert_equal(addr, get_yiaddr(resp)) - # Lease time renewed - asserts.assert_equal(expTime, getopt(resp, 'lease_time')) + @test_tracker_info(uuid="5ffd9d25-304e-434b-bedb-56ccf27dcebd") + def test_discover_requestaddress_wrongsubnet(self): + addr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, [('requested_addr', addr)])) + self._assert_offer(resp) + self._assert_unicast(resp) + asserts.assert_false(get_yiaddr(resp) == addr, + 'Server offered invalid address') - @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") - def test_request_wrongnet(self): - resp = self._get_response(make_request(self.hwaddr, - OTHER_NETADDR_PREFIX + '1', None)) - self._assert_nak(resp) + @test_tracker_info(uuid="f7d6a92f-9386-4b65-b6c1-d0a3f11213bf") + def test_discover_giaddr_outside_subnet(self): + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="1348c79a-9203-4bb8-b33b-af80bacd17b1") + def test_discover_srcaddr_outside_subnet(self): + srcaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, ip_src=srcaddr)) + self._assert_offer(resp) + asserts.assert_false(srcaddr == get_yiaddr(resp), + 'Server offered invalid address') + + @test_tracker_info(uuid="a03bb783-8665-4c66-9c0c-1bb02ddca07e") + def test_discover_requestaddress_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + req = self._make_discover(self.hwaddr, [('requested_addr', addr)], + ip_src=giaddr, giaddr=giaddr) + resp = self._get_response(req) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="725956af-71e2-45d8-b8b3-402d21bfc7db") + def test_discover_knownaddress_giaddr_outside_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in invalid subnet + giaddr = OTHER_NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + asserts.assert_equal(resp, None) + + @test_tracker_info(uuid="2ee9d5b1-c15d-40c4-98e9-63202d1f1557") + def test_discover_knownaddress_giaddr_valid_subnet(self): + addr, siaddr, _ = self._request_address(self.hwaddr) + + # New discover, same client, through relay in valid subnet + giaddr = NETADDR_PREFIX + '200' + resp = self._get_response( + self._make_discover(self.hwaddr, giaddr=giaddr)) + self._assert_offer(resp) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f43105a5-633a-417a-8a07-39bc36c493e7") + def test_request_unicast(self): + addr, siaddr, resp = self._request_address(self.hwaddr, bcast=False) + self._assert_unicast(resp, addr) + + @test_tracker_info(uuid="09f3c1c4-1202-4f85-a965-4d86aee069f3") + def test_request_bootpfields(self): + req_addr = NETADDR_PREFIX + '200' + req = self._make_request(self.hwaddr, req_addr, self.server_addr) + resp = self._get_response(req) + self._assert_ack(resp) + bootp = assert_bootp_response(resp, req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(self.server_addr, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.hwaddr, get_chaddr(bootp)) @test_tracker_info(uuid="ec00d268-80cb-4be5-9771-2292cc7d2e18") - def test_request_inuse(self): + def test_request_selecting_inuse(self): addr, siaddr, _ = self._request_address(self.hwaddr) - res = self._get_response(make_request(self.other_hwaddr, addr, None)) - self._assert_nak(res) + new_req = self._make_request(self.other_hwaddr, addr, siaddr) + resp = self._get_response(new_req) + self._assert_nak(resp) + self._assert_broadcast(resp) + bootp = assert_bootp_response(resp, new_req) + asserts.assert_equal(INET4_ANY, bootp.ciaddr) + asserts.assert_equal(INET4_ANY, bootp.yiaddr) + asserts.assert_equal(INET4_ANY, bootp.siaddr) + asserts.assert_equal(INET4_ANY, bootp.giaddr) + asserts.assert_equal(self.other_hwaddr, get_chaddr(bootp)) + asserts.assert_equal( + ['message-type', 'server_id', 56, 'end'], # 56 is "message" opt + get_opt_labels(bootp)) + asserts.assert_equal(self.server_addr, getopt(bootp, 'server_id')) + + @test_tracker_info(uuid="0643c179-3542-4297-9b06-8d86ff785e9c") + def test_request_selecting_wrongsiaddr(self): + addr = NETADDR_PREFIX + '200' + wrong_siaddr = NETADDR_PREFIX + '201' + asserts.assert_false(wrong_siaddr == self.server_addr, + 'Test assumption not met: server addr is ' + wrong_siaddr) + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=wrong_siaddr)) + asserts.assert_true(resp == None, + 'Received response for request with incorrect siaddr') + + @test_tracker_info(uuid="676beab2-4af8-4bf0-a4ad-c7626ae5987f") + def test_request_selecting_giaddr_outside_subnet(self): + addr = NETADDR_PREFIX + '200' + giaddr = OTHER_NETADDR_PREFIX + '201' + resp = self._get_response( + self._make_request(self.hwaddr, addr, siaddr=self.server_addr, + giaddr=giaddr)) + asserts.assert_equal(resp, None) - @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") - def test_request_initreboot(self): + @test_tracker_info(uuid="fe17df0c-2f41-416f-bb76-d75b74b63c0f") + def test_request_selecting_hostnameupdate(self): + addr = NETADDR_PREFIX + '123' + hostname1 = b'testhostname1' + hostname2 = b'testhostname2' + req = self._make_request(self.hwaddr, None, None, + options=[ + ('requested_addr', addr), + ('server_id', self.server_addr), + ('hostname', hostname1)]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname1, getopt(req, 'hostname')) + + # Re-request with different hostname + setopt(req, 'hostname', hostname2) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_unicast(resp, addr) + asserts.assert_equal(hostname2, getopt(req, 'hostname')) + + def _run_initreboot(self, bcastbit): addr, siaddr, resp = self._request_address(self.hwaddr) exp = getopt(resp, 'lease_time') - # siaddr NONE: init-reboot client state - self._assert_renews(make_request(self.hwaddr, addr, None), addr, exp) + # init-reboot: siaddr is None + return self._assert_renews(self._make_request( + self.hwaddr, addr, siaddr=None, bcastbit=bcastbit), addr, exp) + + @test_tracker_info(uuid="263c91b9-cfe9-4f21-985d-b7046df80528") + def test_request_initreboot(self): + resp = self._run_initreboot(bcastbit=False) + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="f05dd60f-03dd-4e2b-8e58-80f4d752ad51") + def test_request_initreboot_broadcastbit(self): + resp = self._run_initreboot(bcastbit=True) + self._assert_broadcast(resp) @test_tracker_info(uuid="5563c616-2136-47f6-9151-4e28cbfe797c") def test_request_initreboot_nolease(self): # RFC2131 #4.3.2 - asserts.skip("dnsmasq not compliant if --dhcp-authoritative set.") + asserts.skip("legacy behavior not compliant") addr = NETADDR_PREFIX + '123' - resp = self._get_response(make_request(self.hwaddr, addr, None)) + resp = self._get_response(self._make_request(self.hwaddr, addr, None)) asserts.assert_equal(resp, None) @test_tracker_info(uuid="da5c5537-cb38-4a2e-828f-44bc97976fe5") @@ -242,24 +461,51 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == otheraddr, "Test assumption not met: server assigned " + otheraddr) - resp = self._get_response(make_request(self.hwaddr, otheraddr, None)) + resp = self._get_response( + self._make_request(self.hwaddr, otheraddr, siaddr=None)) + self._assert_nak(resp) + self._assert_broadcast(resp) + + @test_tracker_info(uuid="ce42ba57-07be-427b-9cbd-5535c62b0120") + def test_request_initreboot_wrongnet(self): + resp = self._get_response(self._make_request(self.hwaddr, + OTHER_NETADDR_PREFIX + '1', siaddr=None)) self._assert_nak(resp) + self._assert_broadcast(resp) - @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") - def test_request_rebinding(self): + def _run_rebinding(self, bcastbit, giaddr=INET4_ANY): addr, siaddr, resp = self._request_address(self.hwaddr) exp = getopt(resp, 'lease_time') - self._assert_renews(make_request(self.hwaddr, None, None, ciaddr=addr), + # Rebinding: no siaddr or reqaddr + resp = self._assert_renews( + self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=addr, giaddr=giaddr, ip_src=addr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit), addr, exp) + return resp, addr + + @test_tracker_info(uuid="68bfcb25-5873-41ad-ad0a-bf22781534ca") + def test_request_rebinding(self): + resp, addr = self._run_rebinding(bcastbit=False) + self._assert_unicast(resp, addr) + self._assert_broadcastbit(resp, isset=False) + + @test_tracker_info(uuid="4c591536-8062-40ec-ae12-1ebe7dcad8e2") + def test_request_rebinding_relayed(self): + giaddr = NETADDR_PREFIX + '123' + resp, _ = self._run_rebinding(bcastbit=False, giaddr=giaddr) + self._assert_unicast(resp, giaddr) + self._assert_broadcastbit(resp, isset=False) @test_tracker_info(uuid="cee2668b-bd79-47d7-b358-8f9387d715b1") def test_request_rebinding_inuse(self): addr, siaddr, _ = self._request_address(self.hwaddr) - resp = self._get_response(make_request(self.other_hwaddr, None, None, - ciaddr=addr)) + resp = self._get_response(self._make_request( + self.other_hwaddr, reqaddr=None, siaddr=None, ciaddr=addr)) self._assert_nak(resp) + self._assert_broadcast(resp) @test_tracker_info(uuid="d95d69b5-ab9a-42f5-8dd0-b9b6a6d960cc") def test_request_rebinding_wrongaddr(self): @@ -268,8 +514,8 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == otheraddr, "Test assumption not met: server assigned " + otheraddr) - resp = self._get_response(make_request(self.hwaddr, None, None, - ciaddr=otheraddr)) + resp = self._get_response(self._make_request( + self.hwaddr, reqaddr=None, siaddr=siaddr, ciaddr=otheraddr)) self._assert_nak(resp) self._assert_broadcast(resp) @@ -283,58 +529,352 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(addr == relayaddr, "Test assumption not met: server assigned " + relayaddr) - req = make_request(self.hwaddr, None, None, ciaddr=otheraddr) - req.getlayer(BOOTP).giaddr = relayaddr + req = self._make_request(self.hwaddr, reqaddr=None, siaddr=None, + ciaddr=otheraddr, giaddr=relayaddr) resp = self._get_response(req) self._assert_nak(resp) self._assert_unicast(resp, relayaddr) + self._assert_broadcastbit(resp) @test_tracker_info(uuid="6ff1fab4-009a-4758-9153-0d9db63423da") def test_release(self): addr, siaddr, _ = self._request_address(self.hwaddr) # Re-requesting fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) # Succeeds after release - self._send(make_release(self.hwaddr, addr, siaddr)) - time.sleep(1) - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + self._send(self._make_release(self.hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_ack(resp) @test_tracker_info(uuid="abb1a53e-6b6c-468f-88b9-ace9ca4d6593") def test_release_noserverid(self): addr, siaddr, _ = self._request_address(self.hwaddr) - # Release without server_id opt ignored - release = make_release(self.hwaddr, addr, siaddr) + # Release without server_id opt is ignored + release = self._make_release(self.hwaddr, addr, siaddr) removeopt(release, 'server_id') self._send(release) # Not released: request fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) @test_tracker_info(uuid="8415b69e-ae61-4474-8495-d783ba6818d1") def test_release_wrongserverid(self): addr, siaddr, _ = self._request_address(self.hwaddr) # Release with wrong server id - release = make_release(self.hwaddr, addr, siaddr) + release = self._make_release(self.hwaddr, addr, siaddr) setopt(release, 'server_id', addr) self._send(release) # Not released: request fails - resp = self._get_response(make_request(self.other_hwaddr, addr, siaddr)) + resp = self._get_response( + self._make_request(self.other_hwaddr, addr, siaddr)) self._assert_nak(resp) + self._assert_broadcast(resp) - def _request_address(self, hwaddr): - resp = self._get_response(make_discover(self.hwaddr)) + @test_tracker_info(uuid="0858f678-3db2-4c12-a21b-6e16c5d7e7ce") + def test_unicast_l2l3(self): + reqAddr = NETADDR_PREFIX + '124' + resp = self._get_response(self._make_request( + self.hwaddr, reqAddr, siaddr=None)) + self._assert_unicast(resp) + str_hwaddr = format_hwaddr(self.hwaddr) + asserts.assert_equal(str_hwaddr, resp.getlayer(Ether).dst) + asserts.assert_equal(reqAddr, resp.getlayer(IP).dst) + asserts.assert_equal(CLIENT_PORT, resp.getlayer(UDP).dport) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + params_opt = make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 'NetBIOS_server', + 46, # NetBIOS over TCP/IP Node Type + ]) + req = self._make_discover(self.hwaddr, + options=[ + params_opt, + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_standard_offer(resp) + + def _make_macos_10_13_3_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 121, # Classless Static Route + 'router', + 'name_server', + 'domain', + 119, # Domain Search + 252, # Private/Proxy autodiscovery + 95, # LDAP + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + ]) + + @test_tracker_info(uuid="bf05efe9-ee5b-46ba-9b3c-5a4441c13798") + def test_macos_10_13_3_discover(self): + req = self._make_discover(self.hwaddr, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + + @test_tracker_info(uuid="7acc796b-c4f1-46cc-8ffb-0a0efb05ae86") + def test_macos_10_13_3_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test12-macbookpro'), + ]) + req.getlayer(BOOTP).secs = 5 + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp) + + # Note: macOS does not seem to do any rebinding (straight to discover) + @test_tracker_info(uuid="e8f0b60c-9ea3-4184-8426-151a395bff5b") + def test_macos_10_13_3_request_renewing(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, ip_dst=self.server_addr, options=[ + self._make_macos_10_13_3_paramrequestlist(), + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('lease_time', 7776000), + ('hostname', b'test12-macbookpro'), + ], opts_padding=bytes(6)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True) + + def _make_win10_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'router', + 'name_server', + 'domain', + 31, # Perform Router Discover + 33, # Static Route + 'vendor_specific', + 44, # NetBIOS over TCP/IP Name Server + 46, # NetBIOS over TCP/IP Node Type + 47, # NetBIOS over TCP/IP Scope + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 252, # Private/Proxy autodiscovery + ]) + + @test_tracker_info(uuid="11b3db9c-4cd7-4088-99dc-881f25ce4e76") + def test_win10_discover(self): + req = self._make_discover(self.hwaddr, bcastbit=True, + options=[ + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ], opts_padding=bytes(11)) + req.getlayer(BOOTP).secs = 2 + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + @test_tracker_info(uuid="4fe04e7f-c643-4a19-b15c-cf417b2c9410") + def test_win10_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, bcastbit=True, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('server_id', self.server_addr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, bcast=True) + + def _run_win10_request_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, bcastbit=bcast, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('max_dhcp_size', 1500), + # HW type Ethernet (0x01) + ('client_id', b'\x01' + self.hwaddr), + ('hostname', b'test120-w'), + # Client Fully Qualified Domain Name + (81, b'\x00\x00\x00test120-w.ad.tst.example.com'), + ('vendor_class_id', b'MSFT 5.0'), + self._make_win10_paramrequestlist(), + ]) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, bcast=bcast) + + @test_tracker_info(uuid="1b23c9c7-cc94-42d0-83a6-f1b2bc125fb9") + def test_win10_request_renewing(self): + self._run_win10_request_renewing(bcast=False) + + @test_tracker_info(uuid="c846bd14-71fb-4492-a4d3-0aa5c2c35751") + def test_win10_request_rebinding(self): + self._run_win10_request_renewing(bcast=True) + + def _make_debian_paramrequestlist(self): + return make_paramrequestlist_opt([ + 'subnet_mask', + 'broadcast_address', + 'router', + 'name_server', + 119, # Domain Search + 'hostname', + 101, # TCode + 'domain', # NetBIOS over TCP/IP Name Server + 'vendor_specific', # NetBIOS over TCP/IP Node Type + 121, # Classless Static Route + 249, # Private/Classless Static Route (MS) + 33, # Static Route + 252, # Private/Proxy autodiscovery + 'NTP_server', + ]) + + @test_tracker_info(uuid="b0bb6ae7-07e6-4ecb-9a2f-db9c8146a3d5") + def test_debian_dhclient_4_3_5_discover(self): + req_ip = NETADDR_PREFIX + '109' + req = self._make_discover(self.hwaddr, + options=[ + ('requested_addr', req_ip), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(26)) + resp = self._get_response(req) + self._assert_offer(resp) + self._assert_standard_offer_or_ack(resp) + asserts.assert_equal(req_ip, get_yiaddr(resp)) + + @test_tracker_info(uuid="d70bc043-84cb-4735-9123-c46c6d1ce5ac") + def test_debian_dhclient_4_3_5_request_selecting(self): + req = self._make_request(self.hwaddr, None, None, + options=[ + ('server_id', self.server_addr), + ('requested_addr', NETADDR_PREFIX + '109'), + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], opts_padding=bytes(20)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, with_hostname=True) + + def _run_debian_renewing(self, bcast): + req_ip = NETADDR_PREFIX + '109' + req = self._make_request(self.hwaddr, None, None, + ciaddr=req_ip, ip_src=req_ip, + ip_dst=NETADDR_BROADCAST if bcast else self.server_addr, + options=[ + ('hostname', b'test12'), + self._make_debian_paramrequestlist(), + ], + opts_padding=bytes(32)) + resp = self._get_response(req) + self._assert_ack(resp) + self._assert_standard_offer_or_ack(resp, renewing=True, + with_hostname=True) + + @test_tracker_info(uuid="5e1e817d-9972-46ca-8d44-1e120bf1bafc") + def test_debian_dhclient_4_3_5_request_renewing(self): + self._run_debian_renewing(bcast=False) + + @test_tracker_info(uuid="b179a36d-910e-4006-a79a-11cc561b69db") + def test_debian_dhclient_4_3_5_request_rebinding(self): + self._run_debian_renewing(bcast=True) + + def _assert_standard_offer_or_ack(self, resp, renewing=False, bcast=False, + with_hostname=False): + # Responses to renew/rebind are always unicast to ciaddr even with + # broadcast flag set (RFC does not define this behavior, but this is + # more efficient and matches previous behavior) + if bcast and not renewing: + self._assert_broadcast(resp) + else: + self._assert_unicast(resp) + self._assert_broadcastbit(resp, isset=bcast) + + bootp_resp = resp.getlayer(BOOTP) + asserts.assert_equal(0, bootp_resp.hops) + if renewing: + asserts.assert_true(bootp_resp.ciaddr.startswith(NETADDR_PREFIX), + 'ciaddr does not start with expected prefix') + else: + asserts.assert_equal(INET4_ANY, bootp_resp.ciaddr) + asserts.assert_true(bootp_resp.yiaddr.startswith(NETADDR_PREFIX), + 'yiaddr does not start with expected prefix') + asserts.assert_true(bootp_resp.siaddr.startswith(NETADDR_PREFIX), + 'siaddr does not start with expected prefix') + asserts.assert_equal(INET4_ANY, bootp_resp.giaddr) + + opt_labels = get_opt_labels(bootp_resp) + # FQDN option 81 is not supported in new behavior + opt_labels = [opt for opt in opt_labels if opt != 81] + + # Expect exactly these options in this order + expected_opts = [ + 'message-type', 'server_id', 'lease_time', 'renewal_time', + 'rebinding_time', 'subnet_mask', 'broadcast_address', 'router', + 'name_server'] + if with_hostname: + expected_opts.append('hostname') + expected_opts.extend(['vendor_specific', 'end']) + asserts.assert_equal(expected_opts, opt_labels) + + def _request_address(self, hwaddr, bcast=True): + resp = self._get_response(self._make_discover(hwaddr)) self._assert_offer(resp) addr = get_yiaddr(resp) siaddr = getopt(resp, 'server_id') - resp = self._get_response(make_request(self.hwaddr, addr, siaddr)) + resp = self._get_response(self._make_request(hwaddr, addr, siaddr, + ip_dst=(INET4_ANY if bcast else siaddr))) self._assert_ack(resp) return addr, siaddr, resp @@ -343,7 +883,7 @@ class DhcpServerTest(base_test.BaseTestClass): bootp_resp = (resp or None) and resp.getlayer(BOOTP) if bootp_resp != None and get_mess_type(bootp_resp) == ACK: # Note down corresponding release for this request - release = make_release(bootp_resp.chaddr, bootp_resp.yiaddr, + release = self._make_release(bootp_resp.chaddr, bootp_resp.yiaddr, getopt(bootp_resp, 'server_id')) self.cleanup_releases.append(release) return resp @@ -368,7 +908,12 @@ class DhcpServerTest(base_test.BaseTestClass): asserts.assert_false(None == packet, "No packet") asserts.assert_equal(packet.getlayer(Ether).dst, BROADCAST_MAC) asserts.assert_equal(packet.getlayer(IP).dst, NETADDR_BROADCAST) - asserts.assert_equal(packet.getlayer(BOOTP).flags, 0x8000) + self._assert_broadcastbit(packet) + + def _assert_broadcastbit(self, packet, isset=True): + mask = 0x8000 + flag = packet.getlayer(BOOTP).flags + asserts.assert_equal(flag & mask, mask if isset else 0) def _assert_unicast(self, packet, ipAddr=None): asserts.assert_false(None == packet, "No packet") @@ -382,10 +927,87 @@ class DhcpServerTest(base_test.BaseTestClass): self.next_hwaddr_index = self.next_hwaddr_index + 1 return addr + def _make_dhcp(self, src_hwaddr, options, ciaddr=INET4_ANY, + ip_src=INET4_ANY, ip_dst=NETADDR_BROADCAST, giaddr=INET4_ANY, + bcastbit=False): + broadcast = (ip_dst == NETADDR_BROADCAST) + ethernet = Ether(dst=(BROADCAST_MAC if broadcast else self.dut_hwaddr)) + ip = IP(src=ip_src, dst=ip_dst) + udp = UDP(sport=68, dport=SERVER_PORT) + bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, giaddr=giaddr, + flags=(0x8000 if bcastbit else 0), xid=random.randrange(0, 2**32)) + dhcp = DHCP(options=options) + return ethernet / ip / udp / bootp / dhcp + + def _make_discover(self, src_hwaddr, options = [], giaddr=INET4_ANY, + bcastbit=False, opts_padding=None, ip_src=INET4_ANY): + opts = [('message-type','discover')] + opts.extend(options) + opts.append('end') + if (opts_padding): + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, giaddr=giaddr, + ip_dst=NETADDR_BROADCAST, bcastbit=bcastbit, ip_src=ip_src) + + def _make_request(self, src_hwaddr, reqaddr, siaddr, ciaddr=INET4_ANY, + ip_dst=None, ip_src=None, giaddr=INET4_ANY, bcastbit=False, + options=[], opts_padding=None): + if not ip_dst: + ip_dst = siaddr or INET4_ANY + + if not ip_src and ip_dst == INET4_ANY: + ip_src = INET4_ANY + elif not ip_src: + ip_src = (giaddr if not isempty(giaddr) + else ciaddr if not isempty(ciaddr) + else reqaddr) + # Kernel will not receive unicast UDP packets with empty ip_src + asserts.assert_false(ip_dst != INET4_ANY and isempty(ip_src), + "Unicast ip_src cannot be zero") + opts = [('message-type', 'request')] + if options: + opts.extend(options) + else: + if siaddr: + opts.append(('server_id', siaddr)) + if reqaddr: + opts.append(('requested_addr', reqaddr)) + opts.append('end') + if opts_padding: + opts.append(opts_padding) + return self._make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, + ip_src=ip_src, ip_dst=ip_dst, giaddr=giaddr, bcastbit=bcastbit) + + def _make_release(self, src_hwaddr, addr, server_id): + opts = [('message-type', 'release'), ('server_id', server_id), 'end'] + return self._make_dhcp(src_hwaddr, opts, ciaddr=addr, ip_src=addr, + ip_dst=server_id) + +def assert_bootp_response(resp, req): + bootp = resp.getlayer(BOOTP) + asserts.assert_equal(2, bootp.op, 'Invalid BOOTP op') + asserts.assert_equal(1, bootp.htype, 'Invalid BOOTP htype') + asserts.assert_equal(6, bootp.hlen, 'Invalid BOOTP hlen') + asserts.assert_equal(0, bootp.hops, 'Invalid BOOTP hops') + asserts.assert_equal(req.getlayer(BOOTP).xid, bootp.xid, 'Invalid XID') + return bootp + + +def make_paramrequestlist_opt(params): + param_indexes = [DHCPRevOptions[opt][0] if isinstance(opt, str) else opt + for opt in params] + return tuple(['param_req_list'] + [ + opt.to_bytes(1, byteorder='big') if isinstance(opt, int) else opt + for opt in param_indexes]) + + +def isempty(addr): + return not addr or addr == INET4_ANY + def setopt(packet, optname, val): dhcp = packet.getlayer(DHCP) - if optname in [opt[0] for opt in dhcp.options]: + if optname in get_opt_labels(dhcp): dhcp.options = [(optname, val) if opt[0] == optname else opt for opt in dhcp.options] else: @@ -403,55 +1025,31 @@ def removeopt(packet, key): dhcp.options = [opt for opt in dhcp.options if opt[0] != key] -def get_yiaddr(packet): - return packet.getlayer(BOOTP).yiaddr - - -def get_mess_type(packet): - return getopt(packet, 'message-type') +def get_opt_labels(packet): + dhcp_resp = packet.getlayer(DHCP) + # end option is a single string, not a tuple. + return [opt if isinstance(opt, str) else opt[0] + for opt in dhcp_resp.options if opt != 'pad'] -def make_dhcp(src_hwaddr, options, ciaddr='0.0.0.0', ipSrc='0.0.0.0', - ipDst=NETADDR_BROADCAST): - broadcast = (ipDst == NETADDR_BROADCAST) - ethernet = Ether(dst=BROADCAST_MAC) if broadcast else Ether() - ip = IP(src=ipSrc, dst=ipDst) - udp = UDP(sport=68, dport=SERVER_PORT) - bootp = BOOTP(chaddr=src_hwaddr, ciaddr=ciaddr, - flags=(0x8000 if broadcast else 0), xid=RandInt()) - dhcp = DHCP(options=options) - return ethernet / ip / udp / bootp / dhcp - - -def make_discover(src_hwaddr, options = []): - opts = [('message-type','discover')] - opts.extend(options) - opts.append('end') - return make_dhcp(src_hwaddr, options=opts) +def get_yiaddr(packet): + return packet.getlayer(BOOTP).yiaddr -def make_request(src_hwaddr, reqaddr, siaddr, ciaddr='0.0.0.0', ipSrc=None): - if ipSrc == None: - ipSrc = ciaddr - opts = [('message-type', 'request')] - if siaddr: - opts.append(('server_id', siaddr)) - if reqaddr: - opts.append(('requested_addr', reqaddr)) - opts.append('end') - return make_dhcp(src_hwaddr, options=opts, ciaddr=ciaddr, ipSrc=ciaddr) +def get_chaddr(packet): + # We use Ethernet addresses. Ignore address padding + return packet.getlayer(BOOTP).chaddr[:6] -def make_release(src_hwaddr, addr, server_id): - opts = [('message-type', 'release'), ('server_id', server_id), 'end'] - return make_dhcp(src_hwaddr, opts, ciaddr=addr, ipSrc=addr, ipDst=server_id) +def get_mess_type(packet): + return getopt(packet, 'message-type') def make_hwaddr(index): if index > 0xffff: raise ValueError("Address index out of range") - return '\x44\x85\x00\x00{}{}'.format(chr(index >> 8), chr(index & 0xff)) + return b'\x44\x85\x00\x00' + bytes([index >> 8, index & 0xff]) def format_hwaddr(addr): - return ':'.join(['%02x' % ord(c) for c in addr]) + return ':'.join(['%02x' % c for c in addr]) |