test_udp.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for implementations of L{IReactorUDP} and the UDP parts of
  5. L{IReactorSocket}.
  6. """
  7. from __future__ import division, absolute_import
  8. __metaclass__ = type
  9. import socket
  10. from zope.interface import implementer
  11. from zope.interface.verify import verifyObject
  12. from twisted.python import context
  13. from twisted.python.log import ILogContext, err
  14. from twisted.internet.test.reactormixins import ReactorBuilder
  15. from twisted.internet.defer import Deferred, maybeDeferred
  16. from twisted.internet.interfaces import (
  17. ILoggingContext, IListeningPort, IReactorUDP, IReactorSocket)
  18. from twisted.internet.address import IPv4Address, IPv6Address
  19. from twisted.internet.protocol import DatagramProtocol
  20. from twisted.internet.test.connectionmixins import (LogObserverMixin,
  21. findFreePort)
  22. from twisted.internet import defer, error
  23. from twisted.test.test_udp import Server, GoodClient
  24. from twisted.trial.unittest import SkipTest
  25. def _has_ipv6():
  26. """ Returns True if the system can bind an IPv6 address."""
  27. sock = None
  28. has_ipv6 = False
  29. try:
  30. sock = socket.socket(socket.AF_INET6)
  31. sock.bind(("::1", 0))
  32. has_ipv6 = True
  33. except socket.error:
  34. pass
  35. if sock:
  36. sock.close()
  37. return has_ipv6
  38. HAS_IPV6 = _has_ipv6()
  39. def skipWithoutIPv6(f):
  40. if not HAS_IPV6:
  41. f.skip = "Does not work on systems without IPv6 support."
  42. return f
  43. class DatagramTransportTestsMixin(LogObserverMixin):
  44. """
  45. Mixin defining tests which apply to any port/datagram based transport.
  46. """
  47. def test_startedListeningLogMessage(self):
  48. """
  49. When a port starts, a message including a description of the associated
  50. protocol is logged.
  51. """
  52. loggedMessages = self.observe()
  53. reactor = self.buildReactor()
  54. @implementer(ILoggingContext)
  55. class SomeProtocol(DatagramProtocol):
  56. def logPrefix(self):
  57. return "Crazy Protocol"
  58. protocol = SomeProtocol()
  59. p = self.getListeningPort(reactor, protocol)
  60. expectedMessage = "Crazy Protocol starting on %d" % (p.getHost().port,)
  61. self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
  62. def test_connectionLostLogMessage(self):
  63. """
  64. When a connection is lost a message is logged containing an
  65. address identifying the port and the fact that it was closed.
  66. """
  67. loggedMessages = self.observe()
  68. reactor = self.buildReactor()
  69. p = self.getListeningPort(reactor, DatagramProtocol())
  70. expectedMessage = "(UDP Port %s Closed)" % (p.getHost().port,)
  71. def stopReactor(ignored):
  72. reactor.stop()
  73. def doStopListening():
  74. del loggedMessages[:]
  75. maybeDeferred(p.stopListening).addCallback(stopReactor)
  76. reactor.callWhenRunning(doStopListening)
  77. self.runReactor(reactor)
  78. self.assertEqual((expectedMessage,), loggedMessages[0]['message'])
  79. def test_stopProtocolScheduling(self):
  80. """
  81. L{DatagramProtocol.stopProtocol} is called asynchronously (ie, not
  82. re-entrantly) when C{stopListening} is used to stop the datagram
  83. transport.
  84. """
  85. class DisconnectingProtocol(DatagramProtocol):
  86. started = False
  87. stopped = False
  88. inStartProtocol = False
  89. stoppedInStart = False
  90. def startProtocol(self):
  91. self.started = True
  92. self.inStartProtocol = True
  93. self.transport.stopListening()
  94. self.inStartProtocol = False
  95. def stopProtocol(self):
  96. self.stopped = True
  97. self.stoppedInStart = self.inStartProtocol
  98. reactor.stop()
  99. reactor = self.buildReactor()
  100. protocol = DisconnectingProtocol()
  101. self.getListeningPort(reactor, protocol)
  102. self.runReactor(reactor)
  103. self.assertTrue(protocol.started)
  104. self.assertTrue(protocol.stopped)
  105. self.assertFalse(protocol.stoppedInStart)
  106. class UDPPortTestsMixin(object):
  107. """
  108. Tests for L{IReactorUDP.listenUDP} and
  109. L{IReactorSocket.adoptDatagramPort}.
  110. """
  111. def test_interface(self):
  112. """
  113. L{IReactorUDP.listenUDP} returns an object providing L{IListeningPort}.
  114. """
  115. reactor = self.buildReactor()
  116. port = self.getListeningPort(reactor, DatagramProtocol())
  117. self.assertTrue(verifyObject(IListeningPort, port))
  118. def test_getHost(self):
  119. """
  120. L{IListeningPort.getHost} returns an L{IPv4Address} giving a
  121. dotted-quad of the IPv4 address the port is listening on as well as
  122. the port number.
  123. """
  124. host, portNumber = findFreePort(type=socket.SOCK_DGRAM)
  125. reactor = self.buildReactor()
  126. port = self.getListeningPort(
  127. reactor, DatagramProtocol(), port=portNumber, interface=host)
  128. self.assertEqual(
  129. port.getHost(), IPv4Address('UDP', host, portNumber))
  130. @skipWithoutIPv6
  131. def test_getHostIPv6(self):
  132. """
  133. L{IListeningPort.getHost} returns an L{IPv6Address} when listening on
  134. an IPv6 interface.
  135. """
  136. reactor = self.buildReactor()
  137. port = self.getListeningPort(
  138. reactor, DatagramProtocol(), interface='::1')
  139. addr = port.getHost()
  140. self.assertEqual(addr.host, "::1")
  141. self.assertIsInstance(addr, IPv6Address)
  142. def test_invalidInterface(self):
  143. """
  144. An L{InvalidAddressError} is raised when trying to listen on an address
  145. that isn't a valid IPv4 or IPv6 address.
  146. """
  147. reactor = self.buildReactor()
  148. self.assertRaises(
  149. error.InvalidAddressError, reactor.listenUDP, DatagramProtocol(),
  150. 0, interface='example.com')
  151. def test_logPrefix(self):
  152. """
  153. Datagram transports implement L{ILoggingContext.logPrefix} to return a
  154. message reflecting the protocol they are running.
  155. """
  156. class CustomLogPrefixDatagramProtocol(DatagramProtocol):
  157. def __init__(self, prefix):
  158. self._prefix = prefix
  159. self.system = Deferred()
  160. def logPrefix(self):
  161. return self._prefix
  162. def datagramReceived(self, bytes, addr):
  163. if self.system is not None:
  164. system = self.system
  165. self.system = None
  166. system.callback(context.get(ILogContext)["system"])
  167. reactor = self.buildReactor()
  168. protocol = CustomLogPrefixDatagramProtocol("Custom Datagrams")
  169. d = protocol.system
  170. port = self.getListeningPort(reactor, protocol)
  171. address = port.getHost()
  172. def gotSystem(system):
  173. self.assertEqual("Custom Datagrams (UDP)", system)
  174. d.addCallback(gotSystem)
  175. d.addErrback(err)
  176. d.addCallback(lambda ignored: reactor.stop())
  177. port.write(b"some bytes", ('127.0.0.1', address.port))
  178. self.runReactor(reactor)
  179. def test_writeSequence(self):
  180. """
  181. Write a sequence of L{bytes} to a L{DatagramProtocol}.
  182. """
  183. class SimpleDatagramProtocol(DatagramProtocol):
  184. def __init__(self):
  185. self.defer = Deferred()
  186. def datagramReceived(self, data, addr):
  187. self.defer.callback(data)
  188. reactor = self.buildReactor()
  189. protocol = SimpleDatagramProtocol()
  190. defer = protocol.defer
  191. port = self.getListeningPort(reactor, protocol)
  192. address = port.getHost()
  193. dataToWrite = (b"some", b"bytes", b"to", b"write")
  194. def gotData(data):
  195. self.assertEqual(b"".join(dataToWrite), data)
  196. defer.addCallback(gotData)
  197. defer.addErrback(err)
  198. defer.addCallback(lambda ignored: reactor.stop())
  199. port.writeSequence(dataToWrite, ('127.0.0.1', address.port))
  200. self.runReactor(reactor)
  201. def test_str(self):
  202. """
  203. C{str()} on the listening port object includes the port number.
  204. """
  205. reactor = self.buildReactor()
  206. port = self.getListeningPort(reactor, DatagramProtocol())
  207. self.assertIn(str(port.getHost().port), str(port))
  208. def test_repr(self):
  209. """
  210. C{repr()} on the listening port object includes the port number.
  211. """
  212. reactor = self.buildReactor()
  213. port = self.getListeningPort(reactor, DatagramProtocol())
  214. self.assertIn(repr(port.getHost().port), str(port))
  215. @skipWithoutIPv6
  216. def test_writeToIPv6Interface(self):
  217. """
  218. Writing to an IPv6 UDP socket on the loopback interface succeeds.
  219. """
  220. reactor = self.buildReactor()
  221. server = Server()
  222. serverStarted = server.startedDeferred = defer.Deferred()
  223. self.getListeningPort(reactor, server, interface="::1")
  224. client = GoodClient()
  225. clientStarted = client.startedDeferred = defer.Deferred()
  226. self.getListeningPort(reactor, client, interface="::1")
  227. cAddr = client.transport.getHost()
  228. def cbClientStarted(ignored):
  229. """
  230. Send a datagram from the client once it's started.
  231. @param ignored: a list of C{[None, None]}, which is ignored
  232. @returns: a deferred which fires when the server has received a
  233. datagram.
  234. """
  235. client.transport.write(
  236. b"spam", ("::1", server.transport.getHost().port))
  237. serverReceived = server.packetReceived = defer.Deferred()
  238. return serverReceived
  239. def cbServerReceived(ignored):
  240. """
  241. Stop the reactor after a datagram is received.
  242. @param ignored: L{None}, which is ignored
  243. @returns: L{None}
  244. """
  245. reactor.stop()
  246. d = defer.gatherResults([serverStarted, clientStarted])
  247. d.addCallback(cbClientStarted)
  248. d.addCallback(cbServerReceived)
  249. d.addErrback(err)
  250. self.runReactor(reactor)
  251. packet = server.packets[0]
  252. self.assertEqual(packet, (b'spam', (cAddr.host, cAddr.port)))
  253. @skipWithoutIPv6
  254. def test_connectedWriteToIPv6Interface(self):
  255. """
  256. An IPv6 address can be passed as the C{interface} argument to
  257. L{listenUDP}. The resulting Port accepts IPv6 datagrams.
  258. """
  259. reactor = self.buildReactor()
  260. server = Server()
  261. serverStarted = server.startedDeferred = defer.Deferred()
  262. self.getListeningPort(reactor, server, interface="::1")
  263. client = GoodClient()
  264. clientStarted = client.startedDeferred = defer.Deferred()
  265. self.getListeningPort(reactor, client, interface="::1")
  266. cAddr = client.transport.getHost()
  267. def cbClientStarted(ignored):
  268. """
  269. Send a datagram from the client once it's started.
  270. @param ignored: a list of C{[None, None]}, which is ignored
  271. @returns: a deferred which fires when the server has received a
  272. datagram.
  273. """
  274. client.transport.connect("::1", server.transport.getHost().port)
  275. client.transport.write(b"spam")
  276. serverReceived = server.packetReceived = defer.Deferred()
  277. return serverReceived
  278. def cbServerReceived(ignored):
  279. """
  280. Stop the reactor after a datagram is received.
  281. @param ignored: L{None}, which is ignored
  282. @returns: L{None}
  283. """
  284. reactor.stop()
  285. d = defer.gatherResults([serverStarted, clientStarted])
  286. d.addCallback(cbClientStarted)
  287. d.addCallback(cbServerReceived)
  288. d.addErrback(err)
  289. self.runReactor(reactor)
  290. packet = server.packets[0]
  291. self.assertEqual(packet, (b'spam', (cAddr.host, cAddr.port)))
  292. def test_writingToHostnameRaisesInvalidAddressError(self):
  293. """
  294. Writing to a hostname instead of an IP address will raise an
  295. L{InvalidAddressError}.
  296. """
  297. reactor = self.buildReactor()
  298. port = self.getListeningPort(reactor, DatagramProtocol())
  299. self.assertRaises(
  300. error.InvalidAddressError,
  301. port.write, 'spam', ('example.invalid', 1))
  302. @skipWithoutIPv6
  303. def test_writingToIPv6OnIPv4RaisesInvalidAddressError(self):
  304. """
  305. Writing to an IPv6 address on an IPv4 socket will raise an
  306. L{InvalidAddressError}.
  307. """
  308. reactor = self.buildReactor()
  309. port = self.getListeningPort(
  310. reactor, DatagramProtocol(), interface="127.0.0.1")
  311. self.assertRaises(
  312. error.InvalidAddressError, port.write, 'spam', ('::1', 1))
  313. @skipWithoutIPv6
  314. def test_writingToIPv4OnIPv6RaisesInvalidAddressError(self):
  315. """
  316. Writing to an IPv6 address on an IPv4 socket will raise an
  317. L{InvalidAddressError}.
  318. """
  319. reactor = self.buildReactor()
  320. port = self.getListeningPort(
  321. reactor, DatagramProtocol(), interface="::1")
  322. self.assertRaises(
  323. error.InvalidAddressError, port.write, 'spam', ('127.0.0.1', 1))
  324. def test_connectingToHostnameRaisesInvalidAddressError(self):
  325. """
  326. Connecting to a hostname instead of an IP address will raise an
  327. L{InvalidAddressError}.
  328. """
  329. reactor = self.buildReactor()
  330. port = self.getListeningPort(reactor, DatagramProtocol())
  331. self.assertRaises(
  332. error.InvalidAddressError, port.connect, 'example.invalid', 1)
  333. def test_allowBroadcast(self):
  334. """
  335. L{IListeningPort.setBroadcastAllowed} sets broadcast to be allowed
  336. on the socket.
  337. """
  338. reactor = self.buildReactor()
  339. port = self.getListeningPort(reactor, DatagramProtocol())
  340. port.setBroadcastAllowed(True)
  341. self.assertTrue(port.getBroadcastAllowed())
  342. class UDPServerTestsBuilder(ReactorBuilder,
  343. UDPPortTestsMixin, DatagramTransportTestsMixin):
  344. """
  345. Run L{UDPPortTestsMixin} tests using newly created UDP
  346. sockets.
  347. """
  348. requiredInterfaces = (IReactorUDP,)
  349. def getListeningPort(self, reactor, protocol, port=0, interface='',
  350. maxPacketSize=8192):
  351. """
  352. Get a UDP port from a reactor.
  353. @param reactor: A reactor used to build the returned
  354. L{IListeningPort} provider.
  355. @type reactor: L{twisted.internet.interfaces.IReactorUDP}
  356. @see: L{twisted.internet.IReactorUDP.listenUDP} for other
  357. argument and return types.
  358. """
  359. return reactor.listenUDP(port, protocol, interface=interface,
  360. maxPacketSize=maxPacketSize)
  361. class UDPFDServerTestsBuilder(ReactorBuilder,
  362. UDPPortTestsMixin, DatagramTransportTestsMixin):
  363. """
  364. Run L{UDPPortTestsMixin} tests using adopted UDP sockets.
  365. """
  366. requiredInterfaces = (IReactorSocket,)
  367. def getListeningPort(self, reactor, protocol, port=0, interface='',
  368. maxPacketSize=8192):
  369. """
  370. Get a UDP port from a reactor, wrapping an already-initialized file
  371. descriptor.
  372. @param reactor: A reactor used to build the returned
  373. L{IListeningPort} provider.
  374. @type reactor: L{twisted.internet.interfaces.IReactorSocket}
  375. @param port: A port number to which the adopted socket will be
  376. bound.
  377. @type port: C{int}
  378. @param interface: The local IPv4 or IPv6 address to which the
  379. adopted socket will be bound. defaults to '', ie all IPv4
  380. addresses.
  381. @type interface: C{str}
  382. @see: L{twisted.internet.IReactorSocket.adoptDatagramPort} for other
  383. argument and return types.
  384. """
  385. if IReactorSocket.providedBy(reactor):
  386. if ':' in interface:
  387. domain = socket.AF_INET6
  388. address = socket.getaddrinfo(interface, port)[0][4]
  389. else:
  390. domain = socket.AF_INET
  391. address = (interface, port)
  392. portSock = socket.socket(domain, socket.SOCK_DGRAM)
  393. portSock.bind(address)
  394. portSock.setblocking(False)
  395. try:
  396. return reactor.adoptDatagramPort(
  397. portSock.fileno(), portSock.family, protocol,
  398. maxPacketSize)
  399. finally:
  400. # The socket should still be open; fileno will raise if it is
  401. # not.
  402. portSock.fileno()
  403. # Now clean it up, because the rest of the test does not need
  404. # it.
  405. portSock.close()
  406. else:
  407. raise SkipTest("Reactor does not provide IReactorSocket")
  408. globals().update(UDPServerTestsBuilder.makeTestCaseClasses())
  409. globals().update(UDPFDServerTestsBuilder.makeTestCaseClasses())