test_fdset.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for implementations of L{IReactorFDSet}.
  5. """
  6. __metaclass__ = type
  7. import os, socket, traceback
  8. from zope.interface import implementer
  9. from twisted.python.runtime import platform
  10. from twisted.trial.unittest import SkipTest
  11. from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor
  12. from twisted.internet.abstract import FileDescriptor
  13. from twisted.internet.test.reactormixins import ReactorBuilder
  14. # twisted.internet.tcp nicely defines some names with proper values on
  15. # several different platforms.
  16. from twisted.internet.tcp import EINPROGRESS, EWOULDBLOCK
  17. def socketpair():
  18. serverSocket = socket.socket()
  19. serverSocket.bind(('127.0.0.1', 0))
  20. serverSocket.listen(1)
  21. try:
  22. client = socket.socket()
  23. try:
  24. client.setblocking(False)
  25. try:
  26. client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
  27. except socket.error as e:
  28. if e.args[0] not in (EINPROGRESS, EWOULDBLOCK):
  29. raise
  30. server, addr = serverSocket.accept()
  31. except:
  32. client.close()
  33. raise
  34. finally:
  35. serverSocket.close()
  36. return client, server
  37. class ReactorFDSetTestsBuilder(ReactorBuilder):
  38. """
  39. Builder defining tests relating to L{IReactorFDSet}.
  40. """
  41. requiredInterfaces = [IReactorFDSet]
  42. def _connectedPair(self):
  43. """
  44. Return the two sockets which make up a new TCP connection.
  45. """
  46. client, server = socketpair()
  47. self.addCleanup(client.close)
  48. self.addCleanup(server.close)
  49. return client, server
  50. def _simpleSetup(self):
  51. reactor = self.buildReactor()
  52. client, server = self._connectedPair()
  53. fd = FileDescriptor(reactor)
  54. fd.fileno = client.fileno
  55. return reactor, fd, server
  56. def test_addReader(self):
  57. """
  58. C{reactor.addReader()} accepts an L{IReadDescriptor} provider and calls
  59. its C{doRead} method when there may be data available on its C{fileno}.
  60. """
  61. reactor, fd, server = self._simpleSetup()
  62. def removeAndStop():
  63. reactor.removeReader(fd)
  64. reactor.stop()
  65. fd.doRead = removeAndStop
  66. reactor.addReader(fd)
  67. server.sendall(b'x')
  68. # The reactor will only stop if it calls fd.doRead.
  69. self.runReactor(reactor)
  70. # Nothing to assert, just be glad we got this far.
  71. def test_removeReader(self):
  72. """
  73. L{reactor.removeReader()} accepts an L{IReadDescriptor} provider
  74. previously passed to C{reactor.addReader()} and causes it to no longer
  75. be monitored for input events.
  76. """
  77. reactor, fd, server = self._simpleSetup()
  78. def fail():
  79. self.fail("doRead should not be called")
  80. fd.doRead = fail
  81. reactor.addReader(fd)
  82. reactor.removeReader(fd)
  83. server.sendall(b'x')
  84. # Give the reactor two timed event passes to notice that there's I/O
  85. # (if it is incorrectly watching for I/O).
  86. reactor.callLater(0, reactor.callLater, 0, reactor.stop)
  87. self.runReactor(reactor)
  88. # Getting here means the right thing happened probably.
  89. def test_addWriter(self):
  90. """
  91. C{reactor.addWriter()} accepts an L{IWriteDescriptor} provider and
  92. calls its C{doWrite} method when it may be possible to write to its
  93. C{fileno}.
  94. """
  95. reactor, fd, server = self._simpleSetup()
  96. def removeAndStop():
  97. reactor.removeWriter(fd)
  98. reactor.stop()
  99. fd.doWrite = removeAndStop
  100. reactor.addWriter(fd)
  101. self.runReactor(reactor)
  102. # Getting here is great.
  103. def _getFDTest(self, kind):
  104. """
  105. Helper for getReaders and getWriters tests.
  106. """
  107. reactor = self.buildReactor()
  108. get = getattr(reactor, 'get' + kind + 's')
  109. add = getattr(reactor, 'add' + kind)
  110. remove = getattr(reactor, 'remove' + kind)
  111. client, server = self._connectedPair()
  112. self.assertNotIn(client, get())
  113. self.assertNotIn(server, get())
  114. add(client)
  115. self.assertIn(client, get())
  116. self.assertNotIn(server, get())
  117. remove(client)
  118. self.assertNotIn(client, get())
  119. self.assertNotIn(server, get())
  120. def test_getReaders(self):
  121. """
  122. L{IReactorFDSet.getReaders} reflects the additions and removals made
  123. with L{IReactorFDSet.addReader} and L{IReactorFDSet.removeReader}.
  124. """
  125. self._getFDTest('Reader')
  126. def test_removeWriter(self):
  127. """
  128. L{reactor.removeWriter()} accepts an L{IWriteDescriptor} provider
  129. previously passed to C{reactor.addWriter()} and causes it to no longer
  130. be monitored for outputability.
  131. """
  132. reactor, fd, server = self._simpleSetup()
  133. def fail():
  134. self.fail("doWrite should not be called")
  135. fd.doWrite = fail
  136. reactor.addWriter(fd)
  137. reactor.removeWriter(fd)
  138. # Give the reactor two timed event passes to notice that there's I/O
  139. # (if it is incorrectly watching for I/O).
  140. reactor.callLater(0, reactor.callLater, 0, reactor.stop)
  141. self.runReactor(reactor)
  142. # Getting here means the right thing happened probably.
  143. def test_getWriters(self):
  144. """
  145. L{IReactorFDSet.getWriters} reflects the additions and removals made
  146. with L{IReactorFDSet.addWriter} and L{IReactorFDSet.removeWriter}.
  147. """
  148. self._getFDTest('Writer')
  149. def test_removeAll(self):
  150. """
  151. C{reactor.removeAll()} removes all registered L{IReadDescriptor}
  152. providers and all registered L{IWriteDescriptor} providers and returns
  153. them.
  154. """
  155. reactor = self.buildReactor()
  156. reactor, fd, server = self._simpleSetup()
  157. fd.doRead = lambda: self.fail("doRead should not be called")
  158. fd.doWrite = lambda: self.fail("doWrite should not be called")
  159. server.sendall(b'x')
  160. reactor.addReader(fd)
  161. reactor.addWriter(fd)
  162. removed = reactor.removeAll()
  163. # Give the reactor two timed event passes to notice that there's I/O
  164. # (if it is incorrectly watching for I/O).
  165. reactor.callLater(0, reactor.callLater, 0, reactor.stop)
  166. self.runReactor(reactor)
  167. # Getting here means the right thing happened probably.
  168. self.assertEqual(removed, [fd])
  169. def test_removedFromReactor(self):
  170. """
  171. A descriptor's C{fileno} method should not be called after the
  172. descriptor has been removed from the reactor.
  173. """
  174. reactor = self.buildReactor()
  175. descriptor = RemovingDescriptor(reactor)
  176. reactor.callWhenRunning(descriptor.start)
  177. self.runReactor(reactor)
  178. self.assertEqual(descriptor.calls, [])
  179. def test_negativeOneFileDescriptor(self):
  180. """
  181. If L{FileDescriptor.fileno} returns C{-1}, the descriptor is removed
  182. from the reactor.
  183. """
  184. reactor = self.buildReactor()
  185. client, server = self._connectedPair()
  186. class DisappearingDescriptor(FileDescriptor):
  187. _fileno = server.fileno()
  188. _received = b""
  189. def fileno(self):
  190. return self._fileno
  191. def doRead(self):
  192. self._fileno = -1
  193. self._received += server.recv(1)
  194. client.send(b'y')
  195. def connectionLost(self, reason):
  196. reactor.stop()
  197. descriptor = DisappearingDescriptor(reactor)
  198. reactor.addReader(descriptor)
  199. client.send(b'x')
  200. self.runReactor(reactor)
  201. self.assertEqual(descriptor._received, b"x")
  202. def test_lostFileDescriptor(self):
  203. """
  204. The file descriptor underlying a FileDescriptor may be closed and
  205. replaced by another at some point. Bytes which arrive on the new
  206. descriptor must not be delivered to the FileDescriptor which was
  207. originally registered with the original descriptor of the same number.
  208. Practically speaking, this is difficult or impossible to detect. The
  209. implementation relies on C{fileno} raising an exception if the original
  210. descriptor has gone away. If C{fileno} continues to return the original
  211. file descriptor value, the reactor may deliver events from that
  212. descriptor. This is a best effort attempt to ease certain debugging
  213. situations. Applications should not rely on it intentionally.
  214. """
  215. reactor = self.buildReactor()
  216. name = reactor.__class__.__name__
  217. if name in ('EPollReactor', 'KQueueReactor', 'CFReactor',
  218. 'AsyncioSelectorReactor'):
  219. # Closing a file descriptor immediately removes it from the epoll
  220. # set without generating a notification. That means epollreactor
  221. # will not call any methods on Victim after the close, so there's
  222. # no chance to notice the socket is no longer valid.
  223. raise SkipTest("%r cannot detect lost file descriptors" % (name,))
  224. client, server = self._connectedPair()
  225. class Victim(FileDescriptor):
  226. """
  227. This L{FileDescriptor} will have its socket closed out from under it
  228. and another socket will take its place. It will raise a
  229. socket.error from C{fileno} after this happens (because socket
  230. objects remember whether they have been closed), so as long as the
  231. reactor calls the C{fileno} method the problem will be detected.
  232. """
  233. def fileno(self):
  234. return server.fileno()
  235. def doRead(self):
  236. raise Exception("Victim.doRead should never be called")
  237. def connectionLost(self, reason):
  238. """
  239. When the problem is detected, the reactor should disconnect this
  240. file descriptor. When that happens, stop the reactor so the
  241. test ends.
  242. """
  243. reactor.stop()
  244. reactor.addReader(Victim())
  245. # Arrange for the socket to be replaced at some unspecified time.
  246. # Significantly, this will not be while any I/O processing code is on
  247. # the stack. It is something that happens independently and cannot be
  248. # relied upon to happen at a convenient time, such as within a call to
  249. # doRead.
  250. def messItUp():
  251. newC, newS = self._connectedPair()
  252. fileno = server.fileno()
  253. server.close()
  254. os.dup2(newS.fileno(), fileno)
  255. newC.send(b"x")
  256. reactor.callLater(0, messItUp)
  257. self.runReactor(reactor)
  258. # If the implementation feels like logging the exception raised by
  259. # MessedUp.fileno, that's fine.
  260. self.flushLoggedErrors(socket.error)
  261. if platform.isWindows():
  262. test_lostFileDescriptor.skip = (
  263. "Cannot duplicate socket filenos on Windows")
  264. def test_connectionLostOnShutdown(self):
  265. """
  266. Any file descriptors added to the reactor have their C{connectionLost}
  267. called when C{reactor.stop} is called.
  268. """
  269. reactor = self.buildReactor()
  270. class DoNothingDescriptor(FileDescriptor):
  271. def doRead(self):
  272. return None
  273. def doWrite(self):
  274. return None
  275. client, server = self._connectedPair()
  276. fd1 = DoNothingDescriptor(reactor)
  277. fd1.fileno = client.fileno
  278. fd2 = DoNothingDescriptor(reactor)
  279. fd2.fileno = server.fileno
  280. reactor.addReader(fd1)
  281. reactor.addWriter(fd2)
  282. reactor.callWhenRunning(reactor.stop)
  283. self.runReactor(reactor)
  284. self.assertTrue(fd1.disconnected)
  285. self.assertTrue(fd2.disconnected)
  286. @implementer(IReadDescriptor)
  287. class RemovingDescriptor(object):
  288. """
  289. A read descriptor which removes itself from the reactor as soon as it
  290. gets a chance to do a read and keeps track of when its own C{fileno}
  291. method is called.
  292. @ivar insideReactor: A flag which is true as long as the reactor has
  293. this descriptor as a reader.
  294. @ivar calls: A list of the bottom of the call stack for any call to
  295. C{fileno} when C{insideReactor} is false.
  296. """
  297. def __init__(self, reactor):
  298. self.reactor = reactor
  299. self.insideReactor = False
  300. self.calls = []
  301. self.read, self.write = socketpair()
  302. def start(self):
  303. self.insideReactor = True
  304. self.reactor.addReader(self)
  305. self.write.send(b'a')
  306. def logPrefix(self):
  307. return 'foo'
  308. def doRead(self):
  309. self.reactor.removeReader(self)
  310. self.insideReactor = False
  311. self.reactor.stop()
  312. self.read.close()
  313. self.write.close()
  314. def fileno(self):
  315. if not self.insideReactor:
  316. self.calls.append(traceback.extract_stack(limit=5)[:-1])
  317. return self.read.fileno()
  318. def connectionLost(self, reason):
  319. # Ideally we'd close the descriptors here... but actually
  320. # connectionLost is never called because we remove ourselves from the
  321. # reactor before it stops.
  322. pass
  323. globals().update(ReactorFDSetTestsBuilder.makeTestCaseClasses())