test_threads.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for implementations of L{IReactorThreads}.
  5. """
  6. from __future__ import division, absolute_import
  7. __metaclass__ = type
  8. from weakref import ref
  9. import gc, threading
  10. from twisted.python.threadable import isInIOThread
  11. from twisted.internet.test.reactormixins import ReactorBuilder
  12. from twisted.python.threadpool import ThreadPool
  13. from twisted.internet.interfaces import IReactorThreads
  14. class ThreadTestsBuilder(ReactorBuilder):
  15. """
  16. Builder for defining tests relating to L{IReactorThreads}.
  17. """
  18. requiredInterfaces = (IReactorThreads,)
  19. def test_getThreadPool(self):
  20. """
  21. C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
  22. starts when C{reactor.run()} is called and stops before it returns.
  23. """
  24. state = []
  25. reactor = self.buildReactor()
  26. pool = reactor.getThreadPool()
  27. self.assertIsInstance(pool, ThreadPool)
  28. self.assertFalse(
  29. pool.started, "Pool should not start before reactor.run")
  30. def f():
  31. # Record the state for later assertions
  32. state.append(pool.started)
  33. state.append(pool.joined)
  34. reactor.stop()
  35. reactor.callWhenRunning(f)
  36. self.runReactor(reactor, 2)
  37. self.assertTrue(
  38. state[0], "Pool should start after reactor.run")
  39. self.assertFalse(
  40. state[1], "Pool should not be joined before reactor.stop")
  41. self.assertTrue(
  42. pool.joined,
  43. "Pool should be stopped after reactor.run returns")
  44. def test_suggestThreadPoolSize(self):
  45. """
  46. C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
  47. threadpool.
  48. """
  49. reactor = self.buildReactor()
  50. reactor.suggestThreadPoolSize(17)
  51. pool = reactor.getThreadPool()
  52. self.assertEqual(pool.max, 17)
  53. def test_delayedCallFromThread(self):
  54. """
  55. A function scheduled with L{IReactorThreads.callFromThread} invoked
  56. from a delayed call is run immediately in the next reactor iteration.
  57. When invoked from the reactor thread, previous implementations of
  58. L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
  59. up step, assuming the reactor would wake up on its own. However, this
  60. resulted in the reactor not noticing an insert into the thread queue at
  61. the right time (in this case, after the thread queue has been processed
  62. for that reactor iteration).
  63. """
  64. reactor = self.buildReactor()
  65. def threadCall():
  66. reactor.stop()
  67. # Set up the use of callFromThread being tested.
  68. reactor.callLater(0, reactor.callFromThread, threadCall)
  69. before = reactor.seconds()
  70. self.runReactor(reactor, 60)
  71. after = reactor.seconds()
  72. # We specified a timeout of 60 seconds. The timeout code in runReactor
  73. # probably won't actually work, though. If the reactor comes out of
  74. # the event notification API just a little bit early, say after 59.9999
  75. # seconds instead of after 60 seconds, then the queued thread call will
  76. # get processed but the timeout delayed call runReactor sets up won't!
  77. # Then the reactor will stop and runReactor will return without the
  78. # timeout firing. As it turns out, select() and poll() are quite
  79. # likely to return *slightly* earlier than we ask them to, so the
  80. # timeout will rarely happen, even if callFromThread is broken. So,
  81. # instead we'll measure the elapsed time and make sure it's something
  82. # less than about half of the timeout we specified. This is heuristic.
  83. # It assumes that select() won't ever return after 30 seconds when we
  84. # asked it to timeout after 60 seconds. And of course like all
  85. # time-based tests, it's slightly non-deterministic. If the OS doesn't
  86. # schedule this process for 30 seconds, then the test might fail even
  87. # if callFromThread is working.
  88. self.assertTrue(after - before < 30)
  89. def test_callFromThread(self):
  90. """
  91. A function scheduled with L{IReactorThreads.callFromThread} invoked
  92. from another thread is run in the reactor thread.
  93. """
  94. reactor = self.buildReactor()
  95. result = []
  96. def threadCall():
  97. result.append(threading.currentThread())
  98. reactor.stop()
  99. reactor.callLater(0, reactor.callInThread,
  100. reactor.callFromThread, threadCall)
  101. self.runReactor(reactor, 5)
  102. self.assertEqual(result, [threading.currentThread()])
  103. def test_stopThreadPool(self):
  104. """
  105. When the reactor stops, L{ReactorBase._stopThreadPool} drops the
  106. reactor's direct reference to its internal threadpool and removes
  107. the associated startup and shutdown triggers.
  108. This is the case of the thread pool being created before the reactor
  109. is run.
  110. """
  111. reactor = self.buildReactor()
  112. threadpool = ref(reactor.getThreadPool())
  113. reactor.callWhenRunning(reactor.stop)
  114. self.runReactor(reactor)
  115. gc.collect()
  116. self.assertIsNone(threadpool())
  117. def test_stopThreadPoolWhenStartedAfterReactorRan(self):
  118. """
  119. We must handle the case of shutting down the thread pool when it was
  120. started after the reactor was run in a special way.
  121. Some implementation background: The thread pool is started with
  122. callWhenRunning, which only returns a system trigger ID when it is
  123. invoked before the reactor is started.
  124. This is the case of the thread pool being created after the reactor
  125. is started.
  126. """
  127. reactor = self.buildReactor()
  128. threadPoolRefs = []
  129. def acquireThreadPool():
  130. threadPoolRefs.append(ref(reactor.getThreadPool()))
  131. reactor.stop()
  132. reactor.callWhenRunning(acquireThreadPool)
  133. self.runReactor(reactor)
  134. gc.collect()
  135. self.assertIsNone(threadPoolRefs[0]())
  136. def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
  137. """
  138. When the reactor has its shutdown event fired before it is run, the
  139. thread pool is completely destroyed.
  140. For what it's worth, the reason we support this behavior at all is
  141. because Trial does this.
  142. This is the case of the thread pool being created without the reactor
  143. being started at al.
  144. """
  145. reactor = self.buildReactor()
  146. threadPoolRef = ref(reactor.getThreadPool())
  147. reactor.fireSystemEvent("shutdown")
  148. if reactor.__class__.__name__ == "AsyncioSelectorReactor":
  149. self.assertIsNone(reactor.threadpool)
  150. else:
  151. gc.collect()
  152. self.assertIsNone(threadPoolRef())
  153. def test_isInIOThread(self):
  154. """
  155. The reactor registers itself as the I/O thread when it runs so that
  156. L{twisted.python.threadable.isInIOThread} returns C{True} if it is
  157. called in the thread the reactor is running in.
  158. """
  159. results = []
  160. reactor = self.buildReactor()
  161. def check():
  162. results.append(isInIOThread())
  163. reactor.stop()
  164. reactor.callWhenRunning(check)
  165. self.runReactor(reactor)
  166. self.assertEqual([True], results)
  167. def test_isNotInIOThread(self):
  168. """
  169. The reactor registers itself as the I/O thread when it runs so that
  170. L{twisted.python.threadable.isInIOThread} returns C{False} if it is
  171. called in a different thread than the reactor is running in.
  172. """
  173. results = []
  174. reactor = self.buildReactor()
  175. def check():
  176. results.append(isInIOThread())
  177. reactor.callFromThread(reactor.stop)
  178. reactor.callInThread(check)
  179. self.runReactor(reactor)
  180. self.assertEqual([False], results)
# Add whatever ThreadTestsBuilder.makeTestCaseClasses() returns to this
# module's global namespace (presumably one concrete test case class per
# reactor implementation, so the test runner can discover them -- confirm in
# twisted.internet.test.reactormixins).
globals().update(ThreadTestsBuilder.makeTestCaseClasses())