test_threadworker.py 7.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted._threads._threadworker}.
  5. """
  6. from __future__ import absolute_import, division, print_function
  7. import gc
  8. import weakref
  9. from twisted.trial.unittest import SynchronousTestCase
  10. from threading import ThreadError, local
  11. from .. import ThreadWorker, LockWorker, AlreadyQuit
  class FakeQueueEmpty(Exception):
      """
      L{FakeQueue}'s C{get} has exhausted the queue.

      Raised instead of blocking, so that a test driving a worker loop by hand
      regains control once all queued items have been consumed.
      """
  class WouldDeadlock(Exception):
      """
      If this were a real lock, you'd be deadlocked because the lock would be
      double-acquired.
      """
  class FakeThread(object):
      """
      A fake L{threading.Thread}.

      Starting the fake only records that it was started; C{target} is never
      invoked by this class, so callers run it by hand.

      @ivar target: A target function to run.
      @type target: L{callable}

      @ivar started: Has this thread been started?
      @type started: L{bool}
      """

      def __init__(self, target):
          """
          Create a L{FakeThread} with a target.

          @param target: the callable this thread would run.
          """
          self.target = target
          self.started = False

      def start(self):
          """
          Set the "started" flag.
          """
          self.started = True
  class FakeQueue(object):
      """
      A fake L{Queue} implementing C{put} and C{get}.

      @ivar items: A list of items placed by C{put} but not yet retrieved by
          C{get}.
      @type items: L{list}
      """

      def __init__(self):
          """
          Create a L{FakeQueue}.
          """
          self.items = []

      def put(self, item):
          """
          Put an item into the queue for later retrieval by L{FakeQueue.get}.

          @param item: any object
          """
          self.items.append(item)

      def get(self):
          """
          Get an item.

          @return: an item previously put by C{put}; items come back in the
              order they were put.

          @raise FakeQueueEmpty: if no items remain.
          """
          if not self.items:
              raise FakeQueueEmpty()
          # Remove from the front so retrieval order matches insertion order.
          return self.items.pop(0)
  class FakeLock(object):
      """
      A stand-in for L{threading.Lock}.

      @ivar acquired: Whether this lock is presently acquired.
      @type acquired: L{bool}
      """

      def __init__(self):
          """
          Create a lock in the un-acquired state.
          """
          self.acquired = False

      def acquire(self):
          """
          Acquire the lock.  Raise an exception if the lock is already
          acquired.

          @raise WouldDeadlock: if the lock is already acquired.
          """
          if self.acquired:
              raise WouldDeadlock()
          self.acquired = True

      def release(self):
          """
          Release the lock.  Raise an exception if the lock is not presently
          acquired.

          @raise ThreadError: if the lock is not presently acquired.
          """
          if not self.acquired:
              raise ThreadError()
          self.acquired = False
  class ThreadWorkerTests(SynchronousTestCase):
      """
      Tests for L{ThreadWorker}.
      """

      def setUp(self):
          """
          Create a worker with fake threads.
          """
          self.fakeThreads = []
          self.fakeQueue = FakeQueue()

          def startThread(target):
              # Record every thread the worker asks for so tests can inspect
              # it and run its target by hand.
              newThread = FakeThread(target=target)
              newThread.start()
              self.fakeThreads.append(newThread)
              return newThread

          self.worker = ThreadWorker(startThread, self.fakeQueue)

      def test_startsThreadAndPerformsWork(self):
          """
          L{ThreadWorker} calls the thread-starting callable it was given
          exactly once when it is constructed, and that thread's target then
          pulls work from the queue the worker was given.
          """
          self.assertEqual(len(self.fakeThreads), 1)
          self.assertIs(self.fakeThreads[0].started, True)

          def doIt():
              doIt.done = True

          doIt.done = False
          self.worker.do(doIt)
          # Nothing has run yet: the fake thread never invokes its target.
          self.assertIs(doIt.done, False)
          # Run the target by hand; it stops when the fake queue runs dry.
          self.assertRaises(FakeQueueEmpty, self.fakeThreads[0].target)
          self.assertIs(doIt.done, True)

      def test_quitPreventsFutureCalls(self):
          """
          L{ThreadWorker.quit} causes future calls to L{ThreadWorker.do} and
          L{ThreadWorker.quit} to raise L{AlreadyQuit}.
          """
          self.worker.quit()
          self.assertRaises(AlreadyQuit, self.worker.quit)
          self.assertRaises(AlreadyQuit, self.worker.do, list)
  class LockWorkerTests(SynchronousTestCase):
      """
      Tests for L{LockWorker}.
      """

      def test_fakeDeadlock(self):
          """
          The L{FakeLock} test fixture will alert us if there's a potential
          deadlock.
          """
          lock = FakeLock()
          lock.acquire()
          self.assertRaises(WouldDeadlock, lock.acquire)

      def test_fakeDoubleRelease(self):
          """
          The L{FakeLock} test fixture will alert us if there's a potential
          double-release.
          """
          lock = FakeLock()
          self.assertRaises(ThreadError, lock.release)
          lock.acquire()
          self.assertIs(lock.release(), None)
          self.assertRaises(ThreadError, lock.release)

      def test_doExecutesImmediatelyWithLock(self):
          """
          L{LockWorker.do} immediately performs the work it's given, while the
          lock is acquired.
          """
          storage = local()
          lock = FakeLock()
          worker = LockWorker(lock, storage)

          def work():
              work.done = True
              # Capture the lock state as seen from inside the work.
              work.acquired = lock.acquired

          work.done = False
          worker.do(work)
          self.assertIs(work.done, True)
          self.assertIs(work.acquired, True)
          self.assertIs(lock.acquired, False)

      def test_doUnwindsReentrancy(self):
          """
          If L{LockWorker.do} is called recursively, it postpones the inner call
          until the outer one is complete.
          """
          lock = FakeLock()
          worker = LockWorker(lock, local())
          levels = []
          acquired = []

          def work():
              # C{work.level} counts how many invocations of C{work} are on
              # the stack at once; the assertion below requires it never to
              # exceed 1.
              work.level += 1
              levels.append(work.level)
              acquired.append(lock.acquired)
              if len(levels) < 2:
                  worker.do(work)
              work.level -= 1

          work.level = 0
          worker.do(work)
          self.assertEqual(levels, [1, 1])
          self.assertEqual(acquired, [True, True])

      def test_quit(self):
          """
          L{LockWorker.quit} frees the resources associated with its lock and
          causes further calls to C{do} and C{quit} to fail.
          """
          lock = FakeLock()
          ref = weakref.ref(lock)
          worker = LockWorker(lock, local())
          # Drop this test's own strong reference to the lock.
          lock = None
          self.assertIsNot(ref(), None)
          worker.quit()
          gc.collect()
          self.assertIs(ref(), None)
          self.assertRaises(AlreadyQuit, worker.quit)
          self.assertRaises(AlreadyQuit, worker.do, list)

      def test_quitWhileWorking(self):
          """
          If L{LockWorker.quit} is invoked during a call to L{LockWorker.do}, all
          recursive work scheduled with L{LockWorker.do} will be completed and
          the lock will be released.
          """
          lock = FakeLock()
          ref = weakref.ref(lock)
          worker = LockWorker(lock, local())

          def phase1():
              worker.do(phase2)
              worker.quit()
              self.assertRaises(AlreadyQuit, worker.do, list)
              phase1.complete = True

          phase1.complete = False

          def phase2():
              phase2.complete = True
              phase2.acquired = lock.acquired

          phase2.complete = False
          worker.do(phase1)
          self.assertIs(phase1.complete, True)
          self.assertIs(phase2.complete, True)
          self.assertIs(lock.acquired, False)
          # Drop this test's own strong reference before checking that the
          # lock can be collected.
          lock = None
          gc.collect()
          self.assertIs(ref(), None)

      def test_quitWhileGettingLock(self):
          """
          If L{LockWorker.do} is called concurrently with L{LockWorker.quit}, and
          C{quit} wins the race before C{do} gets the lock attribute, then
          L{AlreadyQuit} will be raised.
          """
          class RacyLockWorker(LockWorker):
              # Simulate the race deterministically: every read of C{_lock}
              # first quits the worker, then returns whatever is stored.
              def _lock_get(self):
                  self.quit()
                  return self.__dict__['_lock']

              def _lock_set(self, value):
                  self.__dict__['_lock'] = value

              _lock = property(_lock_get, _lock_set)

          worker = RacyLockWorker(FakeLock(), local())
          self.assertRaises(AlreadyQuit, worker.do, list)