123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.pair.tuntap}.
- """
- from __future__ import division, absolute_import
- import os
- import struct
- import socket
- from errno import EPERM, EBADF, EINVAL, EAGAIN, EWOULDBLOCK, ENOENT, ENODEV
- from random import randrange
- from collections import deque
- from itertools import cycle
- from signal import SIGINT
- from twisted.python.compat import intToBytes, _PY3
- from twisted.python.reflect import ObjectNotFound, namedAny
- try:
- namedAny("fcntl.ioctl")
- except (ObjectNotFound, AttributeError):
- platformSkip = "Platform is missing fcntl/ioctl support"
- else:
- platformSkip = None
- from zope.interface import Interface, implementer
- from zope.interface.verify import verifyObject
- from twisted.internet.interfaces import IListeningPort
- from twisted.internet.protocol import DatagramProtocol
- from twisted.pair.rawudp import RawUDPProtocol
- from twisted.pair.ip import IPProtocol
- from twisted.pair.ethernet import EthernetProtocol
- from twisted.python.reflect import fullyQualifiedName
- from twisted.python.compat import iterbytes
- from twisted.python.log import addObserver, removeObserver, textFromEventDict
- from twisted.internet.interfaces import IAddress, IReactorFDSet
- from twisted.internet.protocol import AbstractDatagramProtocol, Factory
- from twisted.internet.task import Clock
- from twisted.trial.unittest import SkipTest, SynchronousTestCase
- from twisted.internet.error import CannotListenError
- from twisted.pair.raw import IRawPacketProtocol
- # Let the module-scope testing subclass of this still be defined (and then not
- # used) in case we can't import from twisted.pair.testing due to platform
- # limitations.
- _RealSystem = object
- # Same rationale as for _RealSystem.
- _IInputOutputSystem = Interface
- if platformSkip is None:
- from twisted.pair.testing import (
- _PI_SIZE, Tunnel, MemoryIOSystem, _IPv4, _H, _ethernet, _ip, _udp)
- from twisted.pair.tuntap import (
- _TUNSETIFF, _IFNAMSIZ, _RealSystem,
- _IInputOutputSystem, TunnelFlags, TunnelAddress, TuntapPort)
- else:
- skip = platformSkip
- @implementer(IReactorFDSet)
- class ReactorFDSet(object):
- """
- An implementation of L{IReactorFDSet} which only keeps track of which
- descriptors have been registered for reading and writing.
- This implementation isn't actually capable of determining readability or
- writeability and generates no events for the descriptors registered with
- it.
- @ivar _readers: A L{set} of L{IReadDescriptor} providers which the reactor
- is supposedly monitoring for read events.
- @ivar _writers: A L{set} of L{IWriteDescriptor} providers which the reactor
- is supposedly monitoring for write events.
- """
- def __init__(self):
- self._readers = set()
- self._writers = set()
- self.addReader = self._readers.add
- self.addWriter = self._writers.add
- def removeReader(self, reader):
- self._readers.discard(reader)
- def removeWriter(self, writer):
- self._writers.discard(writer)
- def getReaders(self):
- return iter(self._readers)
- def getWriters(self):
- return iter(self._writers)
- def removeAll(self):
- try:
- return list(self._readers | self._writers)
- finally:
- self._readers = set()
- self._writers = set()
- verifyObject(IReactorFDSet, ReactorFDSet())
- class FSSetClock(Clock, ReactorFDSet):
- """
- An L{FSSetClock} is a L{IReactorFDSet} and an L{IReactorClock}.
- """
- def __init__(self):
- Clock.__init__(self)
- ReactorFDSet.__init__(self)
- class TunHelper(object):
- """
- A helper for tests of tun-related functionality (ip-level tunnels).
- """
- @property
- def TUNNEL_TYPE(self):
- # Hide this in a property because TunnelFlags is not always imported.
- return TunnelFlags.IFF_TUN | TunnelFlags.IFF_NO_PI
- def __init__(self, tunnelRemote, tunnelLocal):
- """
- @param tunnelRemote: The source address for UDP datagrams originated
- from this helper. This is an IPv4 dotted-quad string.
- @type tunnelRemote: L{bytes}
- @param tunnelLocal: The destination address for UDP datagrams
- originated from this helper. This is an IPv4 dotted-quad string.
- @type tunnelLocal: L{bytes}
- """
- self.tunnelRemote = tunnelRemote
- self.tunnelLocal = tunnelLocal
- def encapsulate(self, source, destination, payload):
- """
- Construct an ip datagram containing a udp datagram containing the given
- application-level payload.
- @param source: The source port for the UDP datagram being encapsulated.
- @type source: L{int}
- @param destination: The destination port for the UDP datagram being
- encapsulated.
- @type destination: L{int}
- @param payload: The application data to include in the udp datagram.
- @type payload: L{bytes}
- @return: An ethernet frame.
- @rtype: L{bytes}
- """
- return _ip(
- src=self.tunnelRemote, dst=self.tunnelLocal,
- payload=_udp(
- src=source, dst=destination, payload=payload))
- def parser(self):
- """
- Get a function for parsing a datagram read from a I{tun} device.
- @return: A function which accepts a datagram exactly as might be read
- from a I{tun} device. The datagram is expected to ultimately carry
- a UDP datagram. When called, it returns a L{list} of L{tuple}s.
- Each tuple has the UDP application data as the first element and
- the sender address as the second element.
- """
- datagrams = []
- receiver = DatagramProtocol()
- def capture(*args):
- datagrams.append(args)
- receiver.datagramReceived = capture
- udp = RawUDPProtocol()
- udp.addProto(12345, receiver)
- ip = IPProtocol()
- ip.addProto(17, udp)
- def parse(data):
- # TUN devices omit the ethernet framing so we can start parsing
- # right at the IP layer.
- ip.datagramReceived(data, False, None, None, None)
- return datagrams
- return parse
- class TapHelper(object):
- """
- A helper for tests of tap-related functionality (ethernet-level tunnels).
- """
- @property
- def TUNNEL_TYPE(self):
- flag = TunnelFlags.IFF_TAP
- if not self.pi:
- flag |= TunnelFlags.IFF_NO_PI
- return flag
- def __init__(self, tunnelRemote, tunnelLocal, pi):
- """
- @param tunnelRemote: The source address for UDP datagrams originated
- from this helper. This is an IPv4 dotted-quad string.
- @type tunnelRemote: L{bytes}
- @param tunnelLocal: The destination address for UDP datagrams
- originated from this helper. This is an IPv4 dotted-quad string.
- @type tunnelLocal: L{bytes}
- @param pi: A flag indicating whether this helper will generate and
- consume a protocol information (PI) header.
- @type pi: L{bool}
- """
- self.tunnelRemote = tunnelRemote
- self.tunnelLocal = tunnelLocal
- self.pi = pi
- def encapsulate(self, source, destination, payload):
- """
- Construct an ethernet frame containing an ip datagram containing a udp
- datagram containing the given application-level payload.
- @param source: The source port for the UDP datagram being encapsulated.
- @type source: L{int}
- @param destination: The destination port for the UDP datagram being
- encapsulated.
- @type destination: L{int}
- @param payload: The application data to include in the udp datagram.
- @type payload: L{bytes}
- @return: An ethernet frame.
- @rtype: L{bytes}
- """
- tun = TunHelper(self.tunnelRemote, self.tunnelLocal)
- ip = tun.encapsulate(source, destination, payload)
- frame = _ethernet(
- src=b'\x00\x00\x00\x00\x00\x00', dst=b'\xff\xff\xff\xff\xff\xff',
- protocol=_IPv4, payload=ip)
- if self.pi:
- # Going to send a datagram using IPv4 addressing
- protocol = _IPv4
- # There are no flags though
- flags = 0
- frame = _H(flags) + _H(protocol) + frame
- return frame
- def parser(self):
- """
- Get a function for parsing a datagram read from a I{tap} device.
- @return: A function which accepts a datagram exactly as might be read
- from a I{tap} device. The datagram is expected to ultimately carry
- a UDP datagram. When called, it returns a L{list} of L{tuple}s.
- Each tuple has the UDP application data as the first element and
- the sender address as the second element.
- """
- datagrams = []
- receiver = DatagramProtocol()
- def capture(*args):
- datagrams.append(args)
- receiver.datagramReceived = capture
- udp = RawUDPProtocol()
- udp.addProto(12345, receiver)
- ip = IPProtocol()
- ip.addProto(17, udp)
- ether = EthernetProtocol()
- ether.addProto(0x800, ip)
- def parser(datagram):
- # TAP devices might include a PI header. Strip that off if we
- # expect it to be there.
- if self.pi:
- datagram = datagram[_PI_SIZE:]
- # TAP devices include ethernet framing so start parsing at the
- # ethernet layer.
- ether.datagramReceived(datagram)
- return datagrams
- return parser
- class TunnelTests(SynchronousTestCase):
- """
- L{Tunnel} is mostly tested by other test cases but some tests don't fit
- there. Those tests are here.
- """
- def test_blockingRead(self):
- """
- Blocking reads are not implemented by L{Tunnel.read}. Attempting one
- results in L{NotImplementedError} being raised.
- """
- tunnel = Tunnel(MemoryIOSystem(), os.O_RDONLY, None)
- self.assertRaises(
- NotImplementedError, tunnel.read, 1024)
- class TunnelDeviceTestsMixin(object):
- """
- A mixin defining tests that apply to L{_IInputOutputSystem}
- implementations.
- """
- def setUp(self):
- """
- Create the L{_IInputOutputSystem} provider under test and open a tunnel
- using it.
- """
- self.system = self.createSystem()
- self.fileno = self.system.open(b"/dev/net/tun",
- os.O_RDWR | os.O_NONBLOCK)
- self.addCleanup(self.system.close, self.fileno)
- mode = self.helper.TUNNEL_TYPE
- config = struct.pack(
- "%dsH" % (_IFNAMSIZ,), self._TUNNEL_DEVICE, mode.value)
- self.system.ioctl(self.fileno, _TUNSETIFF, config)
- def test_interface(self):
- """
- The object under test provides L{_IInputOutputSystem}.
- """
- self.assertTrue(verifyObject(_IInputOutputSystem, self.system))
- def _invalidFileDescriptor(self):
- """
- Get an invalid file descriptor.
- @return: An integer which is not a valid file descriptor at the time of
- this call. After any future system call which allocates a new file
- descriptor, there is no guarantee the returned file descriptor will
- still be invalid.
- """
- fd = self.system.open(b"/dev/net/tun", os.O_RDWR)
- self.system.close(fd)
- return fd
- def test_readEBADF(self):
- """
- The device's C{read} implementation raises L{OSError} with an errno of
- C{EBADF} when called on a file descriptor which is not valid (ie, which
- has no associated file description).
- """
- fd = self._invalidFileDescriptor()
- exc = self.assertRaises(OSError, self.system.read, fd, 1024)
- self.assertEqual(EBADF, exc.errno)
- def test_writeEBADF(self):
- """
- The device's C{write} implementation raises L{OSError} with an errno of
- C{EBADF} when called on a file descriptor which is not valid (ie, which
- has no associated file description).
- """
- fd = self._invalidFileDescriptor()
- exc = self.assertRaises(OSError, self.system.write, fd, b"bytes")
- self.assertEqual(EBADF, exc.errno)
- def test_closeEBADF(self):
- """
- The device's C{close} implementation raises L{OSError} with an errno of
- C{EBADF} when called on a file descriptor which is not valid (ie, which
- has no associated file description).
- """
- fd = self._invalidFileDescriptor()
- exc = self.assertRaises(OSError, self.system.close, fd)
- self.assertEqual(EBADF, exc.errno)
- def test_ioctlEBADF(self):
- """
- The device's C{ioctl} implementation raises L{OSError} with an errno of
- C{EBADF} when called on a file descriptor which is not valid (ie, which
- has no associated file description).
- """
- fd = self._invalidFileDescriptor()
- exc = self.assertRaises(
- IOError, self.system.ioctl, fd, _TUNSETIFF, b"tap0")
- self.assertEqual(EBADF, exc.errno)
- def test_ioctlEINVAL(self):
- """
- The device's C{ioctl} implementation raises L{IOError} with an errno of
- C{EINVAL} when called with a request (second argument) which is not a
- supported operation.
- """
- # Try to invent an unsupported request. Hopefully this isn't a real
- # request on any system.
- request = 0xDEADBEEF
- exc = self.assertRaises(
- IOError, self.system.ioctl, self.fileno, request, b"garbage")
- self.assertEqual(EINVAL, exc.errno)
- def test_receive(self):
- """
- If a UDP datagram is sent to an address reachable by the tunnel device
- then it can be read out of the tunnel device.
- """
- parse = self.helper.parser()
- found = False
- # Try sending the datagram a lot of times. There are no delivery
- # guarantees for UDP - not even over localhost.
- for i in range(100):
- key = randrange(2 ** 64)
- message = b"hello world:" + intToBytes(key)
- source = self.system.sendUDP(message, (self._TUNNEL_REMOTE, 12345))
- # Likewise try receiving each of those datagrams a lot of times.
- # Timing might cause us to miss it the first few dozen times
- # through the loop.
- for j in range(100):
- try:
- packet = self.system.read(self.fileno, 1024)
- except EnvironmentError as e:
- if e.errno in (EAGAIN, EWOULDBLOCK):
- break
- raise
- else:
- datagrams = parse(packet)
- if (message, source) in datagrams:
- found = True
- break
- del datagrams[:]
- if found:
- break
- if not found:
- self.fail("Never saw probe UDP packet on tunnel")
- def test_send(self):
- """
- If a UDP datagram is written the tunnel device then it is received by
- the network to which it is addressed.
- """
- # Construct a unique application payload so the receiving side can
- # unambiguously identify the datagram we sent.
- key = randrange(2 ** 64)
- message = b"hello world:" + intToBytes(key)
- # To avoid really inconvenient test failures where the test just hangs
- # forever, set up a timeout for blocking socket operations. This
- # shouldn't ever be triggered when the test is passing. It only serves
- # to make sure the test runs eventually completes if something is
- # broken in a way that prevents real traffic from flowing. The value
- # chosen is totally arbitrary (but it might coincidentally exactly
- # match trial's builtin timeout for asynchronous tests).
- self.addCleanup(socket.setdefaulttimeout, socket.getdefaulttimeout())
- socket.setdefaulttimeout(120)
- # Start listening for the test datagram first. The resulting port
- # object can be used to receive datagrams sent to _TUNNEL_LOCAL:12345 -
- # in other words, an application using the tunnel device will be able
- # to cause datagrams to arrive at this port as though they actually
- # traversed a network to arrive at this host.
- port = self.system.receiveUDP(self.fileno, self._TUNNEL_LOCAL, 12345)
- # Construct a packet with the appropriate wrappers and headings so that
- # it will arrive at the port created above.
- packet = self.helper.encapsulate(50000, 12345, message)
- # Write the packet to the tunnel device.
- self.system.write(self.fileno, packet)
- # Try to receive that datagram and verify it has the correct payload.
- packet = port.recv(1024)
- self.assertEqual(message, packet)
- class FakeDeviceTestsMixin(object):
- """
- Define a mixin for use with test cases that require an
- L{_IInputOutputSystem} provider. This mixin hands out L{MemoryIOSystem}
- instances as the provider of that interface.
- """
- _TUNNEL_DEVICE = b"tap-twistedtest"
- _TUNNEL_LOCAL = b"172.16.2.1"
- _TUNNEL_REMOTE = b"172.16.2.2"
- def createSystem(self):
- """
- Create and return a brand new L{MemoryIOSystem}.
- The L{MemoryIOSystem} knows how to open new tunnel devices.
- @return: The newly created I/O system object.
- @rtype: L{MemoryIOSystem}
- """
- system = MemoryIOSystem()
- system.registerSpecialDevice(Tunnel._DEVICE_NAME, Tunnel)
- return system
- class FakeTapDeviceTests(FakeDeviceTestsMixin,
- TunnelDeviceTestsMixin, SynchronousTestCase):
- """
- Run various tap-type tunnel unit tests against an in-memory I/O system.
- """
- FakeTapDeviceTests.helper = TapHelper(
- FakeTapDeviceTests._TUNNEL_REMOTE, FakeTapDeviceTests._TUNNEL_LOCAL,
- pi=False)
- class FakeTapDeviceWithPITests(FakeDeviceTestsMixin,
- TunnelDeviceTestsMixin, SynchronousTestCase):
- """
- Run various tap-type tunnel unit tests against an in-memory I/O system with
- the PI header enabled.
- """
- FakeTapDeviceWithPITests.helper = TapHelper(
- FakeTapDeviceTests._TUNNEL_REMOTE, FakeTapDeviceTests._TUNNEL_LOCAL,
- pi=True)
- class FakeTunDeviceTests(FakeDeviceTestsMixin,
- TunnelDeviceTestsMixin, SynchronousTestCase):
- """
- Run various tun-type tunnel unit tests against an in-memory I/O system.
- """
- FakeTunDeviceTests.helper = TunHelper(
- FakeTunDeviceTests._TUNNEL_REMOTE, FakeTunDeviceTests._TUNNEL_LOCAL)
- @implementer(_IInputOutputSystem)
- class TestRealSystem(_RealSystem):
- """
- Add extra skipping logic so tests that try to create real tunnel devices on
- platforms where those are not supported automatically get skipped.
- """
- def open(self, filename, *args, **kwargs):
- """
- Attempt an open, but if the file is /dev/net/tun and it does not exist,
- translate the error into L{SkipTest} so that tests that require
- platform support for tuntap devices are skipped instead of failed.
- """
- try:
- return super(TestRealSystem, self).open(filename, *args, **kwargs)
- except OSError as e:
- # The device file may simply be missing. The device file may also
- # exist but be unsupported by the kernel.
- if e.errno in (ENOENT, ENODEV) and filename == b"/dev/net/tun":
- raise SkipTest("Platform lacks /dev/net/tun")
- raise
- def ioctl(self, *args, **kwargs):
- """
- Attempt an ioctl, but translate permission denied errors into
- L{SkipTest} so that tests that require elevated system privileges and
- do not have them are skipped instead of failed.
- """
- try:
- return super(TestRealSystem, self).ioctl(*args, **kwargs)
- except IOError as e:
- if EPERM == e.errno:
- raise SkipTest("Permission to configure device denied")
- raise
- def sendUDP(self, datagram, address):
- """
- Use the platform network stack to send a datagram to the given address.
- @param datagram: A UDP datagram payload to send.
- @type datagram: L{bytes}
- @param address: The destination to which to send the datagram.
- @type address: L{tuple} of (L{bytes}, L{int})
- @return: The address from which the UDP datagram was sent.
- @rtype: L{tuple} of (L{bytes}, L{int})
- """
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.bind(('172.16.0.1', 0))
- s.sendto(datagram, address)
- return s.getsockname()
- def receiveUDP(self, fileno, host, port):
- """
- Use the platform network stack to receive a datagram sent to the given
- address.
- @param fileno: The file descriptor of the tunnel used to send the
- datagram. This is ignored because a real socket is used to receive
- the datagram.
- @type fileno: L{int}
- @param host: The IPv4 address at which the datagram will be received.
- @type host: L{bytes}
- @param port: The UDP port number at which the datagram will be
- received.
- @type port: L{int}
- @return: A L{socket.socket} which can be used to receive the specified
- datagram.
- """
- s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- s.bind((host, port))
- return s
- class RealDeviceTestsMixin(object):
- """
- Define a mixin for use with test cases that require an
- L{_IInputOutputSystem} provider. This mixin hands out L{TestRealSystem}
- instances as the provider of that interface.
- """
- if platformSkip:
- skip = platformSkip
- def createSystem(self):
- """
- Create a real I/O system that can be used to open real tunnel device
- provided by the underlying system and previously configured.
- @return: The newly created I/O system object.
- @rtype: L{TestRealSystem}
- """
- return TestRealSystem()
- class RealDeviceWithProtocolInformationTests(RealDeviceTestsMixin,
- TunnelDeviceTestsMixin,
- SynchronousTestCase):
- """
- Run various tap-type tunnel unit tests, with "protocol information" (PI)
- turned on, against a real I/O system.
- """
- _TUNNEL_DEVICE = b"tap-twtest-pi"
- _TUNNEL_LOCAL = b"172.16.1.1"
- _TUNNEL_REMOTE = b"172.16.1.2"
- # The PI flag is not an inherent part of the tunnel. It must be specified
- # by each user of the tunnel. Thus, we must also have an indication of
- # whether we want PI so the tests can properly initialize the tunnel
- # device.
- helper = TapHelper(_TUNNEL_REMOTE, _TUNNEL_LOCAL, pi=True)
- class RealDeviceWithoutProtocolInformationTests(RealDeviceTestsMixin,
- TunnelDeviceTestsMixin,
- SynchronousTestCase):
- """
- Run various tap-type tunnel unit tests, with "protocol information" (PI)
- turned off, against a real I/O system.
- """
- _TUNNEL_DEVICE = b"tap-twtest"
- _TUNNEL_LOCAL = b"172.16.0.1"
- _TUNNEL_REMOTE = b"172.16.0.2"
- helper = TapHelper(_TUNNEL_REMOTE, _TUNNEL_LOCAL, pi=False)
- class TuntapPortTests(SynchronousTestCase):
- """
- Tests for L{TuntapPort} behavior that is independent of the tunnel type.
- """
- def test_interface(self):
- """
- A L{TuntapPort} instance provides L{IListeningPort}.
- """
- port = TuntapPort(b"device", EthernetProtocol())
- self.assertTrue(verifyObject(IListeningPort, port))
- def test_realSystem(self):
- """
- When not initialized with an I/O system, L{TuntapPort} uses a
- L{_RealSystem}.
- """
- port = TuntapPort(b"device", EthernetProtocol())
- self.assertIsInstance(port._system, _RealSystem)
- class TunnelTestsMixin(object):
- """
- A mixin defining tests for L{TuntapPort}.
- These tests run against L{MemoryIOSystem} (proven equivalent to the real
- thing by the tests above) to avoid performing any real I/O.
- """
- def setUp(self):
- """
- Create an in-memory I/O system and set up a L{TuntapPort} against it.
- """
- self.name = b"tun0"
- self.system = MemoryIOSystem()
- self.system.registerSpecialDevice(Tunnel._DEVICE_NAME, Tunnel)
- self.protocol = self.factory.buildProtocol(
- TunnelAddress(self.helper.TUNNEL_TYPE, self.name))
- self.reactor = FSSetClock()
- self.port = TuntapPort(
- self.name, self.protocol, reactor=self.reactor, system=self.system)
- def _tunnelTypeOnly(self, flags):
- """
- Mask off any flags except for L{TunnelType.IFF_TUN} and
- L{TunnelType.IFF_TAP}.
- @param flags: Flags from L{TunnelType} to mask.
- @type flags: L{FlagConstant}
- @return: The flags given by C{flags} except the two type flags.
- @rtype: L{FlagConstant}
- """
- return flags & (TunnelFlags.IFF_TUN | TunnelFlags.IFF_TAP)
- def test_startListeningOpensDevice(self):
- """
- L{TuntapPort.startListening} opens the tunnel factory character special
- device C{"/dev/net/tun"} and configures it as a I{tun} tunnel.
- """
- system = self.system
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- expected = (
- system.O_RDWR | system.O_CLOEXEC | system.O_NONBLOCK,
- b"tun0" + b"\x00" * (_IFNAMSIZ - len(b"tun0")),
- self.port.interface, False, True)
- actual = (
- tunnel.openFlags,
- tunnel.requestedName,
- tunnel.name, tunnel.blocking, tunnel.closeOnExec)
- self.assertEqual(expected, actual)
- def test_startListeningSetsConnected(self):
- """
- L{TuntapPort.startListening} sets C{connected} on the port object to
- C{True}.
- """
- self.port.startListening()
- self.assertTrue(self.port.connected)
- def test_startListeningConnectsProtocol(self):
- """
- L{TuntapPort.startListening} calls C{makeConnection} on the protocol
- the port was initialized with, passing the port as an argument.
- """
- self.port.startListening()
- self.assertIs(self.port, self.protocol.transport)
- def test_startListeningStartsReading(self):
- """
- L{TuntapPort.startListening} passes the port instance to the reactor's
- C{addReader} method to begin watching the port's file descriptor for
- data to read.
- """
- self.port.startListening()
- self.assertIn(self.port, self.reactor.getReaders())
- def test_startListeningHandlesOpenFailure(self):
- """
- L{TuntapPort.startListening} raises L{CannotListenError} if opening the
- tunnel factory character special device fails.
- """
- self.system.permissions.remove('open')
- self.assertRaises(CannotListenError, self.port.startListening)
- def test_startListeningHandlesConfigureFailure(self):
- """
- L{TuntapPort.startListening} raises L{CannotListenError} if the
- C{ioctl} call to configure the tunnel device fails.
- """
- self.system.permissions.remove('ioctl')
- self.assertRaises(CannotListenError, self.port.startListening)
- def _stopPort(self, port):
- """
- Verify that the C{stopListening} method of an L{IListeningPort} removes
- that port from the reactor's "readers" set and also that the
- L{Deferred} returned by that method fires with L{None}.
- @param port: The port object to stop.
- @type port: L{IListeningPort} provider
- """
- stopped = port.stopListening()
- self.assertNotIn(port, self.reactor.getReaders())
- # An unfortunate implementation detail
- self.reactor.advance(0)
- self.assertIsNone(self.successResultOf(stopped))
- def test_stopListeningStopsReading(self):
- """
- L{TuntapPort.stopListening} returns a L{Deferred} which fires after the
- port has been removed from the reactor's reader list by passing it to
- the reactor's C{removeReader} method.
- """
- self.port.startListening()
- fileno = self.port.fileno()
- self._stopPort(self.port)
- self.assertNotIn(fileno, self.system._openFiles)
- def test_stopListeningUnsetsConnected(self):
- """
- After the L{Deferred} returned by L{TuntapPort.stopListening} fires,
- the C{connected} attribute of the port object is set to C{False}.
- """
- self.port.startListening()
- self._stopPort(self.port)
- self.assertFalse(self.port.connected)
- def test_stopListeningStopsProtocol(self):
- """
- L{TuntapPort.stopListening} calls C{doStop} on the protocol the port
- was initialized with.
- """
- self.port.startListening()
- self._stopPort(self.port)
- self.assertIsNone(self.protocol.transport)
- def test_stopListeningWhenStopped(self):
- """
- L{TuntapPort.stopListening} returns a L{Deferred} which succeeds
- immediately if it is called when the port is not listening.
- """
- stopped = self.port.stopListening()
- self.assertIsNone(self.successResultOf(stopped))
- def test_multipleStopListening(self):
- """
- It is safe and a no-op to call L{TuntapPort.stopListening} more than
- once with no intervening L{TuntapPort.startListening} call.
- """
- self.port.startListening()
- self.port.stopListening()
- second = self.port.stopListening()
- self.reactor.advance(0)
- self.assertIsNone(self.successResultOf(second))
- def test_loseConnection(self):
- """
- L{TuntapPort.loseConnection} stops the port and is deprecated.
- """
- self.port.startListening()
- self.port.loseConnection()
- # An unfortunate implementation detail
- self.reactor.advance(0)
- self.assertFalse(self.port.connected)
- warnings = self.flushWarnings([self.test_loseConnection])
- self.assertEqual(DeprecationWarning, warnings[0]['category'])
- self.assertEqual(
- "twisted.pair.tuntap.TuntapPort.loseConnection was deprecated "
- "in Twisted 14.0.0; please use twisted.pair.tuntap.TuntapPort."
- "stopListening instead",
- warnings[0]['message'])
- self.assertEqual(1, len(warnings))
- def _stopsReadingTest(self, style):
- """
- Test that L{TuntapPort.doRead} has no side-effects under a certain
- exception condition.
- @param style: An exception instance to arrange for the (python wrapper
- around the) underlying platform I{read} call to fail with.
- @raise C{self.failureException}: If there are any observable
- side-effects.
- """
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- tunnel.nonBlockingExceptionStyle = style
- self.port.doRead()
- self.assertEqual([], self.protocol.received)
- def test_eagainStopsReading(self):
- """
- Once L{TuntapPort.doRead} encounters an I{EAGAIN} errno from a C{read}
- call, it returns.
- """
- self._stopsReadingTest(Tunnel.EAGAIN_STYLE)
- def test_ewouldblockStopsReading(self):
- """
- Once L{TuntapPort.doRead} encounters an I{EWOULDBLOCK} errno from a
- C{read} call, it returns.
- """
- self._stopsReadingTest(Tunnel.EWOULDBLOCK_STYLE)
- def test_eintrblockStopsReading(self):
- """
- Once L{TuntapPort.doRead} encounters an I{EINTR} errno from a C{read}
- call, it returns.
- """
- self._stopsReadingTest(Tunnel.EINTR_STYLE)
- def test_unhandledReadError(self):
- """
- If L{Tuntap.doRead} encounters any exception other than one explicitly
- handled by the code, the exception propagates to the caller.
- """
- class UnexpectedException(Exception):
- pass
- self.assertRaises(
- UnexpectedException,
- self._stopsReadingTest, UnexpectedException())
- def test_unhandledEnvironmentReadError(self):
- """
- Just like C{test_unhandledReadError}, but for the case where the
- exception that is not explicitly handled happens to be of type
- C{EnvironmentError} (C{OSError} or C{IOError}).
- """
- self.assertRaises(
- IOError,
- self._stopsReadingTest, IOError(EPERM, "Operation not permitted"))
- def test_doReadSmallDatagram(self):
- """
- L{TuntapPort.doRead} reads a datagram of fewer than
- C{TuntapPort.maxPacketSize} from the port's file descriptor and passes
- it to its protocol's C{datagramReceived} method.
- """
- datagram = b'x' * (self.port.maxPacketSize - 1)
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- tunnel.readBuffer.append(datagram)
- self.port.doRead()
- self.assertEqual([datagram], self.protocol.received)
- def test_doReadLargeDatagram(self):
- """
- L{TuntapPort.doRead} reads the first part of a datagram of more than
- C{TuntapPort.maxPacketSize} from the port's file descriptor and passes
- the truncated data to its protocol's C{datagramReceived} method.
- """
- datagram = b'x' * self.port.maxPacketSize
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- tunnel.readBuffer.append(datagram + b'y')
- self.port.doRead()
- self.assertEqual([datagram], self.protocol.received)
- def test_doReadSeveralDatagrams(self):
- """
- L{TuntapPort.doRead} reads several datagrams, of up to
- C{TuntapPort.maxThroughput} bytes total, before returning.
- """
- values = cycle(iterbytes(b'abcdefghijklmnopqrstuvwxyz'))
- total = 0
- datagrams = []
- while total < self.port.maxThroughput:
- datagrams.append(next(values) * self.port.maxPacketSize)
- total += self.port.maxPacketSize
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- tunnel.readBuffer.extend(datagrams)
- tunnel.readBuffer.append(b'excessive datagram, not to be read')
- self.port.doRead()
- self.assertEqual(datagrams, self.protocol.received)
- def _datagramReceivedException(self):
- """
- Deliver some data to a L{TuntapPort} hooked up to an application
- protocol that raises an exception from its C{datagramReceived} method.
- @return: Whatever L{AttributeError} exceptions are logged.
- """
- self.port.startListening()
- self.system.getTunnel(self.port).readBuffer.append(b"ping")
- # Break the application logic
- self.protocol.received = None
- self.port.doRead()
- return self.flushLoggedErrors(AttributeError)
- def test_datagramReceivedException(self):
- """
- If the protocol's C{datagramReceived} method raises an exception, the
- exception is logged.
- """
- errors = self._datagramReceivedException()
- self.assertEqual(1, len(errors))
- def test_datagramReceivedExceptionIdentifiesProtocol(self):
- """
- The exception raised by C{datagramReceived} is logged with a message
- identifying the offending protocol.
- """
- messages = []
- addObserver(messages.append)
- self.addCleanup(removeObserver, messages.append)
- self._datagramReceivedException()
- error = next(m for m in messages if m['isError'])
- message = textFromEventDict(error)
- self.assertEqual(
- "Unhandled exception from %s.datagramReceived" % (
- fullyQualifiedName(self.protocol.__class__),),
- message.splitlines()[0])
- def test_write(self):
- """
- L{TuntapPort.write} sends a datagram into the tunnel.
- """
- datagram = b"a b c d e f g"
- self.port.startListening()
- self.port.write(datagram)
- self.assertEqual(
- self.system.getTunnel(self.port).writeBuffer,
- deque([datagram]))
- def test_interruptedWrite(self):
- """
- If the platform write call is interrupted (causing the Python wrapper
- to raise C{IOError} with errno set to C{EINTR}), the write is re-tried.
- """
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- tunnel.pendingSignals.append(SIGINT)
- self.port.write(b"hello, world")
- self.assertEqual(deque([b"hello, world"]), tunnel.writeBuffer)
- def test_unhandledWriteError(self):
- """
- Any exception raised by the underlying write call, except for EINTR, is
- propagated to the caller.
- """
- self.port.startListening()
- tunnel = self.system.getTunnel(self.port)
- self.assertRaises(
- IOError,
- self.port.write, b"x" * tunnel.SEND_BUFFER_SIZE + b"y")
- def test_writeSequence(self):
- """
- L{TuntapPort.writeSequence} sends a datagram into the tunnel by
- concatenating the byte strings in the list passed to it.
- """
- datagram = [b"a", b"b", b"c", b"d"]
- self.port.startListening()
- self.port.writeSequence(datagram)
- self.assertEqual(
- self.system.getTunnel(self.port).writeBuffer,
- deque([b"".join(datagram)]))
- def test_getHost(self):
- """
- L{TuntapPort.getHost} returns a L{TunnelAddress} including the tunnel's
- type and name.
- """
- self.port.startListening()
- address = self.port.getHost()
- self.assertEqual(
- TunnelAddress(
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE),
- self.system.getTunnel(self.port).name),
- address)
- def test_listeningString(self):
- """
- The string representation of a L{TuntapPort} instance includes the
- tunnel type and interface and the protocol associated with the port.
- """
- self.port.startListening()
- if _PY3:
- self.assertRegex(str(self.port),
- fullyQualifiedName(self.protocol.__class__))
- expected = " listening on %s/%s>" % (
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE).name,
- self.system.getTunnel(self.port).name)
- self.assertTrue(str(self.port).find(expected) != -1)
- else:
- expected = "<%s listening on %s/%s>" % (
- fullyQualifiedName(self.protocol.__class__),
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE).name,
- self.system.getTunnel(self.port).name)
- self.assertEqual(expected, str(self.port))
- def test_unlisteningString(self):
- """
- The string representation of a L{TuntapPort} instance includes the
- tunnel type and interface and the protocol associated with the port.
- """
- if _PY3:
- self.assertRegex(str(self.port),
- fullyQualifiedName(self.protocol.__class__))
- expected = " not listening on %s/%s>" % (
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE).name,
- self.name)
- self.assertTrue(str(self.port).find(expected) != -1)
- else:
- expected = "<%s not listening on %s/%s>" % (
- fullyQualifiedName(self.protocol.__class__),
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE).name, self.name)
- self.assertEqual(expected, str(self.port))
- def test_logPrefix(self):
- """
- L{TuntapPort.logPrefix} returns a string identifying the application
- protocol and the type of tunnel.
- """
- self.assertEqual(
- "%s (%s)" % (
- self.protocol.__class__.__name__,
- self._tunnelTypeOnly(self.helper.TUNNEL_TYPE).name),
- self.port.logPrefix())
- class TunnelAddressTests(SynchronousTestCase):
- """
- Tests for L{TunnelAddress}.
- """
- def test_interfaces(self):
- """
- A L{TunnelAddress} instances provides L{IAddress}.
- """
- self.assertTrue(
- verifyObject(IAddress, TunnelAddress(TunnelFlags.IFF_TAP, "tap0")))
- def test_indexing(self):
- """
- A L{TunnelAddress} instance can be indexed to retrieve either the byte
- string C{"TUNTAP"} or the name of the tunnel interface, while
- triggering a deprecation warning.
- """
- address = TunnelAddress(TunnelFlags.IFF_TAP, "tap0")
- self.assertEqual("TUNTAP", address[0])
- self.assertEqual("tap0", address[1])
- warnings = self.flushWarnings([self.test_indexing])
- message = (
- "TunnelAddress.__getitem__ is deprecated since Twisted 14.0.0 "
- "Use attributes instead.")
- self.assertEqual(DeprecationWarning, warnings[0]['category'])
- self.assertEqual(message, warnings[0]['message'])
- self.assertEqual(DeprecationWarning, warnings[1]['category'])
- self.assertEqual(message, warnings[1]['message'])
- self.assertEqual(2, len(warnings))
- def test_repr(self):
- """
- The string representation of a L{TunnelAddress} instance includes the
- class name and the values of the C{type} and C{name} attributes.
- """
- if _PY3:
- self.assertRegex(
- repr(TunnelAddress(TunnelFlags.IFF_TUN, name=b"device")),
- "TunnelAddress type=IFF_TUN name=b'device'>")
- else:
- self.assertEqual(
- "<TunnelAddress type=IFF_TUN name='device'>",
- repr(TunnelAddress(TunnelFlags.IFF_TUN, name=b"device")))
- class TunnelAddressEqualityTests(SynchronousTestCase):
- """
- Tests for the implementation of equality (C{==} and C{!=}) for
- L{TunnelAddress}.
- """
- def setUp(self):
- self.first = TunnelAddress(TunnelFlags.IFF_TUN, b"device")
- # Construct a different object representing IFF_TUN to make this a little
- # trickier. Two FlagConstants from the same container and with the same
- # value do not compare equal to each other.
- #
- # The implementation will have to compare their values directly until
- # https://twistedmatrix.com/trac/ticket/6878 is resolved.
- self.second = TunnelAddress(
- TunnelFlags.IFF_TUN | TunnelFlags.IFF_TUN, b"device")
- self.variedType = TunnelAddress(TunnelFlags.IFF_TAP, b"tap1")
- self.variedName = TunnelAddress(TunnelFlags.IFF_TUN, b"tun1")
- def test_selfComparesEqual(self):
- """
- A L{TunnelAddress} compares equal to itself.
- """
- self.assertTrue(self.first == self.first)
- def test_selfNotComparesNotEqual(self):
- """
- A L{TunnelAddress} doesn't compare not equal to itself.
- """
- self.assertFalse(self.first != self.first)
- def test_sameAttributesComparesEqual(self):
- """
- Two L{TunnelAddress} instances with the same value for the C{type} and
- C{name} attributes compare equal to each other.
- """
- self.assertTrue(self.first == self.second)
- def test_sameAttributesNotComparesNotEqual(self):
- """
- Two L{TunnelAddress} instances with the same value for the C{type} and
- C{name} attributes don't compare not equal to each other.
- """
- self.assertFalse(self.first != self.second)
- def test_differentTypeComparesNotEqual(self):
- """
- Two L{TunnelAddress} instances that differ only by the value of their
- type don't compare equal to each other.
- """
- self.assertFalse(self.first == self.variedType)
- def test_differentTypeNotComparesEqual(self):
- """
- Two L{TunnelAddress} instances that differ only by the value of their
- type compare not equal to each other.
- """
- self.assertTrue(self.first != self.variedType)
- def test_differentNameComparesNotEqual(self):
- """
- Two L{TunnelAddress} instances that differ only by the value of their
- name don't compare equal to each other.
- """
- self.assertFalse(self.first == self.variedName)
- def test_differentNameNotComparesEqual(self):
- """
- Two L{TunnelAddress} instances that differ only by the value of their
- name compare not equal to each other.
- """
- self.assertTrue(self.first != self.variedName)
- def test_differentClassNotComparesEqual(self):
- """
- A L{TunnelAddress} doesn't compare equal to an instance of another
- class.
- """
- self.assertFalse(self.first == self)
- def test_differentClassComparesNotEqual(self):
- """
- A L{TunnelAddress} compares not equal to an instance of another class.
- """
- self.assertTrue(self.first != self)
- @implementer(IRawPacketProtocol)
- class IPRecordingProtocol(AbstractDatagramProtocol):
- """
- A protocol which merely records the datagrams delivered to it.
- """
- def startProtocol(self):
- self.received = []
- def datagramReceived(self, datagram, partial=False):
- self.received.append(datagram)
- class TunTests(TunnelTestsMixin, SynchronousTestCase):
- """
- Tests for L{TuntapPort} when used to open a Linux I{tun} tunnel.
- """
- factory = Factory()
- factory.protocol = IPRecordingProtocol
- helper = TunHelper(None, None)
- class EthernetRecordingProtocol(EthernetProtocol):
- """
- A protocol which merely records the datagrams delivered to it.
- """
- def startProtocol(self):
- self.received = []
- def datagramReceived(self, datagram, partial=False):
- self.received.append(datagram)
- class TapTests(TunnelTestsMixin, SynchronousTestCase):
- """
- Tests for L{TuntapPort} when used to open a Linux I{tap} tunnel.
- """
- factory = Factory()
- factory.protocol = EthernetRecordingProtocol
- helper = TapHelper(None, None, pi=False)
- class IOSystemTestsMixin(object):
- """
- Tests that apply to any L{_IInputOutputSystem} implementation.
- """
- def test_noSuchDevice(self):
- """
- L{_IInputOutputSystem.open} raises L{OSError} when called with a
- non-existent device path.
- """
- system = self.createSystem()
- self.assertRaises(
- OSError,
- system.open, b"/dev/there-is-no-such-device-ever", os.O_RDWR)
- class MemoryIOSystemTests(IOSystemTestsMixin, SynchronousTestCase,
- FakeDeviceTestsMixin):
- """
- General L{_IInputOutputSystem} tests applied to L{MemoryIOSystem}.
- """
- class RealIOSystemTests(IOSystemTestsMixin, SynchronousTestCase,
- RealDeviceTestsMixin):
- """
- General L{_IInputOutputSystem} tests applied to L{_RealSystem}.
- """
|