test_team.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted._threads._team}.
  5. """
  6. from __future__ import absolute_import, division, print_function
  7. from twisted.trial.unittest import SynchronousTestCase
  8. from twisted.python.context import call, get
  9. from twisted.python.components import proxyForInterface
  10. from twisted.python.failure import Failure
  11. from .. import IWorker, Team, createMemoryWorker, AlreadyQuit
  12. class ContextualWorker(proxyForInterface(IWorker, "_realWorker")):
  13. """
  14. A worker implementation that supplies a context.
  15. """
  16. def __init__(self, realWorker, **ctx):
  17. """
  18. Create with a real worker and a context.
  19. """
  20. self._realWorker = realWorker
  21. self._context = ctx
  22. def do(self, work):
  23. """
  24. Perform the given work with the context given to __init__.
  25. @param work: the work to pass on to the real worker.
  26. """
  27. super(ContextualWorker, self).do(lambda: call(self._context, work))
  28. class TeamTests(SynchronousTestCase):
  29. """
  30. Tests for L{Team}
  31. """
  32. def setUp(self):
  33. """
  34. Set up a L{Team} with inspectable, synchronous workers that can be
  35. single-stepped.
  36. """
  37. coordinator, self.coordinateOnce = createMemoryWorker()
  38. self.coordinator = ContextualWorker(coordinator, worker="coordinator")
  39. self.workerPerformers = []
  40. self.allWorkersEver = []
  41. self.allUnquitWorkers = []
  42. self.activePerformers = []
  43. self.noMoreWorkers = lambda: False
  44. def createWorker():
  45. if self.noMoreWorkers():
  46. return None
  47. worker, performer = createMemoryWorker()
  48. self.workerPerformers.append(performer)
  49. self.activePerformers.append(performer)
  50. cw = ContextualWorker(worker, worker=len(self.workerPerformers))
  51. self.allWorkersEver.append(cw)
  52. self.allUnquitWorkers.append(cw)
  53. realQuit = cw.quit
  54. def quitAndRemove():
  55. realQuit()
  56. self.allUnquitWorkers.remove(cw)
  57. self.activePerformers.remove(performer)
  58. cw.quit = quitAndRemove
  59. return cw
  60. self.failures = []
  61. def logException():
  62. self.failures.append(Failure())
  63. self.team = Team(coordinator, createWorker, logException)
  64. def coordinate(self):
  65. """
  66. Perform all work currently scheduled in the coordinator.
  67. @return: whether any coordination work was performed; if the
  68. coordinator was idle when this was called, return L{False}
  69. (otherwise L{True}).
  70. @rtype: L{bool}
  71. """
  72. did = False
  73. while self.coordinateOnce():
  74. did = True
  75. return did
  76. def performAllOutstandingWork(self):
  77. """
  78. Perform all work on the coordinator and worker performers that needs to
  79. be done.
  80. """
  81. continuing = True
  82. while continuing:
  83. continuing = self.coordinate()
  84. for performer in self.workerPerformers:
  85. if performer in self.activePerformers:
  86. performer()
  87. continuing = continuing or self.coordinate()
  88. def test_doDoesWorkInWorker(self):
  89. """
  90. L{Team.do} does the work in a worker created by the createWorker
  91. callable.
  92. """
  93. def something():
  94. something.who = get("worker")
  95. self.team.do(something)
  96. self.coordinate()
  97. self.assertEqual(self.team.statistics().busyWorkerCount, 1)
  98. self.performAllOutstandingWork()
  99. self.assertEqual(something.who, 1)
  100. self.assertEqual(self.team.statistics().busyWorkerCount, 0)
  101. def test_initialStatistics(self):
  102. """
  103. L{Team.statistics} returns an object with idleWorkerCount,
  104. busyWorkerCount, and backloggedWorkCount integer attributes.
  105. """
  106. stats = self.team.statistics()
  107. self.assertEqual(stats.idleWorkerCount, 0)
  108. self.assertEqual(stats.busyWorkerCount, 0)
  109. self.assertEqual(stats.backloggedWorkCount, 0)
  110. def test_growCreatesIdleWorkers(self):
  111. """
  112. L{Team.grow} increases the number of available idle workers.
  113. """
  114. self.team.grow(5)
  115. self.performAllOutstandingWork()
  116. self.assertEqual(len(self.workerPerformers), 5)
  117. def test_growCreateLimit(self):
  118. """
  119. L{Team.grow} increases the number of available idle workers until the
  120. C{createWorker} callable starts returning None.
  121. """
  122. self.noMoreWorkers = lambda: len(self.allWorkersEver) >= 3
  123. self.team.grow(5)
  124. self.performAllOutstandingWork()
  125. self.assertEqual(len(self.allWorkersEver), 3)
  126. self.assertEqual(self.team.statistics().idleWorkerCount, 3)
  127. def test_shrinkQuitsWorkers(self):
  128. """
  129. L{Team.shrink} will quit the given number of workers.
  130. """
  131. self.team.grow(5)
  132. self.performAllOutstandingWork()
  133. self.team.shrink(3)
  134. self.performAllOutstandingWork()
  135. self.assertEqual(len(self.allUnquitWorkers), 2)
  136. def test_shrinkToZero(self):
  137. """
  138. L{Team.shrink} with no arguments will stop all outstanding workers.
  139. """
  140. self.team.grow(10)
  141. self.performAllOutstandingWork()
  142. self.assertEqual(len(self.allUnquitWorkers), 10)
  143. self.team.shrink()
  144. self.assertEqual(len(self.allUnquitWorkers), 10)
  145. self.performAllOutstandingWork()
  146. self.assertEqual(len(self.allUnquitWorkers), 0)
  147. def test_moreWorkWhenNoWorkersAvailable(self):
  148. """
  149. When no additional workers are available, the given work is backlogged,
  150. and then performed later when the work was.
  151. """
  152. self.team.grow(3)
  153. self.coordinate()
  154. def something():
  155. something.times += 1
  156. something.times = 0
  157. self.assertEqual(self.team.statistics().idleWorkerCount, 3)
  158. for i in range(3):
  159. self.team.do(something)
  160. # Make progress on the coordinator but do _not_ actually complete the
  161. # work, yet.
  162. self.coordinate()
  163. self.assertEqual(self.team.statistics().idleWorkerCount, 0)
  164. self.noMoreWorkers = lambda: True
  165. self.team.do(something)
  166. self.coordinate()
  167. self.assertEqual(self.team.statistics().idleWorkerCount, 0)
  168. self.assertEqual(self.team.statistics().backloggedWorkCount, 1)
  169. self.performAllOutstandingWork()
  170. self.assertEqual(self.team.statistics().backloggedWorkCount, 0)
  171. self.assertEqual(something.times, 4)
  172. def test_exceptionInTask(self):
  173. """
  174. When an exception is raised in a task passed to L{Team.do}, the
  175. C{logException} given to the L{Team} at construction is invoked in the
  176. exception context.
  177. """
  178. self.team.do(lambda: 1/0)
  179. self.performAllOutstandingWork()
  180. self.assertEqual(len(self.failures), 1)
  181. self.assertEqual(self.failures[0].type, ZeroDivisionError)
  182. def test_quit(self):
  183. """
  184. L{Team.quit} causes future invocations of L{Team.do} and L{Team.quit}
  185. to raise L{AlreadyQuit}.
  186. """
  187. self.team.quit()
  188. self.assertRaises(AlreadyQuit, self.team.quit)
  189. self.assertRaises(AlreadyQuit, self.team.do, list)
  190. def test_quitQuits(self):
  191. """
  192. L{Team.quit} causes all idle workers, as well as the coordinator
  193. worker, to quit.
  194. """
  195. for x in range(10):
  196. self.team.do(list)
  197. self.performAllOutstandingWork()
  198. self.team.quit()
  199. self.performAllOutstandingWork()
  200. self.assertEqual(len(self.allUnquitWorkers), 0)
  201. self.assertRaises(AlreadyQuit, self.coordinator.quit)
  202. def test_quitQuitsLaterWhenBusy(self):
  203. """
  204. L{Team.quit} causes all busy workers to be quit once they've finished
  205. the work they've been given.
  206. """
  207. self.team.grow(10)
  208. for x in range(5):
  209. self.team.do(list)
  210. self.coordinate()
  211. self.team.quit()
  212. self.coordinate()
  213. self.assertEqual(len(self.allUnquitWorkers), 5)
  214. self.performAllOutstandingWork()
  215. self.assertEqual(len(self.allUnquitWorkers), 0)
  216. self.assertRaises(AlreadyQuit, self.coordinator.quit)
  217. def test_quitConcurrentWithWorkHappening(self):
  218. """
  219. If work happens after L{Team.quit} sets its C{Quit} flag, but before
  220. any other work takes place, the L{Team} should still exit gracefully.
  221. """
  222. self.team.do(list)
  223. originalSet = self.team._quit.set
  224. def performWorkConcurrently():
  225. originalSet()
  226. self.performAllOutstandingWork()
  227. self.team._quit.set = performWorkConcurrently
  228. self.team.quit()
  229. self.assertRaises(AlreadyQuit, self.team.quit)
  230. self.assertRaises(AlreadyQuit, self.team.do, list)
  231. def test_shrinkWhenBusy(self):
  232. """
  233. L{Team.shrink} will wait for busy workers to finish being busy and then
  234. quit them.
  235. """
  236. for x in range(10):
  237. self.team.do(list)
  238. self.coordinate()
  239. self.assertEqual(len(self.allUnquitWorkers), 10)
  240. # There should be 10 busy workers at this point.
  241. self.team.shrink(7)
  242. self.performAllOutstandingWork()
  243. self.assertEqual(len(self.allUnquitWorkers), 3)