threadpool.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. # -*- test-case-name: twisted.test.test_threadpool -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. twisted.python.threadpool: a pool of threads to which we dispatch tasks.
  6. In most cases you can just use C{reactor.callInThread} and friends
  7. instead of creating a thread pool directly.
  8. """
  9. from __future__ import division, absolute_import
  10. import threading
  11. from twisted._threads import pool as _pool
  12. from twisted.python import log, context
  13. from twisted.python.failure import Failure
  14. from twisted.python._oldstyle import _oldStyle
# Module-level sentinel object.
# NOTE(review): nothing in this module references WorkerStop; presumably it is
# kept so that external code importing it keeps working -- confirm before
# removing.
WorkerStop = object()
  16. @_oldStyle
  17. class ThreadPool:
  18. """
  19. This class (hopefully) generalizes the functionality of a pool of threads
  20. to which work can be dispatched.
  21. L{callInThread} and L{stop} should only be called from a single thread.
  22. @ivar started: Whether or not the thread pool is currently running.
  23. @type started: L{bool}
  24. @ivar threads: List of workers currently running in this thread pool.
  25. @type threads: L{list}
  26. @ivar _pool: A hook for testing.
  27. @type _pool: callable compatible with L{_pool}
  28. """
  29. min = 5
  30. max = 20
  31. joined = False
  32. started = False
  33. workers = 0
  34. name = None
  35. threadFactory = threading.Thread
  36. currentThread = staticmethod(threading.currentThread)
  37. _pool = staticmethod(_pool)
  38. def __init__(self, minthreads=5, maxthreads=20, name=None):
  39. """
  40. Create a new threadpool.
  41. @param minthreads: minimum number of threads in the pool
  42. @type minthreads: L{int}
  43. @param maxthreads: maximum number of threads in the pool
  44. @type maxthreads: L{int}
  45. @param name: The name to give this threadpool; visible in log messages.
  46. @type name: native L{str}
  47. """
  48. assert minthreads >= 0, 'minimum is negative'
  49. assert minthreads <= maxthreads, 'minimum is greater than maximum'
  50. self.min = minthreads
  51. self.max = maxthreads
  52. self.name = name
  53. self.threads = []
  54. def trackingThreadFactory(*a, **kw):
  55. thread = self.threadFactory(*a, name=self._generateName(), **kw)
  56. self.threads.append(thread)
  57. return thread
  58. def currentLimit():
  59. if not self.started:
  60. return 0
  61. return self.max
  62. self._team = self._pool(currentLimit, trackingThreadFactory)
  63. @property
  64. def workers(self):
  65. """
  66. For legacy compatibility purposes, return a total number of workers.
  67. @return: the current number of workers, both idle and busy (but not
  68. those that have been quit by L{ThreadPool.adjustPoolsize})
  69. @rtype: L{int}
  70. """
  71. stats = self._team.statistics()
  72. return stats.idleWorkerCount + stats.busyWorkerCount
  73. @property
  74. def working(self):
  75. """
  76. For legacy compatibility purposes, return the number of busy workers as
  77. expressed by a list the length of that number.
  78. @return: the number of workers currently processing a work item.
  79. @rtype: L{list} of L{None}
  80. """
  81. return [None] * self._team.statistics().busyWorkerCount
  82. @property
  83. def waiters(self):
  84. """
  85. For legacy compatibility purposes, return the number of idle workers as
  86. expressed by a list the length of that number.
  87. @return: the number of workers currently alive (with an allocated
  88. thread) but waiting for new work.
  89. @rtype: L{list} of L{None}
  90. """
  91. return [None] * self._team.statistics().idleWorkerCount
  92. @property
  93. def _queue(self):
  94. """
  95. For legacy compatibility purposes, return an object with a C{qsize}
  96. method that indicates the amount of work not yet allocated to a worker.
  97. @return: an object with a C{qsize} method.
  98. """
  99. class NotAQueue(object):
  100. def qsize(q):
  101. """
  102. Pretend to be a Python threading Queue and return the
  103. number of as-yet-unconsumed tasks.
  104. @return: the amount of backlogged work not yet dispatched to a
  105. worker.
  106. @rtype: L{int}
  107. """
  108. return self._team.statistics().backloggedWorkCount
  109. return NotAQueue()
  110. q = _queue # Yes, twistedchecker, I want a single-letter
  111. # attribute name.
  112. def start(self):
  113. """
  114. Start the threadpool.
  115. """
  116. self.joined = False
  117. self.started = True
  118. # Start some threads.
  119. self.adjustPoolsize()
  120. backlog = self._team.statistics().backloggedWorkCount
  121. if backlog:
  122. self._team.grow(backlog)
  123. def startAWorker(self):
  124. """
  125. Increase the number of available workers for the thread pool by 1, up
  126. to the maximum allowed by L{ThreadPool.max}.
  127. """
  128. self._team.grow(1)
  129. def _generateName(self):
  130. """
  131. Generate a name for a new pool thread.
  132. @return: A distinctive name for the thread.
  133. @rtype: native L{str}
  134. """
  135. return "PoolThread-%s-%s" % (self.name or id(self), self.workers)
  136. def stopAWorker(self):
  137. """
  138. Decrease the number of available workers by 1, by quitting one as soon
  139. as it's idle.
  140. """
  141. self._team.shrink(1)
  142. def __setstate__(self, state):
  143. setattr(self, "__dict__", state)
  144. ThreadPool.__init__(self, self.min, self.max)
  145. def __getstate__(self):
  146. state = {}
  147. state['min'] = self.min
  148. state['max'] = self.max
  149. return state
  150. def callInThread(self, func, *args, **kw):
  151. """
  152. Call a callable object in a separate thread.
  153. @param func: callable object to be called in separate thread
  154. @param args: positional arguments to be passed to C{func}
  155. @param kw: keyword args to be passed to C{func}
  156. """
  157. self.callInThreadWithCallback(None, func, *args, **kw)
  158. def callInThreadWithCallback(self, onResult, func, *args, **kw):
  159. """
  160. Call a callable object in a separate thread and call C{onResult} with
  161. the return value, or a L{twisted.python.failure.Failure} if the
  162. callable raises an exception.
  163. The callable is allowed to block, but the C{onResult} function must not
  164. block and should perform as little work as possible.
  165. A typical action for C{onResult} for a threadpool used with a Twisted
  166. reactor would be to schedule a L{twisted.internet.defer.Deferred} to
  167. fire in the main reactor thread using C{.callFromThread}. Note that
  168. C{onResult} is called inside the separate thread, not inside the
  169. reactor thread.
  170. @param onResult: a callable with the signature C{(success, result)}.
  171. If the callable returns normally, C{onResult} is called with
  172. C{(True, result)} where C{result} is the return value of the
  173. callable. If the callable throws an exception, C{onResult} is
  174. called with C{(False, failure)}.
  175. Optionally, C{onResult} may be L{None}, in which case it is not
  176. called at all.
  177. @param func: callable object to be called in separate thread
  178. @param args: positional arguments to be passed to C{func}
  179. @param kw: keyword arguments to be passed to C{func}
  180. """
  181. if self.joined:
  182. return
  183. ctx = context.theContextTracker.currentContext().contexts[-1]
  184. def inContext():
  185. try:
  186. result = inContext.theWork()
  187. ok = True
  188. except:
  189. result = Failure()
  190. ok = False
  191. inContext.theWork = None
  192. if inContext.onResult is not None:
  193. inContext.onResult(ok, result)
  194. inContext.onResult = None
  195. elif not ok:
  196. log.err(result)
  197. # Avoid closing over func, ctx, args, kw so that we can carefully
  198. # manage their lifecycle. See
  199. # test_threadCreationArgumentsCallInThreadWithCallback.
  200. inContext.theWork = lambda: context.call(ctx, func, *args, **kw)
  201. inContext.onResult = onResult
  202. self._team.do(inContext)
  203. def stop(self):
  204. """
  205. Shutdown the threads in the threadpool.
  206. """
  207. self.joined = True
  208. self.started = False
  209. self._team.quit()
  210. for thread in self.threads:
  211. thread.join()
  212. def adjustPoolsize(self, minthreads=None, maxthreads=None):
  213. """
  214. Adjust the number of available threads by setting C{min} and C{max} to
  215. new values.
  216. @param minthreads: The new value for L{ThreadPool.min}.
  217. @param maxthreads: The new value for L{ThreadPool.max}.
  218. """
  219. if minthreads is None:
  220. minthreads = self.min
  221. if maxthreads is None:
  222. maxthreads = self.max
  223. assert minthreads >= 0, 'minimum is negative'
  224. assert minthreads <= maxthreads, 'minimum is greater than maximum'
  225. self.min = minthreads
  226. self.max = maxthreads
  227. if not self.started:
  228. return
  229. # Kill of some threads if we have too many.
  230. if self.workers > self.max:
  231. self._team.shrink(self.workers - self.max)
  232. # Start some threads if we have too few.
  233. if self.workers < self.min:
  234. self._team.grow(self.min - self.workers)
  235. def dumpStats(self):
  236. """
  237. Dump some plain-text informational messages to the log about the state
  238. of this L{ThreadPool}.
  239. """
  240. log.msg('waiters: %s' % (self.waiters,))
  241. log.msg('workers: %s' % (self.working,))
  242. log.msg('total: %s' % (self.threads,))