summaryrefslogtreecommitdiffstats
path: root/net/test/srcaddr_selection_test.py
blob: 1e7a107b83b0027e66d8eda39d1d57f1e39ad3c1 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
#!/usr/bin/python
#
# Copyright 2014 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import random
from socket import *  # pylint: disable=wildcard-import
import time
import unittest

from scapy import all as scapy

import csocket
import iproute
import multinetwork_base
import packets
import net_test

# Setsockopt values.
# IPV6_ADDR_PREFERENCES is the option name and IPV6_PREFER_SRC_PUBLIC the option
# value that GetSourceIP passes to setsockopt() at the IPPROTO_IPV6 level.
# NOTE(review): the numeric values are hard-coded here, presumably because the
# socket module in use does not export them -- confirm against linux/in6.h.
IPV6_ADDR_PREFERENCES = 72
IPV6_PREFER_SRC_PUBLIC = 0x0002

# The retrans timer is also the DAD timeout. We set this to a value that's not
# so short that DAD will complete before we attempt to use the network, but
# short enough that we don't have to wait too long for DAD to complete.
# Passed as the retranstimer argument of SendRA by the tests below.
# NOTE(review): presumably in milliseconds (RA "Retrans Timer" field) -- confirm
# against SendRA.
RETRANS_TIMER = 150


class IPv6SourceAddressSelectionTest(multinetwork_base.MultiNetworkBaseTest):
  """Test for IPv6 source address selection.

  This base class holds no test methods itself; it provides the sysctl
  setters, address-state assertions and socket helpers used by the concrete
  test classes that derive from it.

  Relevant kernel commits:
    upstream net-next:
      7fd2561 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      c58da4c net: ipv6: allow explicitly choosing optimistic addresses
      9131f3d ipv6: Do not iterate over all interfaces when finding source address on specific interface.
      c0b8da1 ipv6: Fix finding best source address in ipv6_dev_get_saddr().
      c15df30 ipv6: Remove unused arguments for __ipv6_dev_get_saddr().
      3985e8a ipv6: sysctl to restrict candidate source addresses

    android-3.10:
      2ce95507 net: ipv6: Add a sysctl to make optimistic addresses useful candidates
      0065bf4 net: ipv6: allow choosing optimistic addresses with use_optimistic
      0633924 ipv6: sysctl to restrict candidate source addresses
  """

  def SetIPv6Sysctl(self, ifname, sysctl, value):
    """Writes value to /proc/sys/net/ipv6/conf/<ifname>/<sysctl>."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/%s/%s" % (ifname, sysctl), value)

  def SetDAD(self, ifname, value):
    """Sets both accept_dad and dad_transmits on ifname to the same value."""
    self.SetIPv6Sysctl(ifname, "accept_dad", value)
    self.SetIPv6Sysctl(ifname, "dad_transmits", value)

  def SetOptimisticDAD(self, ifname, value):
    """Sets the optimistic_dad sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "optimistic_dad", value)

  def SetUseTempaddrs(self, ifname, value):
    """Sets the use_tempaddr sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_tempaddr", value)

  def SetUseOptimistic(self, ifname, value):
    """Sets the use_optimistic sysctl on ifname."""
    self.SetIPv6Sysctl(ifname, "use_optimistic", value)

  def SetForwarding(self, value):
    """Sets the global (conf/all) IPv6 forwarding sysctl."""
    self.SetSysctl("/proc/sys/net/ipv6/conf/all/forwarding", value)

  def GetSourceIP(self, netid, mode="mark"):
    """Returns the source address the kernel picks for a socket on netid.

    Connects a UDP socket built for netid (using the given selection mode) to
    net_test.IPV6_ADDR port 123 and reads back the local address.

    Args:
      netid: The network to build the socket on.
      mode: Passed through to BuildSocket; defaults to "mark".

    Returns:
      The local address string reported by getsockname().
    """
    s = self.BuildSocket(6, net_test.UDPSocket, netid, mode)
    try:
      # Because why not...testing for temporary addresses is a separate thing.
      s.setsockopt(IPPROTO_IPV6, IPV6_ADDR_PREFERENCES, IPV6_PREFER_SRC_PUBLIC)

      s.connect((net_test.IPV6_ADDR, 123))
      src_addr = s.getsockname()[0]
    finally:
      # Don't leak a file descriptor per call.
      s.close()
    self.assertTrue(src_addr)
    return src_addr

  def assertAddressNotPresent(self, address):
    """Asserts that looking up address via iproute raises IOError."""
    self.assertRaises(IOError, self.iproute.GetAddress, address)

  def assertAddressHasExpectedAttributes(
      self, address, expected_ifindex, expected_flags):
    """Asserts that address is a /64, universe-scope address on an interface.

    Args:
      address: The address string to look up.
      expected_ifindex: The interface index the address must be on.
      expected_flags: IFA_F_* bits that must all be set; other bits are
        ignored.
    """
    ifa_msg = self.iproute.GetAddress(address)[0]
    self.assertEqual(AF_INET6 if ":" in address else AF_INET, ifa_msg.family)
    self.assertEqual(64, ifa_msg.prefixlen)
    self.assertEqual(iproute.RT_SCOPE_UNIVERSE, ifa_msg.scope)
    self.assertEqual(expected_ifindex, ifa_msg.index)
    # Only require that the expected bits are present.
    self.assertEqual(expected_flags, ifa_msg.flags & expected_flags)

  def AddressIsTentative(self, address):
    """Returns a truthy value iff address has IFA_F_TENTATIVE set."""
    ifa_msg = self.iproute.GetAddress(address)[0]
    return ifa_msg.flags & iproute.IFA_F_TENTATIVE

  def BindToAddress(self, address):
    """Binds a fresh IPv6 UDP socket to address; raises if bind() fails."""
    s = net_test.UDPSocket(AF_INET6)
    try:
      s.bind((address, 0, 0, 0))
    finally:
      # The socket only exists to probe whether bind() succeeds.
      s.close()

  def SendWithSourceAddress(self, address, netid, dest=net_test.IPV6_ADDR):
    """Sends "Hello" to dest port 53 with address requested via IPV6_PKTINFO.

    Args:
      address: The source address to request in the pktinfo control message.
      netid: The network to build the (mark-mode) socket on.
      dest: The destination address; defaults to net_test.IPV6_ADDR.

    Returns:
      Whatever csocket.Sendmsg returns.
    """
    pktinfo = multinetwork_base.MakePktInfo(6, address, 0)
    cmsgs = [(net_test.SOL_IPV6, IPV6_PKTINFO, pktinfo)]
    s = self.BuildSocket(6, net_test.UDPSocket, netid, "mark")
    try:
      return csocket.Sendmsg(s, (dest, 53), "Hello", cmsgs, 0)
    finally:
      s.close()

  def assertAddressUsable(self, address, netid):
    """Asserts address can be bound to and sent from on netid."""
    self.BindToAddress(address)
    self.SendWithSourceAddress(address, netid)
    # No exceptions? Good.

  def assertAddressNotUsable(self, address, netid):
    """Asserts bind fails with EADDRNOTAVAIL and send fails with EINVAL."""
    self.assertRaisesErrno(errno.EADDRNOTAVAIL, self.BindToAddress, address)
    self.assertRaisesErrno(errno.EINVAL,
                           self.SendWithSourceAddress, address, netid)

  def assertAddressSelected(self, address, netid):
    """Asserts address is the source address chosen on netid."""
    self.assertEqual(address, self.GetSourceIP(netid))

  def assertAddressNotSelected(self, address, netid):
    """Asserts address is not the source address chosen on netid."""
    self.assertNotEqual(address, self.GetSourceIP(netid))

  def WaitForDad(self, address):
    """Polls until address is no longer tentative.

    Checks up to 20 times, sleeping 0.1 seconds between checks.

    Raises:
      AssertionError: If the address is still tentative after all checks.
    """
    for _ in range(20):
      if not self.AddressIsTentative(address):
        return
      time.sleep(0.1)
    # Name the address that timed out in the failure message.
    raise AssertionError(
        "%s did not complete DAD after 2 seconds" % address)


class MultiInterfaceSourceAddressSelectionTest(IPv6SourceAddressSelectionTest):
  """Fixture that strips the global IPv6 address from one random interface.

  After setUp the chosen interface is described by self.test_netid,
  self.test_ip, self.test_ifindex, self.test_ifname and self.test_lladdr.
  """

  def setUp(self):
    # Step 0: start every interface from the same baseline, with DAD,
    # optimistic DAD, temporary addresses, use_optimistic and
    # use_oif_addrs_only all switched off.
    for netid in self.tuns:
      iface = self.GetInterfaceName(netid)
      self.SetDAD(iface, 0)
      for disable in (self.SetOptimisticDAD,
                      self.SetUseTempaddrs,
                      self.SetUseOptimistic):
        disable(iface, 0)
      self.SetIPv6Sysctl(iface, "use_oif_addrs_only", 0)

    # Step 1: choose the interface under test and record its details.
    chosen = random.choice(list(self.tuns.keys()))
    self.test_netid = chosen
    self.test_ip = self.MyAddress(6, chosen)
    self.test_ifindex = self.ifindices[chosen]
    self.test_ifname = self.GetInterfaceName(chosen)
    self.test_lladdr = net_test.GetLinkAddress(self.test_ifname, True)

    # Step 2: remove that interface's IPv6 address and check it is really
    # gone and can no longer be bound to or sent from.
    self.iproute.DelAddress(self.test_ip, 64, self.test_ifindex)
    self.assertAddressNotPresent(self.test_ip)
    self.assertAddressNotUsable(self.test_ip, self.test_netid)

    # Make sure the link-local address is not tentative. DAD is disabled
    # above, yet without this wait the test occasionally fails, possibly
    # because the link-local address was generated before DAD was disabled.
    self.WaitForDad(self.test_lladdr)

    # Optimistic addresses don't work while forwarding is on, so turn it off.
    # It is re-enabled when MultiNetworkBaseTest.tearDownClass restores the
    # sysctls.
    # TODO: Fix this and remove this hack.
    self.SetForwarding("0")


class TentativeAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that a tentative address is unusable until DAD completes."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    address = self.test_ip

    # Step 3: re-acquire the IPv6 address with DAD enabled. The RA kicks off
    # SLAAC, which in turn starts DAD.
    self.SetDAD(self.test_ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be present and flagged tentative.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_TENTATIVE)

    # While tentative, the address can be neither used explicitly nor picked
    # by source address selection.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD finishes (expected to take under a second).
    self.WaitForDad(address)

    # With DAD done the address is bindable, sendable and selected.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)


class OptimisticAddressTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks an optimistic address is usable but not selected by default."""

  def testRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    address = self.test_ip

    # Step 3: re-acquire the IPv6 address with DAD and optimistic DAD on. The
    # RA kicks off SLAAC, which in turn starts DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be present and flagged optimistic.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # An optimistic address may be used explicitly, but must not be selected.
    # The usability check is skipped on older kernels: the version checked in
    # to android kernels <= 3.10 requires the use_optimistic sysctl to be
    # turned on.
    kernel_allows_explicit_use = net_test.LINUX_VERSION >= (3, 18, 0)
    if kernel_allows_explicit_use:
      self.assertAddressUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # Poll until DAD finishes (expected to take under a second).
    self.WaitForDad(address)

    # With DAD done the address is usable and is the selected source.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)


class OptimisticAddressOkayTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks use_optimistic makes an optimistic address selectable."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    address = self.test_ip

    # Step 3: re-acquire the IPv6 address with DAD, optimistic DAD and
    # use_optimistic all on. The RA kicks off SLAAC, which starts DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be present and flagged optimistic.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)

    # Although the address is only optimistic, use_optimistic lets it be used
    # and selected right away.
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)


class ValidBeforeOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks a valid address is preferred over an optimistic one."""

  def testModifiedRfc6724Behaviour(self):
    netid = self.test_netid
    ifname = self.test_ifname
    ifindex = self.test_ifindex

    # Step 3: add a valid address to the interface and confirm it is the
    # selected source address.
    preferred_ip = self.OnlinkPrefix(6, netid) + "cafe"
    self.iproute.AddAddress(preferred_ip, 64, ifindex)
    self.assertAddressHasExpectedAttributes(
        preferred_ip, ifindex, iproute.IFA_F_PERMANENT)
    self.assertAddressSelected(preferred_ip, netid)

    # Step 4: acquire a second address that is in optimistic DAD start-up.
    # The RA kicks off SLAAC, which in turn starts DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The new address must be present and flagged optimistic.
    optimistic_ip = self.test_ip
    self.assertAddressHasExpectedAttributes(
        optimistic_ip, ifindex, iproute.IFA_F_OPTIMISTIC)

    # The optimistic address works when asked for explicitly, but selection
    # still picks the other, valid address.
    self.assertAddressUsable(optimistic_ip, netid)
    self.assertAddressNotSelected(optimistic_ip, netid)
    self.assertAddressSelected(preferred_ip, netid)


class DadFailureTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks an optimistic address becomes unusable once DAD fails."""

  def testDadFailure(self):
    netid = self.test_netid
    ifname = self.test_ifname
    address = self.test_ip

    # Step 3: re-acquire the IPv6 address with DAD, optimistic DAD and
    # use_optimistic all on. The RA kicks off SLAAC, which starts DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # Inject a Neighbor Advertisement for the optimistic address from another
    # MAC address, signalling an address conflict ("DAD defense").
    # NOTE(review): ff02::1 normally maps to 33:33:00:00:00:01; confirm the
    # destination MAC below is intentional.
    conflict_macaddr = "02:00:0b:ad:d0:0d"
    ether = scapy.Ether(src=conflict_macaddr, dst="33:33:33:00:00:01")
    ip6 = scapy.IPv6(src=address, dst="ff02::1")
    advert = scapy.ICMPv6ND_NA(tgt=address, R=0, S=0, O=1)
    lladdr_opt = scapy.ICMPv6NDOptDstLLAddr(lladdr=conflict_macaddr)
    dad_defense = ether / ip6 / advert / lladdr_opt
    self.ReceiveEtherPacketOn(netid, dad_defense)
    # NOTE(review): this waits on the link-local address, not on the address
    # that was just defended -- confirm that is the intended check.
    self.WaitForDad(self.test_lladdr)

    # DAD should have failed, so the address is no longer usable or selected.
    self.assertAddressNotUsable(address, netid)
    self.assertAddressNotSelected(address, netid)

    # TODO(ek): verify that an RTM_DELADDR issued for the DAD-failed address.


class NoNsFromOptimisticTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks Neighbor Solicitations are not sourced from an optimistic address."""

  def testSendToOnlinkDestination(self):
    netid = self.test_netid
    ifname = self.test_ifname
    address = self.test_ip

    # Step 3: re-acquire the IPv6 address with DAD, optimistic DAD and
    # use_optimistic all on. The RA kicks off SLAAC, which starts DAD.
    self.SetDAD(ifname, 1)
    self.SetOptimisticDAD(ifname, 1)
    self.SetUseOptimistic(ifname, 1)
    self.SendRA(netid, retranstimer=RETRANS_TIMER)

    # The address must be optimistic, usable and selected.
    self.assertAddressHasExpectedAttributes(
        address, self.test_ifindex, iproute.IFA_F_OPTIMISTIC)
    self.assertAddressUsable(address, netid)
    self.assertAddressSelected(address, netid)

    # Step 4: send to an on-link destination. The resulting Neighbor
    # Solicitation must NOT be sourced from the optimistic address; in this
    # setup the only other usable address is the link-local one.
    onlink_prefix = self.OnlinkPrefix(6, netid)
    onlink_dest = self.GetRandomDestination(onlink_prefix)
    self.SendWithSourceAddress(address, netid, onlink_dest)

    # Older versions will actually choose the optimistic address to originate
    # Neighbor Solicitations (RFC violation), so only check newer kernels.
    if net_test.LINUX_VERSION >= (3, 18, 0):
      ns = packets.NS(
          self.test_lladdr, onlink_dest, self.MyMacAddress(netid))
      expected_ns = ns[1]
      self.ExpectPacketOn(netid, "link-local NS", expected_ns)


# TODO(ek): add tests listening for netlink events.


class DefaultCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks that, by default, another interface's address may be selected."""

  def testChoosesNonInterfaceSourceAddress(self):
    """With use_oif_addrs_only off, the source comes from another interface.

    The test interface has had its global address removed by setUp, so the
    selected source must be neither that address nor the interface's
    link-local address, and must be the address of one of the other
    networks.
    """
    self.SetIPv6Sysctl(self.test_ifname, "use_oif_addrs_only", 0)
    src_ip = self.GetSourceIP(self.test_netid)
    # assertNotIn/assertIn report the offending address and the candidate
    # list on failure, unlike assertFalse/assertTrue on an "in" expression.
    self.assertNotIn(src_ip, [self.test_ip, self.test_lladdr])
    other_addresses = [self.MyAddress(6, netid)
                       for netid in self.tuns if netid != self.test_netid]
    self.assertIn(src_ip, other_addresses)


class RestrictedCandidateSrcAddrsTest(MultiInterfaceSourceAddressSelectionTest):
  """Checks use_oif_addrs_only limits candidates to the outgoing interface."""

  def testChoosesOnlyInterfaceSourceAddress(self):
    ifname, netid = self.test_ifname, self.test_netid
    self.SetIPv6Sysctl(ifname, "use_oif_addrs_only", 1)

    # The test interface has no global IPv6 address, which leaves its existing
    # link-local address as the only candidate.
    expected_source = self.test_lladdr
    self.assertAddressSelected(expected_source, netid)


# Run every test case defined in this module when it is executed as a script.
if __name__ == "__main__":
  unittest.main()