test_epollreactor.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.internet.epollreactor}.
  5. """
  6. from __future__ import division, absolute_import
  7. from twisted.trial.unittest import TestCase
  8. try:
  9. from twisted.internet.epollreactor import _ContinuousPolling
  10. except ImportError:
  11. _ContinuousPolling = None
  12. from twisted.internet.task import Clock
  13. from twisted.internet.error import ConnectionDone
  14. class Descriptor(object):
  15. """
  16. Records reads and writes, as if it were a C{FileDescriptor}.
  17. """
  18. def __init__(self):
  19. self.events = []
  20. def fileno(self):
  21. return 1
  22. def doRead(self):
  23. self.events.append("read")
  24. def doWrite(self):
  25. self.events.append("write")
  26. def connectionLost(self, reason):
  27. reason.trap(ConnectionDone)
  28. self.events.append("lost")
  29. class ContinuousPollingTests(TestCase):
  30. """
  31. L{_ContinuousPolling} can be used to read and write from C{FileDescriptor}
  32. objects.
  33. """
  34. def test_addReader(self):
  35. """
  36. Adding a reader when there was previously no reader starts up a
  37. C{LoopingCall}.
  38. """
  39. poller = _ContinuousPolling(Clock())
  40. self.assertIsNone(poller._loop)
  41. reader = object()
  42. self.assertFalse(poller.isReading(reader))
  43. poller.addReader(reader)
  44. self.assertIsNotNone(poller._loop)
  45. self.assertTrue(poller._loop.running)
  46. self.assertIs(poller._loop.clock, poller._reactor)
  47. self.assertTrue(poller.isReading(reader))
  48. def test_addWriter(self):
  49. """
  50. Adding a writer when there was previously no writer starts up a
  51. C{LoopingCall}.
  52. """
  53. poller = _ContinuousPolling(Clock())
  54. self.assertIsNone(poller._loop)
  55. writer = object()
  56. self.assertFalse(poller.isWriting(writer))
  57. poller.addWriter(writer)
  58. self.assertIsNotNone(poller._loop)
  59. self.assertTrue(poller._loop.running)
  60. self.assertIs(poller._loop.clock, poller._reactor)
  61. self.assertTrue(poller.isWriting(writer))
  62. def test_removeReader(self):
  63. """
  64. Removing a reader stops the C{LoopingCall}.
  65. """
  66. poller = _ContinuousPolling(Clock())
  67. reader = object()
  68. poller.addReader(reader)
  69. poller.removeReader(reader)
  70. self.assertIsNone(poller._loop)
  71. self.assertEqual(poller._reactor.getDelayedCalls(), [])
  72. self.assertFalse(poller.isReading(reader))
  73. def test_removeWriter(self):
  74. """
  75. Removing a writer stops the C{LoopingCall}.
  76. """
  77. poller = _ContinuousPolling(Clock())
  78. writer = object()
  79. poller.addWriter(writer)
  80. poller.removeWriter(writer)
  81. self.assertIsNone(poller._loop)
  82. self.assertEqual(poller._reactor.getDelayedCalls(), [])
  83. self.assertFalse(poller.isWriting(writer))
  84. def test_removeUnknown(self):
  85. """
  86. Removing unknown readers and writers silently does nothing.
  87. """
  88. poller = _ContinuousPolling(Clock())
  89. poller.removeWriter(object())
  90. poller.removeReader(object())
  91. def test_multipleReadersAndWriters(self):
  92. """
  93. Adding multiple readers and writers results in a single
  94. C{LoopingCall}.
  95. """
  96. poller = _ContinuousPolling(Clock())
  97. writer = object()
  98. poller.addWriter(writer)
  99. self.assertIsNotNone(poller._loop)
  100. poller.addWriter(object())
  101. self.assertIsNotNone(poller._loop)
  102. poller.addReader(object())
  103. self.assertIsNotNone(poller._loop)
  104. poller.addReader(object())
  105. poller.removeWriter(writer)
  106. self.assertIsNotNone(poller._loop)
  107. self.assertTrue(poller._loop.running)
  108. self.assertEqual(len(poller._reactor.getDelayedCalls()), 1)
  109. def test_readerPolling(self):
  110. """
  111. Adding a reader causes its C{doRead} to be called every 1
  112. milliseconds.
  113. """
  114. reactor = Clock()
  115. poller = _ContinuousPolling(reactor)
  116. desc = Descriptor()
  117. poller.addReader(desc)
  118. self.assertEqual(desc.events, [])
  119. reactor.advance(0.00001)
  120. self.assertEqual(desc.events, ["read"])
  121. reactor.advance(0.00001)
  122. self.assertEqual(desc.events, ["read", "read"])
  123. reactor.advance(0.00001)
  124. self.assertEqual(desc.events, ["read", "read", "read"])
  125. def test_writerPolling(self):
  126. """
  127. Adding a writer causes its C{doWrite} to be called every 1
  128. milliseconds.
  129. """
  130. reactor = Clock()
  131. poller = _ContinuousPolling(reactor)
  132. desc = Descriptor()
  133. poller.addWriter(desc)
  134. self.assertEqual(desc.events, [])
  135. reactor.advance(0.001)
  136. self.assertEqual(desc.events, ["write"])
  137. reactor.advance(0.001)
  138. self.assertEqual(desc.events, ["write", "write"])
  139. reactor.advance(0.001)
  140. self.assertEqual(desc.events, ["write", "write", "write"])
  141. def test_connectionLostOnRead(self):
  142. """
  143. If a C{doRead} returns a value indicating disconnection,
  144. C{connectionLost} is called on it.
  145. """
  146. reactor = Clock()
  147. poller = _ContinuousPolling(reactor)
  148. desc = Descriptor()
  149. desc.doRead = lambda: ConnectionDone()
  150. poller.addReader(desc)
  151. self.assertEqual(desc.events, [])
  152. reactor.advance(0.001)
  153. self.assertEqual(desc.events, ["lost"])
  154. def test_connectionLostOnWrite(self):
  155. """
  156. If a C{doWrite} returns a value indicating disconnection,
  157. C{connectionLost} is called on it.
  158. """
  159. reactor = Clock()
  160. poller = _ContinuousPolling(reactor)
  161. desc = Descriptor()
  162. desc.doWrite = lambda: ConnectionDone()
  163. poller.addWriter(desc)
  164. self.assertEqual(desc.events, [])
  165. reactor.advance(0.001)
  166. self.assertEqual(desc.events, ["lost"])
  167. def test_removeAll(self):
  168. """
  169. L{_ContinuousPolling.removeAll} removes all descriptors and returns
  170. the readers and writers.
  171. """
  172. poller = _ContinuousPolling(Clock())
  173. reader = object()
  174. writer = object()
  175. both = object()
  176. poller.addReader(reader)
  177. poller.addReader(both)
  178. poller.addWriter(writer)
  179. poller.addWriter(both)
  180. removed = poller.removeAll()
  181. self.assertEqual(poller.getReaders(), [])
  182. self.assertEqual(poller.getWriters(), [])
  183. self.assertEqual(len(removed), 3)
  184. self.assertEqual(set(removed), set([reader, writer, both]))
  185. def test_getReaders(self):
  186. """
  187. L{_ContinuousPolling.getReaders} returns a list of the read
  188. descriptors.
  189. """
  190. poller = _ContinuousPolling(Clock())
  191. reader = object()
  192. poller.addReader(reader)
  193. self.assertIn(reader, poller.getReaders())
  194. def test_getWriters(self):
  195. """
  196. L{_ContinuousPolling.getWriters} returns a list of the write
  197. descriptors.
  198. """
  199. poller = _ContinuousPolling(Clock())
  200. writer = object()
  201. poller.addWriter(writer)
  202. self.assertIn(writer, poller.getWriters())
  203. if _ContinuousPolling is None:
  204. skip = "epoll not supported in this environment."