# Copyright (c) 2012 Denis Bilenko. See LICENSE for details.
from __future__ import absolute_import
import sys
import os

from gevent._compat import integer_types
from gevent.hub import get_hub, getcurrent, sleep, _get_hub
from gevent.event import AsyncResult
from gevent.greenlet import Greenlet
from gevent.pool import GroupMappingMixin
from gevent.lock import Semaphore
from gevent._threading import Lock, Queue, start_new_thread


__all__ = ['ThreadPool',
           'ThreadResult']


class ThreadPool(GroupMappingMixin):
    """
    .. note:: The method :meth:`apply_async` will always return a new
       greenlet, bypassing the threadpool entirely.
    """

    def __init__(self, maxsize, hub=None):
        if hub is None:
            hub = get_hub()
        self.hub = hub
        self._maxsize = 0
        self.manager = None
        self.pid = os.getpid()
        self.fork_watcher = hub.loop.fork(ref=False)
        self._init(maxsize)

    def _set_maxsize(self, maxsize):
        if not isinstance(maxsize, integer_types):
            raise TypeError('maxsize must be integer: %r' % (maxsize, ))
        if maxsize < 0:
            raise ValueError('maxsize must not be negative: %r' % (maxsize, ))
        difference = maxsize - self._maxsize
        self._semaphore.counter += difference
        self._maxsize = maxsize
        self.adjust()
        # make sure all currently blocking spawn() start unlocking if maxsize increased
        self._semaphore._start_notify()

    def _get_maxsize(self):
        return self._maxsize

    maxsize = property(_get_maxsize, _set_maxsize)

    def __repr__(self):
        return '<%s at 0x%x %s/%s/%s>' % (self.__class__.__name__, id(self), len(self), self.size, self.maxsize)

    def __len__(self):
        # XXX just do unfinished_tasks property
        return self.task_queue.unfinished_tasks

    def _get_size(self):
        return self._size

    def _set_size(self, size):
        if size < 0:
            raise ValueError('Size of the pool cannot be negative: %r' % (size, ))
        if size > self._maxsize:
            raise ValueError('Size of the pool cannot be bigger than maxsize: %r > %r' % (size, self._maxsize))
        if self.manager:
            self.manager.kill()
        while self._size < size:
            self._add_thread()
        delay = 0.0001
        while self._size > size:
            while self._size - size > self.task_queue.unfinished_tasks:
                self.task_queue.put(None)
            if getcurrent() is self.hub:
                break
            sleep(delay)
            delay = min(delay * 2, .05)
        if self._size:
            self.fork_watcher.start(self._on_fork)
        else:
            self.fork_watcher.stop()

    size = property(_get_size, _set_size)

    def _init(self, maxsize):
        self._size = 0
        self._semaphore = Semaphore(1)
        self._lock = Lock()
        self.task_queue = Queue()
        self._set_maxsize(maxsize)

    def _on_fork(self):
        # fork() only leaves one thread; it also screws up locks;
        # let's re-create locks and threads.
        # NOTE: See comment in gevent.hub.reinit.
        pid = os.getpid()
        if pid != self.pid:
            self.pid = pid
            # Do not mix fork() and threads; since fork() only copies one thread,
            # all objects referenced by other threads have refcounts that will
            # never go down to 0.
            self._init(self._maxsize)

    def join(self):
        """Waits until all outstanding tasks have been completed."""
        delay = 0.0005
        while self.task_queue.unfinished_tasks > 0:
            sleep(delay)
            delay = min(delay * 2, .05)

    def kill(self):
        self.size = 0

    def _adjust_step(self):
        # if there is a possibility & necessity for adding a thread, do it
        while self._size < self._maxsize and self.task_queue.unfinished_tasks > self._size:
            self._add_thread()
        # while the number of threads is more than maxsize, kill one
        # we do not check what's already in task_queue - it could be all Nones
        while self._size - self._maxsize > self.task_queue.unfinished_tasks:
            self.task_queue.put(None)
        if self._size:
            self.fork_watcher.start(self._on_fork)
        else:
            self.fork_watcher.stop()

    def _adjust_wait(self):
        delay = 0.0001
        while True:
            self._adjust_step()
            if self._size <= self._maxsize:
                return
            sleep(delay)
            delay = min(delay * 2, .05)

    def adjust(self):
        self._adjust_step()
        if not self.manager and self._size > self._maxsize:
            # might need to feed more Nones into the pool
            self.manager = Greenlet.spawn(self._adjust_wait)

    def _add_thread(self):
        with self._lock:
            self._size += 1
        try:
            start_new_thread(self._worker, ())
        except:
            with self._lock:
                self._size -= 1
            raise

    def spawn(self, func, *args, **kwargs):
        """
        Add a new task to the threadpool that will run ``func(*args, **kwargs)``.

        Waits until a slot is available. Creates a new thread if necessary.

        :return: A :class:`gevent.event.AsyncResult`.
        """
        while True:
            semaphore = self._semaphore
            semaphore.acquire()
            if semaphore is self._semaphore:
                break

        thread_result = None
        try:
            task_queue = self.task_queue
            result = AsyncResult()
            # XXX We're calling the semaphore release function in the hub, otherwise
            # we get LoopExit (why?). Previously it was done with a rawlink on the
            # AsyncResult and the comment that it is "competing for order with get(); this is not
            # good, just make ThreadResult release the semaphore before doing anything else"
            thread_result = ThreadResult(result, hub=self.hub, call_when_ready=semaphore.release)
            task_queue.put((func, args, kwargs, thread_result))
            self.adjust()
        except:
            if thread_result is not None:
                thread_result.destroy()
            semaphore.release()
            raise
        return result

    def _decrease_size(self):
        if sys is None:
            return
        _lock = getattr(self, '_lock', None)
        if _lock is not None:
            with _lock:
                self._size -= 1

    _destroy_worker_hub = False

    def _worker(self):
        # pylint:disable=too-many-branches
        need_decrease = True
        try:
            while True:
                task_queue = self.task_queue
                task = task_queue.get()
                try:
                    if task is None:
                        need_decrease = False
                        self._decrease_size()
                        # we want first to decrease size, then decrease unfinished_tasks
                        # otherwise, _adjust might think there's one more idle thread that
                        # needs to be killed
                        return
                    func, args, kwargs, thread_result = task
                    try:
                        value = func(*args, **kwargs)
                    except: # pylint:disable=bare-except
                        exc_info = getattr(sys, 'exc_info', None)
                        if exc_info is None:
                            return
                        thread_result.handle_error((self, func), exc_info())
                    else:
                        if sys is None:
                            return
                        thread_result.set(value)
                        del value
                    finally:
                        del func, args, kwargs, thread_result, task
                finally:
                    if sys is None:
                        return  # pylint:disable=lost-exception
                    task_queue.task_done()
        finally:
            if need_decrease:
                self._decrease_size()
            if sys is not None and self._destroy_worker_hub:
                hub = _get_hub()
                if hub is not None:
                    hub.destroy(True)
                del hub

    def apply_e(self, expected_errors, function, args=None, kwargs=None):
        """
        .. deprecated:: 1.1a2
           Identical to :meth:`apply`; the ``expected_errors`` argument is
           ignored.
        """
        # pylint:disable=unused-argument
        # Deprecated but never documented. In the past, before
        # self.apply() allowed all errors to be raised to the caller,
        # expected_errors allowed a caller to specify a set of errors
        # they wanted to be raised, through the wrap_errors function.
        # In practice, it always took the value Exception or
        # BaseException.
        return self.apply(function, args, kwargs)

    def _apply_immediately(self):
        # If we're being called from a different thread than the one that
        # created us, e.g., because a worker task is trying to use apply()
        # recursively, we have no choice but to run the task immediately;
        # if we try to AsyncResult.get() in the worker thread, it's likely to have
        # nothing to switch to and lead to a LoopExit.
        return get_hub() is not self.hub

    def _apply_async_cb_spawn(self, callback, result):
        callback(result)

    def _apply_async_use_greenlet(self):
        # Always go to Greenlet because our self.spawn uses threads
        return True
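
# A minimal usage sketch, not part of the original module: run a blocking
# call on the pool without blocking the hub, then wait for all work to
# finish. ``time.sleep`` merely stands in for any blocking function.
#
#     import time
#     from gevent.threadpool import ThreadPool
#
#     pool = ThreadPool(3)                    # at most three native worker threads
#     result = pool.spawn(time.sleep, 0.5)    # returns a gevent.event.AsyncResult
#     result.get()                            # blocks this greenlet, not the hub
#     pool.map(time.sleep, [0.1, 0.2])        # map()/apply() come from GroupMappingMixin
#     pool.join()                             # wait until unfinished_tasks drops to zero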


class ThreadResult(object):

    # Using slots here helps to debug reference cycles/leaks
    __slots__ = ('exc_info', 'async', '_call_when_ready', 'value',
                 'context', 'hub', 'receiver')

    def __init__(self, receiver, hub=None, call_when_ready=None):
        if hub is None:
            hub = get_hub()
        self.receiver = receiver
        self.hub = hub
        self.context = None
        self.value = None
        self.exc_info = ()
        self.async = hub.loop.async()
        self._call_when_ready = call_when_ready
        self.async.start(self._on_async)

    @property
    def exception(self):
        return self.exc_info[1] if self.exc_info else None

    def _on_async(self):
        self.async.stop()
        if self._call_when_ready:
            # Typically this is pool.semaphore.release and we have to
            # call this in the Hub; if we don't we get the dreaded
            # LoopExit (XXX: Why?)
            self._call_when_ready()

        try:
            if self.exc_info:
                self.hub.handle_error(self.context, *self.exc_info)
            self.context = None
            self.async = None
            self.hub = None
            self._call_when_ready = None
            if self.receiver is not None:
                self.receiver(self)
        finally:
            self.receiver = None
            self.value = None
            if self.exc_info:
                self.exc_info = (self.exc_info[0], self.exc_info[1], None)

    def destroy(self):
        if self.async is not None:
            self.async.stop()
        self.async = None
        self.context = None
        self.hub = None
        self._call_when_ready = None
        self.receiver = None

    def _ready(self):
        if self.async is not None:
            self.async.send()

    def set(self, value):
        self.value = value
        self._ready()

    def handle_error(self, context, exc_info):
        self.context = context
        self.exc_info = exc_info
        self._ready()

    # link protocol:
    def successful(self):
        return self.exception is None


def wrap_errors(errors, function, args, kwargs):
    """
    .. deprecated:: 1.1a2
       Previously used by ThreadPool.apply_e.
    """
    try:
        return True, function(*args, **kwargs)
    except errors as ex:
        return False, ex


try:
    import concurrent.futures
except ImportError:
    pass
else:
    __all__.append("ThreadPoolExecutor")

    from gevent.timeout import Timeout as GTimeout
    from gevent._util import Lazy
    from concurrent.futures import _base as cfb

    def _wrap_error(future, fn):
        def cbwrap(_):
            del _
            # we're called with the async result, but
            # be sure to pass in ourself. Also automatically
            # unlink ourself so that we don't get called multiple
            # times.
            try:
                fn(future)
            except Exception: # pylint: disable=broad-except
                future.hub.print_exception((fn, future), *sys.exc_info())
        cbwrap.auto_unlink = True
        return cbwrap

    def _wrap(future, fn):
        def f(_):
            fn(future)
        f.auto_unlink = True
        return f

    class _FutureProxy(object):
        def __init__(self, asyncresult):
            self.asyncresult = asyncresult

        # Internal implementation details of a c.f.Future

        @Lazy
        def _condition(self):
            from gevent import monkey
            if monkey.is_module_patched('threading') or self.done():
                import threading
                return threading.Condition()
            # We can only properly work with conditions
            # when we've been monkey-patched. This is necessary
            # for the wait/as_completed module functions.
            raise AttributeError("_condition")

        @Lazy
        def _waiters(self):
            self.asyncresult.rawlink(self.__when_done)
            return []

        def __when_done(self, _):
            # We should only be called when _waiters has
            # already been accessed.
            waiters = getattr(self, '_waiters')
            for w in waiters: # pylint:disable=not-an-iterable
                if self.successful():
                    w.add_result(self)
                else:
                    w.add_exception(self)

        __when_done.auto_unlink = True

        @property
        def _state(self):
            if self.done():
                return cfb.FINISHED
            return cfb.RUNNING

        def set_running_or_notify_cancel(self):
            # Does nothing, not even any consistency checks. It's
            # meant to be internal to the executor and we don't use it.
            return

        def result(self, timeout=None):
            try:
                return self.asyncresult.result(timeout=timeout)
            except GTimeout:
                # XXX: Theoretically this could be a completely
                # unrelated timeout instance. Do we care about that?
                raise concurrent.futures.TimeoutError()

        def exception(self, timeout=None):
            try:
                self.asyncresult.get(timeout=timeout)
                return self.asyncresult.exception
            except GTimeout:
                raise concurrent.futures.TimeoutError()

        def add_done_callback(self, fn):
            if self.done():
                fn(self)
            else:
                self.asyncresult.rawlink(_wrap_error(self, fn))

        def rawlink(self, fn):
            self.asyncresult.rawlink(_wrap(self, fn))

        def __str__(self):
            return str(self.asyncresult)

        def __getattr__(self, name):
            return getattr(self.asyncresult, name)

    class ThreadPoolExecutor(concurrent.futures.ThreadPoolExecutor):
        """
        A version of :class:`concurrent.futures.ThreadPoolExecutor` that
        always uses native threads, even when threading is monkey-patched.

        The ``Future`` objects returned from this object can be used
        with gevent waiting primitives like :func:`gevent.wait`.

        .. caution:: If threading is *not* monkey-patched, then the ``Future``
           objects returned by this object are not guaranteed to work with
           :func:`~concurrent.futures.as_completed` and :func:`~concurrent.futures.wait`.
           The individual blocking methods like :meth:`~concurrent.futures.Future.result`
           and :meth:`~concurrent.futures.Future.exception` will always work.

        .. versionadded:: 1.2a1
           This is a provisional API.
        """

        def __init__(self, max_workers):
            super(ThreadPoolExecutor, self).__init__(max_workers)
            self._threadpool = ThreadPool(max_workers)
            self._threadpool._destroy_worker_hub = True

        def submit(self, fn, *args, **kwargs):
            with self._shutdown_lock: # pylint:disable=not-context-manager
                if self._shutdown:
                    raise RuntimeError('cannot schedule new futures after shutdown')

                future = self._threadpool.spawn(fn, *args, **kwargs)
                return _FutureProxy(future)

        def shutdown(self, wait=True):
            super(ThreadPoolExecutor, self).shutdown(wait)
            # XXX: We don't implement wait properly
            kill = getattr(self._threadpool, 'kill', None)
            if kill: # pylint:disable=using-constant-test
                self._threadpool.kill()
            self._threadpool = None

        kill = shutdown  # greentest compat

        def _adjust_thread_count(self):
            # Does nothing. We don't want to spawn any "threads",
            # let the threadpool handle that.
            pass
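
# A minimal usage sketch for ThreadPoolExecutor, not part of the original
# module and assuming ``concurrent.futures`` is importable: submit() returns
# a Future-like proxy whose blocking methods cooperate with the gevent hub
# and which can be passed to gevent waiting primitives such as gevent.wait().
#
#     import time
#     import gevent
#     from gevent.threadpool import ThreadPoolExecutor
#
#     executor = ThreadPoolExecutor(max_workers=2)
#     future = executor.submit(time.sleep, 0.5)   # runs on a native thread
#     gevent.wait([future])                       # cooperative wait on the proxy
#     print(future.result())                      # None, once the sleep finishes
#     executor.shutdown()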