123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for implementations of L{IReactorFDSet}.
- """
- __metaclass__ = type
- import os, socket, traceback
- from zope.interface import implementer
- from twisted.python.runtime import platform
- from twisted.trial.unittest import SkipTest
- from twisted.internet.interfaces import IReactorFDSet, IReadDescriptor
- from twisted.internet.abstract import FileDescriptor
- from twisted.internet.test.reactormixins import ReactorBuilder
- # twisted.internet.tcp nicely defines some names with proper values on
- # several different platforms.
- from twisted.internet.tcp import EINPROGRESS, EWOULDBLOCK
- def socketpair():
- serverSocket = socket.socket()
- serverSocket.bind(('127.0.0.1', 0))
- serverSocket.listen(1)
- try:
- client = socket.socket()
- try:
- client.setblocking(False)
- try:
- client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
- except socket.error as e:
- if e.args[0] not in (EINPROGRESS, EWOULDBLOCK):
- raise
- server, addr = serverSocket.accept()
- except:
- client.close()
- raise
- finally:
- serverSocket.close()
- return client, server
- class ReactorFDSetTestsBuilder(ReactorBuilder):
- """
- Builder defining tests relating to L{IReactorFDSet}.
- """
- requiredInterfaces = [IReactorFDSet]
- def _connectedPair(self):
- """
- Return the two sockets which make up a new TCP connection.
- """
- client, server = socketpair()
- self.addCleanup(client.close)
- self.addCleanup(server.close)
- return client, server
- def _simpleSetup(self):
- reactor = self.buildReactor()
- client, server = self._connectedPair()
- fd = FileDescriptor(reactor)
- fd.fileno = client.fileno
- return reactor, fd, server
- def test_addReader(self):
- """
- C{reactor.addReader()} accepts an L{IReadDescriptor} provider and calls
- its C{doRead} method when there may be data available on its C{fileno}.
- """
- reactor, fd, server = self._simpleSetup()
- def removeAndStop():
- reactor.removeReader(fd)
- reactor.stop()
- fd.doRead = removeAndStop
- reactor.addReader(fd)
- server.sendall(b'x')
- # The reactor will only stop if it calls fd.doRead.
- self.runReactor(reactor)
- # Nothing to assert, just be glad we got this far.
- def test_removeReader(self):
- """
- L{reactor.removeReader()} accepts an L{IReadDescriptor} provider
- previously passed to C{reactor.addReader()} and causes it to no longer
- be monitored for input events.
- """
- reactor, fd, server = self._simpleSetup()
- def fail():
- self.fail("doRead should not be called")
- fd.doRead = fail
- reactor.addReader(fd)
- reactor.removeReader(fd)
- server.sendall(b'x')
- # Give the reactor two timed event passes to notice that there's I/O
- # (if it is incorrectly watching for I/O).
- reactor.callLater(0, reactor.callLater, 0, reactor.stop)
- self.runReactor(reactor)
- # Getting here means the right thing happened probably.
- def test_addWriter(self):
- """
- C{reactor.addWriter()} accepts an L{IWriteDescriptor} provider and
- calls its C{doWrite} method when it may be possible to write to its
- C{fileno}.
- """
- reactor, fd, server = self._simpleSetup()
- def removeAndStop():
- reactor.removeWriter(fd)
- reactor.stop()
- fd.doWrite = removeAndStop
- reactor.addWriter(fd)
- self.runReactor(reactor)
- # Getting here is great.
- def _getFDTest(self, kind):
- """
- Helper for getReaders and getWriters tests.
- """
- reactor = self.buildReactor()
- get = getattr(reactor, 'get' + kind + 's')
- add = getattr(reactor, 'add' + kind)
- remove = getattr(reactor, 'remove' + kind)
- client, server = self._connectedPair()
- self.assertNotIn(client, get())
- self.assertNotIn(server, get())
- add(client)
- self.assertIn(client, get())
- self.assertNotIn(server, get())
- remove(client)
- self.assertNotIn(client, get())
- self.assertNotIn(server, get())
- def test_getReaders(self):
- """
- L{IReactorFDSet.getReaders} reflects the additions and removals made
- with L{IReactorFDSet.addReader} and L{IReactorFDSet.removeReader}.
- """
- self._getFDTest('Reader')
- def test_removeWriter(self):
- """
- L{reactor.removeWriter()} accepts an L{IWriteDescriptor} provider
- previously passed to C{reactor.addWriter()} and causes it to no longer
- be monitored for outputability.
- """
- reactor, fd, server = self._simpleSetup()
- def fail():
- self.fail("doWrite should not be called")
- fd.doWrite = fail
- reactor.addWriter(fd)
- reactor.removeWriter(fd)
- # Give the reactor two timed event passes to notice that there's I/O
- # (if it is incorrectly watching for I/O).
- reactor.callLater(0, reactor.callLater, 0, reactor.stop)
- self.runReactor(reactor)
- # Getting here means the right thing happened probably.
- def test_getWriters(self):
- """
- L{IReactorFDSet.getWriters} reflects the additions and removals made
- with L{IReactorFDSet.addWriter} and L{IReactorFDSet.removeWriter}.
- """
- self._getFDTest('Writer')
- def test_removeAll(self):
- """
- C{reactor.removeAll()} removes all registered L{IReadDescriptor}
- providers and all registered L{IWriteDescriptor} providers and returns
- them.
- """
- reactor = self.buildReactor()
- reactor, fd, server = self._simpleSetup()
- fd.doRead = lambda: self.fail("doRead should not be called")
- fd.doWrite = lambda: self.fail("doWrite should not be called")
- server.sendall(b'x')
- reactor.addReader(fd)
- reactor.addWriter(fd)
- removed = reactor.removeAll()
- # Give the reactor two timed event passes to notice that there's I/O
- # (if it is incorrectly watching for I/O).
- reactor.callLater(0, reactor.callLater, 0, reactor.stop)
- self.runReactor(reactor)
- # Getting here means the right thing happened probably.
- self.assertEqual(removed, [fd])
- def test_removedFromReactor(self):
- """
- A descriptor's C{fileno} method should not be called after the
- descriptor has been removed from the reactor.
- """
- reactor = self.buildReactor()
- descriptor = RemovingDescriptor(reactor)
- reactor.callWhenRunning(descriptor.start)
- self.runReactor(reactor)
- self.assertEqual(descriptor.calls, [])
- def test_negativeOneFileDescriptor(self):
- """
- If L{FileDescriptor.fileno} returns C{-1}, the descriptor is removed
- from the reactor.
- """
- reactor = self.buildReactor()
- client, server = self._connectedPair()
- class DisappearingDescriptor(FileDescriptor):
- _fileno = server.fileno()
- _received = b""
- def fileno(self):
- return self._fileno
- def doRead(self):
- self._fileno = -1
- self._received += server.recv(1)
- client.send(b'y')
- def connectionLost(self, reason):
- reactor.stop()
- descriptor = DisappearingDescriptor(reactor)
- reactor.addReader(descriptor)
- client.send(b'x')
- self.runReactor(reactor)
- self.assertEqual(descriptor._received, b"x")
- def test_lostFileDescriptor(self):
- """
- The file descriptor underlying a FileDescriptor may be closed and
- replaced by another at some point. Bytes which arrive on the new
- descriptor must not be delivered to the FileDescriptor which was
- originally registered with the original descriptor of the same number.
- Practically speaking, this is difficult or impossible to detect. The
- implementation relies on C{fileno} raising an exception if the original
- descriptor has gone away. If C{fileno} continues to return the original
- file descriptor value, the reactor may deliver events from that
- descriptor. This is a best effort attempt to ease certain debugging
- situations. Applications should not rely on it intentionally.
- """
- reactor = self.buildReactor()
- name = reactor.__class__.__name__
- if name in ('EPollReactor', 'KQueueReactor', 'CFReactor',
- 'AsyncioSelectorReactor'):
- # Closing a file descriptor immediately removes it from the epoll
- # set without generating a notification. That means epollreactor
- # will not call any methods on Victim after the close, so there's
- # no chance to notice the socket is no longer valid.
- raise SkipTest("%r cannot detect lost file descriptors" % (name,))
- client, server = self._connectedPair()
- class Victim(FileDescriptor):
- """
- This L{FileDescriptor} will have its socket closed out from under it
- and another socket will take its place. It will raise a
- socket.error from C{fileno} after this happens (because socket
- objects remember whether they have been closed), so as long as the
- reactor calls the C{fileno} method the problem will be detected.
- """
- def fileno(self):
- return server.fileno()
- def doRead(self):
- raise Exception("Victim.doRead should never be called")
- def connectionLost(self, reason):
- """
- When the problem is detected, the reactor should disconnect this
- file descriptor. When that happens, stop the reactor so the
- test ends.
- """
- reactor.stop()
- reactor.addReader(Victim())
- # Arrange for the socket to be replaced at some unspecified time.
- # Significantly, this will not be while any I/O processing code is on
- # the stack. It is something that happens independently and cannot be
- # relied upon to happen at a convenient time, such as within a call to
- # doRead.
- def messItUp():
- newC, newS = self._connectedPair()
- fileno = server.fileno()
- server.close()
- os.dup2(newS.fileno(), fileno)
- newC.send(b"x")
- reactor.callLater(0, messItUp)
- self.runReactor(reactor)
- # If the implementation feels like logging the exception raised by
- # MessedUp.fileno, that's fine.
- self.flushLoggedErrors(socket.error)
- if platform.isWindows():
- test_lostFileDescriptor.skip = (
- "Cannot duplicate socket filenos on Windows")
- def test_connectionLostOnShutdown(self):
- """
- Any file descriptors added to the reactor have their C{connectionLost}
- called when C{reactor.stop} is called.
- """
- reactor = self.buildReactor()
- class DoNothingDescriptor(FileDescriptor):
- def doRead(self):
- return None
- def doWrite(self):
- return None
- client, server = self._connectedPair()
- fd1 = DoNothingDescriptor(reactor)
- fd1.fileno = client.fileno
- fd2 = DoNothingDescriptor(reactor)
- fd2.fileno = server.fileno
- reactor.addReader(fd1)
- reactor.addWriter(fd2)
- reactor.callWhenRunning(reactor.stop)
- self.runReactor(reactor)
- self.assertTrue(fd1.disconnected)
- self.assertTrue(fd2.disconnected)
- @implementer(IReadDescriptor)
- class RemovingDescriptor(object):
- """
- A read descriptor which removes itself from the reactor as soon as it
- gets a chance to do a read and keeps track of when its own C{fileno}
- method is called.
- @ivar insideReactor: A flag which is true as long as the reactor has
- this descriptor as a reader.
- @ivar calls: A list of the bottom of the call stack for any call to
- C{fileno} when C{insideReactor} is false.
- """
- def __init__(self, reactor):
- self.reactor = reactor
- self.insideReactor = False
- self.calls = []
- self.read, self.write = socketpair()
- def start(self):
- self.insideReactor = True
- self.reactor.addReader(self)
- self.write.send(b'a')
- def logPrefix(self):
- return 'foo'
- def doRead(self):
- self.reactor.removeReader(self)
- self.insideReactor = False
- self.reactor.stop()
- self.read.close()
- self.write.close()
- def fileno(self):
- if not self.insideReactor:
- self.calls.append(traceback.extract_stack(limit=5)[:-1])
- return self.read.fileno()
- def connectionLost(self, reason):
- # Ideally we'd close the descriptors here... but actually
- # connectionLost is never called because we remove ourselves from the
- # reactor before it stops.
- pass
- globals().update(ReactorFDSetTestsBuilder.makeTestCaseClasses())
|