test_threadpool.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.python.threadpool}
  5. """
  6. from __future__ import division, absolute_import
  7. import pickle, time, weakref, gc, threading
  8. from twisted.python.compat import range
  9. from twisted.trial import unittest
  10. from twisted.python import threadpool, threadable, failure, context
  11. from twisted._threads import Team, createMemoryWorker
  12. #
  13. # See the end of this module for the remainder of the imports.
  14. #
  15. class Synchronization(object):
  16. failures = 0
  17. def __init__(self, N, waiting):
  18. self.N = N
  19. self.waiting = waiting
  20. self.lock = threading.Lock()
  21. self.runs = []
  22. def run(self):
  23. # This is the testy part: this is supposed to be invoked
  24. # serially from multiple threads. If that is actually the
  25. # case, we will never fail to acquire this lock. If it is
  26. # *not* the case, we might get here while someone else is
  27. # holding the lock.
  28. if self.lock.acquire(False):
  29. if not len(self.runs) % 5:
  30. time.sleep(0.0002) # Constant selected based on
  31. # empirical data to maximize the
  32. # chance of a quick failure if this
  33. # code is broken.
  34. self.lock.release()
  35. else:
  36. self.failures += 1
  37. # This is just the only way I can think of to wake up the test
  38. # method. It doesn't actually have anything to do with the
  39. # test.
  40. self.lock.acquire()
  41. self.runs.append(None)
  42. if len(self.runs) == self.N:
  43. self.waiting.release()
  44. self.lock.release()
  45. synchronized = ["run"]
  46. threadable.synchronize(Synchronization)
  47. class ThreadPoolTests(unittest.SynchronousTestCase):
  48. """
  49. Test threadpools.
  50. """
  51. def getTimeout(self):
  52. """
  53. Return number of seconds to wait before giving up.
  54. """
  55. return 5 # Really should be order of magnitude less
  56. def _waitForLock(self, lock):
  57. items = range(1000000)
  58. for i in items:
  59. if lock.acquire(False):
  60. break
  61. time.sleep(1e-5)
  62. else:
  63. self.fail("A long time passed without succeeding")
  64. def test_attributes(self):
  65. """
  66. L{ThreadPool.min} and L{ThreadPool.max} are set to the values passed to
  67. L{ThreadPool.__init__}.
  68. """
  69. pool = threadpool.ThreadPool(12, 22)
  70. self.assertEqual(pool.min, 12)
  71. self.assertEqual(pool.max, 22)
  72. def test_start(self):
  73. """
  74. L{ThreadPool.start} creates the minimum number of threads specified.
  75. """
  76. pool = threadpool.ThreadPool(0, 5)
  77. pool.start()
  78. self.addCleanup(pool.stop)
  79. self.assertEqual(len(pool.threads), 0)
  80. pool = threadpool.ThreadPool(3, 10)
  81. self.assertEqual(len(pool.threads), 0)
  82. pool.start()
  83. self.addCleanup(pool.stop)
  84. self.assertEqual(len(pool.threads), 3)
  85. def test_adjustingWhenPoolStopped(self):
  86. """
  87. L{ThreadPool.adjustPoolsize} only modifies the pool size and does not
  88. start new workers while the pool is not running.
  89. """
  90. pool = threadpool.ThreadPool(0, 5)
  91. pool.start()
  92. pool.stop()
  93. pool.adjustPoolsize(2)
  94. self.assertEqual(len(pool.threads), 0)
  95. def test_threadCreationArguments(self):
  96. """
  97. Test that creating threads in the threadpool with application-level
  98. objects as arguments doesn't results in those objects never being
  99. freed, with the thread maintaining a reference to them as long as it
  100. exists.
  101. """
  102. tp = threadpool.ThreadPool(0, 1)
  103. tp.start()
  104. self.addCleanup(tp.stop)
  105. # Sanity check - no threads should have been started yet.
  106. self.assertEqual(tp.threads, [])
  107. # Here's our function
  108. def worker(arg):
  109. pass
  110. # weakref needs an object subclass
  111. class Dumb(object):
  112. pass
  113. # And here's the unique object
  114. unique = Dumb()
  115. workerRef = weakref.ref(worker)
  116. uniqueRef = weakref.ref(unique)
  117. # Put some work in
  118. tp.callInThread(worker, unique)
  119. # Add an event to wait completion
  120. event = threading.Event()
  121. tp.callInThread(event.set)
  122. event.wait(self.getTimeout())
  123. del worker
  124. del unique
  125. gc.collect()
  126. self.assertIsNone(uniqueRef())
  127. self.assertIsNone(workerRef())
  128. def test_threadCreationArgumentsCallInThreadWithCallback(self):
  129. """
  130. As C{test_threadCreationArguments} above, but for
  131. callInThreadWithCallback.
  132. """
  133. tp = threadpool.ThreadPool(0, 1)
  134. tp.start()
  135. self.addCleanup(tp.stop)
  136. # Sanity check - no threads should have been started yet.
  137. self.assertEqual(tp.threads, [])
  138. # this holds references obtained in onResult
  139. refdict = {} # name -> ref value
  140. onResultWait = threading.Event()
  141. onResultDone = threading.Event()
  142. resultRef = []
  143. # result callback
  144. def onResult(success, result):
  145. onResultWait.wait(self.getTimeout())
  146. refdict['workerRef'] = workerRef()
  147. refdict['uniqueRef'] = uniqueRef()
  148. onResultDone.set()
  149. resultRef.append(weakref.ref(result))
  150. # Here's our function
  151. def worker(arg, test):
  152. return Dumb()
  153. # weakref needs an object subclass
  154. class Dumb(object):
  155. pass
  156. # And here's the unique object
  157. unique = Dumb()
  158. onResultRef = weakref.ref(onResult)
  159. workerRef = weakref.ref(worker)
  160. uniqueRef = weakref.ref(unique)
  161. # Put some work in
  162. tp.callInThreadWithCallback(onResult, worker, unique, test=unique)
  163. del worker
  164. del unique
  165. # let onResult collect the refs
  166. onResultWait.set()
  167. # wait for onResult
  168. onResultDone.wait(self.getTimeout())
  169. gc.collect()
  170. self.assertIsNone(uniqueRef())
  171. self.assertIsNone(workerRef())
  172. # XXX There's a race right here - has onResult in the worker thread
  173. # returned and the locals in _worker holding it and the result been
  174. # deleted yet?
  175. del onResult
  176. gc.collect()
  177. self.assertIsNone(onResultRef())
  178. self.assertIsNone(resultRef[0]())
  179. def test_persistence(self):
  180. """
  181. Threadpools can be pickled and unpickled, which should preserve the
  182. number of threads and other parameters.
  183. """
  184. pool = threadpool.ThreadPool(7, 20)
  185. self.assertEqual(pool.min, 7)
  186. self.assertEqual(pool.max, 20)
  187. # check that unpickled threadpool has same number of threads
  188. copy = pickle.loads(pickle.dumps(pool))
  189. self.assertEqual(copy.min, 7)
  190. self.assertEqual(copy.max, 20)
  191. def _threadpoolTest(self, method):
  192. """
  193. Test synchronization of calls made with C{method}, which should be
  194. one of the mechanisms of the threadpool to execute work in threads.
  195. """
  196. # This is a schizophrenic test: it seems to be trying to test
  197. # both the callInThread()/dispatch() behavior of the ThreadPool as well
  198. # as the serialization behavior of threadable.synchronize(). It
  199. # would probably make more sense as two much simpler tests.
  200. N = 10
  201. tp = threadpool.ThreadPool()
  202. tp.start()
  203. self.addCleanup(tp.stop)
  204. waiting = threading.Lock()
  205. waiting.acquire()
  206. actor = Synchronization(N, waiting)
  207. for i in range(N):
  208. method(tp, actor)
  209. self._waitForLock(waiting)
  210. self.assertFalse(actor.failures, "run() re-entered %d times" %
  211. (actor.failures,))
  212. def test_callInThread(self):
  213. """
  214. Call C{_threadpoolTest} with C{callInThread}.
  215. """
  216. return self._threadpoolTest(
  217. lambda tp, actor: tp.callInThread(actor.run))
  218. def test_callInThreadException(self):
  219. """
  220. L{ThreadPool.callInThread} logs exceptions raised by the callable it
  221. is passed.
  222. """
  223. class NewError(Exception):
  224. pass
  225. def raiseError():
  226. raise NewError()
  227. tp = threadpool.ThreadPool(0, 1)
  228. tp.callInThread(raiseError)
  229. tp.start()
  230. tp.stop()
  231. errors = self.flushLoggedErrors(NewError)
  232. self.assertEqual(len(errors), 1)
  233. def test_callInThreadWithCallback(self):
  234. """
  235. L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
  236. two-tuple of C{(True, result)} where C{result} is the value returned
  237. by the callable supplied.
  238. """
  239. waiter = threading.Lock()
  240. waiter.acquire()
  241. results = []
  242. def onResult(success, result):
  243. waiter.release()
  244. results.append(success)
  245. results.append(result)
  246. tp = threadpool.ThreadPool(0, 1)
  247. tp.callInThreadWithCallback(onResult, lambda: "test")
  248. tp.start()
  249. try:
  250. self._waitForLock(waiter)
  251. finally:
  252. tp.stop()
  253. self.assertTrue(results[0])
  254. self.assertEqual(results[1], "test")
  255. def test_callInThreadWithCallbackExceptionInCallback(self):
  256. """
  257. L{ThreadPool.callInThreadWithCallback} calls C{onResult} with a
  258. two-tuple of C{(False, failure)} where C{failure} represents the
  259. exception raised by the callable supplied.
  260. """
  261. class NewError(Exception):
  262. pass
  263. def raiseError():
  264. raise NewError()
  265. waiter = threading.Lock()
  266. waiter.acquire()
  267. results = []
  268. def onResult(success, result):
  269. waiter.release()
  270. results.append(success)
  271. results.append(result)
  272. tp = threadpool.ThreadPool(0, 1)
  273. tp.callInThreadWithCallback(onResult, raiseError)
  274. tp.start()
  275. try:
  276. self._waitForLock(waiter)
  277. finally:
  278. tp.stop()
  279. self.assertFalse(results[0])
  280. self.assertIsInstance(results[1], failure.Failure)
  281. self.assertTrue(issubclass(results[1].type, NewError))
  282. def test_callInThreadWithCallbackExceptionInOnResult(self):
  283. """
  284. L{ThreadPool.callInThreadWithCallback} logs the exception raised by
  285. C{onResult}.
  286. """
  287. class NewError(Exception):
  288. pass
  289. waiter = threading.Lock()
  290. waiter.acquire()
  291. results = []
  292. def onResult(success, result):
  293. results.append(success)
  294. results.append(result)
  295. raise NewError()
  296. tp = threadpool.ThreadPool(0, 1)
  297. tp.callInThreadWithCallback(onResult, lambda : None)
  298. tp.callInThread(waiter.release)
  299. tp.start()
  300. try:
  301. self._waitForLock(waiter)
  302. finally:
  303. tp.stop()
  304. errors = self.flushLoggedErrors(NewError)
  305. self.assertEqual(len(errors), 1)
  306. self.assertTrue(results[0])
  307. self.assertIsNone(results[1])
  308. def test_callbackThread(self):
  309. """
  310. L{ThreadPool.callInThreadWithCallback} calls the function it is
  311. given and the C{onResult} callback in the same thread.
  312. """
  313. threadIds = []
  314. event = threading.Event()
  315. def onResult(success, result):
  316. threadIds.append(threading.currentThread().ident)
  317. event.set()
  318. def func():
  319. threadIds.append(threading.currentThread().ident)
  320. tp = threadpool.ThreadPool(0, 1)
  321. tp.callInThreadWithCallback(onResult, func)
  322. tp.start()
  323. self.addCleanup(tp.stop)
  324. event.wait(self.getTimeout())
  325. self.assertEqual(len(threadIds), 2)
  326. self.assertEqual(threadIds[0], threadIds[1])
  327. def test_callbackContext(self):
  328. """
  329. The context L{ThreadPool.callInThreadWithCallback} is invoked in is
  330. shared by the context the callable and C{onResult} callback are
  331. invoked in.
  332. """
  333. myctx = context.theContextTracker.currentContext().contexts[-1]
  334. myctx['testing'] = 'this must be present'
  335. contexts = []
  336. event = threading.Event()
  337. def onResult(success, result):
  338. ctx = context.theContextTracker.currentContext().contexts[-1]
  339. contexts.append(ctx)
  340. event.set()
  341. def func():
  342. ctx = context.theContextTracker.currentContext().contexts[-1]
  343. contexts.append(ctx)
  344. tp = threadpool.ThreadPool(0, 1)
  345. tp.callInThreadWithCallback(onResult, func)
  346. tp.start()
  347. self.addCleanup(tp.stop)
  348. event.wait(self.getTimeout())
  349. self.assertEqual(len(contexts), 2)
  350. self.assertEqual(myctx, contexts[0])
  351. self.assertEqual(myctx, contexts[1])
  352. def test_existingWork(self):
  353. """
  354. Work added to the threadpool before its start should be executed once
  355. the threadpool is started: this is ensured by trying to release a lock
  356. previously acquired.
  357. """
  358. waiter = threading.Lock()
  359. waiter.acquire()
  360. tp = threadpool.ThreadPool(0, 1)
  361. tp.callInThread(waiter.release) # before start()
  362. tp.start()
  363. try:
  364. self._waitForLock(waiter)
  365. finally:
  366. tp.stop()
  367. def test_workerStateTransition(self):
  368. """
  369. As the worker receives and completes work, it transitions between
  370. the working and waiting states.
  371. """
  372. pool = threadpool.ThreadPool(0, 1)
  373. pool.start()
  374. self.addCleanup(pool.stop)
  375. # sanity check
  376. self.assertEqual(pool.workers, 0)
  377. self.assertEqual(len(pool.waiters), 0)
  378. self.assertEqual(len(pool.working), 0)
  379. # fire up a worker and give it some 'work'
  380. threadWorking = threading.Event()
  381. threadFinish = threading.Event()
  382. def _thread():
  383. threadWorking.set()
  384. threadFinish.wait(10)
  385. pool.callInThread(_thread)
  386. threadWorking.wait(10)
  387. self.assertEqual(pool.workers, 1)
  388. self.assertEqual(len(pool.waiters), 0)
  389. self.assertEqual(len(pool.working), 1)
  390. # finish work, and spin until state changes
  391. threadFinish.set()
  392. while not len(pool.waiters):
  393. time.sleep(0.0005)
  394. # make sure state changed correctly
  395. self.assertEqual(len(pool.waiters), 1)
  396. self.assertEqual(len(pool.working), 0)
  397. class RaceConditionTests(unittest.SynchronousTestCase):
  398. def setUp(self):
  399. self.threadpool = threadpool.ThreadPool(0, 10)
  400. self.event = threading.Event()
  401. self.threadpool.start()
  402. def done():
  403. self.threadpool.stop()
  404. del self.threadpool
  405. self.addCleanup(done)
  406. def getTimeout(self):
  407. """
  408. A reasonable number of seconds to time out.
  409. """
  410. return 5
  411. def test_synchronization(self):
  412. """
  413. If multiple threads are waiting on an event (via blocking on something
  414. in a callable passed to L{threadpool.ThreadPool.callInThread}), and
  415. there is spare capacity in the threadpool, sending another callable
  416. which will cause those to un-block to
  417. L{threadpool.ThreadPool.callInThread} will reliably run that callable
  418. and un-block the blocked threads promptly.
  419. @note: This is not really a unit test, it is a stress-test. You may
  420. need to run it with C{trial -u} to fail reliably if there is a
  421. problem. It is very hard to regression-test for this particular
  422. bug - one where the thread pool may consider itself as having
  423. "enough capacity" when it really needs to spin up a new thread if
  424. it possibly can - in a deterministic way, since the bug can only be
  425. provoked by subtle race conditions.
  426. """
  427. timeout = self.getTimeout()
  428. self.threadpool.callInThread(self.event.set)
  429. self.event.wait(timeout)
  430. self.event.clear()
  431. for i in range(3):
  432. self.threadpool.callInThread(self.event.wait)
  433. self.threadpool.callInThread(self.event.set)
  434. self.event.wait(timeout)
  435. if not self.event.isSet():
  436. self.event.set()
  437. self.fail(
  438. "'set' did not run in thread; timed out waiting on 'wait'."
  439. )
  440. def test_singleThread(self):
  441. """
  442. The submission of a new job to a thread pool in response to the
  443. C{onResult} callback does not cause a new thread to be added to the
  444. thread pool.
  445. This requires that the thread which calls C{onResult} to have first
  446. marked itself as available so that when the new job is queued, that
  447. thread may be considered to run it. This is desirable so that when
  448. only N jobs are ever being executed in the thread pool at once only
  449. N threads will ever be created.
  450. """
  451. # Ensure no threads running
  452. self.assertEqual(self.threadpool.workers, 0)
  453. event = threading.Event()
  454. event.clear()
  455. def onResult(success, counter):
  456. event.set()
  457. for i in range(10):
  458. self.threadpool.callInThreadWithCallback(
  459. onResult, lambda: None)
  460. event.wait(10)
  461. event.clear()
  462. self.assertEqual(self.threadpool.workers, 1)
  463. class MemoryPool(threadpool.ThreadPool):
  464. """
  465. A deterministic threadpool that uses in-memory data structures to queue
  466. work rather than threads to execute work.
  467. """
  468. def __init__(self, coordinator, failTest, newWorker, *args, **kwargs):
  469. """
  470. Initialize this L{MemoryPool} with a test case.
  471. @param coordinator: a worker used to coordinate work in the L{Team}
  472. underlying this threadpool.
  473. @type coordinator: L{twisted._threads.IExclusiveWorker}
  474. @param failTest: A 1-argument callable taking an exception and raising
  475. a test-failure exception.
  476. @type failTest: 1-argument callable taking (L{Failure}) and raising
  477. L{unittest.FailTest}.
  478. @param newWorker: a 0-argument callable that produces a new
  479. L{twisted._threads.IWorker} provider on each invocation.
  480. @type newWorker: 0-argument callable returning
  481. L{twisted._threads.IWorker}.
  482. """
  483. self._coordinator = coordinator
  484. self._failTest = failTest
  485. self._newWorker = newWorker
  486. threadpool.ThreadPool.__init__(self, *args, **kwargs)
  487. def _pool(self, currentLimit, threadFactory):
  488. """
  489. Override testing hook to create a deterministic threadpool.
  490. @param currentLimit: A 1-argument callable which returns the current
  491. threadpool size limit.
  492. @param threadFactory: ignored in this invocation; a 0-argument callable
  493. that would produce a thread.
  494. @return: a L{Team} backed by the coordinator and worker passed to
  495. L{MemoryPool.__init__}.
  496. """
  497. def respectLimit():
  498. # The expression in this method copied and pasted from
  499. # twisted.threads._pool, which is unfortunately bound up
  500. # with lots of actual-threading stuff.
  501. stats = team.statistics()
  502. if (stats.busyWorkerCount + stats.idleWorkerCount
  503. >= currentLimit()):
  504. return None
  505. return self._newWorker()
  506. team = Team(coordinator=self._coordinator,
  507. createWorker=respectLimit,
  508. logException=self._failTest)
  509. return team
  510. class PoolHelper(object):
  511. """
  512. A L{PoolHelper} constructs a L{threadpool.ThreadPool} that doesn't actually
  513. use threads, by using the internal interfaces in L{twisted._threads}.
  514. @ivar performCoordination: a 0-argument callable that will perform one unit
  515. of "coordination" - work involved in delegating work to other threads -
  516. and return L{True} if it did any work, L{False} otherwise.
  517. @ivar workers: the workers which represent the threads within the pool -
  518. the workers other than the coordinator.
  519. @type workers: L{list} of 2-tuple of (L{IWorker}, C{workPerformer}) where
  520. C{workPerformer} is a 0-argument callable like C{performCoordination}.
  521. @ivar threadpool: a modified L{threadpool.ThreadPool} to test.
  522. @type threadpool: L{MemoryPool}
  523. """
  524. def __init__(self, testCase, *args, **kwargs):
  525. """
  526. Create a L{PoolHelper}.
  527. @param testCase: a test case attached to this helper.
  528. @type args: The arguments passed to a L{threadpool.ThreadPool}.
  529. @type kwargs: The arguments passed to a L{threadpool.ThreadPool}
  530. """
  531. coordinator, self.performCoordination = createMemoryWorker()
  532. self.workers = []
  533. def newWorker():
  534. self.workers.append(createMemoryWorker())
  535. return self.workers[-1][0]
  536. self.threadpool = MemoryPool(coordinator, testCase.fail, newWorker,
  537. *args, **kwargs)
  538. def performAllCoordination(self):
  539. """
  540. Perform all currently scheduled "coordination", which is the work
  541. involved in delegating work to other threads.
  542. """
  543. while self.performCoordination():
  544. pass
  545. class MemoryBackedTests(unittest.SynchronousTestCase):
  546. """
  547. Tests using L{PoolHelper} to deterministically test properties of the
  548. threadpool implementation.
  549. """
  550. def test_workBeforeStarting(self):
  551. """
  552. If a threadpool is told to do work before starting, then upon starting
  553. up, it will start enough workers to handle all of the enqueued work
  554. that it's been given.
  555. """
  556. helper = PoolHelper(self, 0, 10)
  557. n = 5
  558. for x in range(n):
  559. helper.threadpool.callInThread(lambda: None)
  560. helper.performAllCoordination()
  561. self.assertEqual(helper.workers, [])
  562. helper.threadpool.start()
  563. helper.performAllCoordination()
  564. self.assertEqual(len(helper.workers), n)
  565. def test_tooMuchWorkBeforeStarting(self):
  566. """
  567. If the amount of work before starting exceeds the maximum number of
  568. threads allowed to the threadpool, only the maximum count will be
  569. started.
  570. """
  571. helper = PoolHelper(self, 0, 10)
  572. n = 50
  573. for x in range(n):
  574. helper.threadpool.callInThread(lambda: None)
  575. helper.performAllCoordination()
  576. self.assertEqual(helper.workers, [])
  577. helper.threadpool.start()
  578. helper.performAllCoordination()
  579. self.assertEqual(len(helper.workers), helper.threadpool.max)