pool.py 28 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759
  1. # Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
  2. """
  3. Managing greenlets in a group.
  4. The :class:`Group` class in this module abstracts a group of running
  5. greenlets. When a greenlet dies, it's automatically removed from the
  6. group. All running greenlets in a group can be waited on with
  7. :meth:`Group.join`, or all running greenlets can be killed with
  8. :meth:`Group.kill`.
  9. The :class:`Pool` class, which is a subclass of :class:`Group`,
  10. provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
  11. method blocks if the number of greenlets in the pool has already
  12. reached the limit, until there is a free slot.
  13. """
  14. from bisect import insort_right
  15. try:
  16. from itertools import izip
  17. except ImportError:
  18. # Python 3
  19. izip = zip
  20. from gevent.hub import GreenletExit, getcurrent, kill as _kill
  21. from gevent.greenlet import joinall, Greenlet
  22. from gevent.timeout import Timeout
  23. from gevent.event import Event
  24. from gevent.lock import Semaphore, DummySemaphore
  25. __all__ = ['Group', 'Pool']
  class IMapUnordered(Greenlet):
      """
      An iterator of map results.

      When run, this greenlet spawns ``func`` once for every item of the
      iterable and links :meth:`_on_result` to each spawned worker. Each
      worker's outcome (value or failure) is placed on an internal queue,
      which the iterator protocol (:meth:`next`) drains.
      """

      _zipped = False

      def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
          """
          Create an iterator over the results of applying *func* to *iterable*.

          :param func: The callable applied to every item.
          :param iterable: The items to map over.
          :keyword spawn: If given and not-None, stored as ``self.spawn`` and
              used to start the worker for each item.
          :keyword int maxsize: If given and not-None, specifies the maximum number of
              finished results that will be allowed to accumulate awaiting the reader;
              more than that number of results will cause map function greenlets to begin
              to block. This is most useful if there is a great disparity in the speed of
              the mapping code and the consumer and the results consume a great deal of resources.
              Using a bound is more computationally expensive than not using a bound.
          :keyword bool _zipped: If true, each item is unpacked into positional
              arguments for *func* (``func(*item)``) instead of being passed whole.

          .. versionchanged:: 1.1b3
              Added the *maxsize* parameter.
          """
          from gevent.queue import Queue
          Greenlet.__init__(self)
          if spawn is not None:
              self.spawn = spawn
          if _zipped:
              self._zipped = _zipped
          self.func = func
          self.iterable = iterable
          self.queue = Queue()
          if maxsize:
              # Bounding the queue is not enough if we want to keep from
              # accumulating objects; the result value will be around as
              # the greenlet's result, blocked on self.queue.put(), and
              # we'll go on to spawn another greenlet, which in turn can
              # create the result. So we need a semaphore to prevent a
              # greenlet from exiting while the queue is full so that we
              # don't spawn the next greenlet (assuming that self.spawn
              # is of course bounded). (Alternatively we could have the
              # greenlet itself do the insert into the pool, but that
              # takes some rework).
              #
              # Given the use of a semaphore at this level, sizing the queue becomes
              # redundant, and that lets us avoid having to use self.link() instead
              # of self.rawlink() to avoid having blocking methods called in the
              # hub greenlet.
              factory = Semaphore
          else:
              factory = DummySemaphore
          self._result_semaphore = factory(maxsize)

          # Number of spawned workers whose result has not been queued yet.
          self.count = 0
          # True once the terminating value has been queued.
          self.finished = False
          # If the queue size is unbounded, then we want to call all
          # the links (_on_finish and _on_result) directly in the hub greenlet
          # for efficiency. However, if the queue is bounded, we can't do that if
          # the queue might block (because if there's no waiter the hub can switch to,
          # the queue simply raises Full). Therefore, in that case, we use
          # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
          # This means that _on_finish and _on_result can be called and interleaved in any order
          # if the call to self.queue.put() blocks.
          # Note that right now we're not bounding the queue, instead using a semaphore.
          self.rawlink(self._on_finish)

      def __iter__(self):
          return self

      def next(self):
          """
          Return the next available result.

          If the queued value is a :class:`Failure`, the exception it carries
          is raised instead (this includes the ``StopIteration`` that marks
          the end of the results).
          """
          # Free one result slot before blocking on the queue.
          self._result_semaphore.release()
          value = self._inext()
          if isinstance(value, Failure):
              # Prefer the failure's own re-raise hook (when it has one) so the
              # original traceback is kept rather than starting a new one here.
              value.raise_exc()
              # Safety net in case the hook returned without raising.
              raise value.exc
          return value

      __next__ = next

      def _inext(self):
          # Block until some worker (or the finish handler) queues a value.
          return self.queue.get()

      def _ispawn(self, func, item):
          # Take a result slot first; with a real semaphore this blocks while
          # too many results are pending.
          self._result_semaphore.acquire()
          self.count += 1
          g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
          g.rawlink(self._on_result)
          return g

      def _run(self):  # pylint:disable=method-hidden
          try:
              func = self.func
              for item in self.iterable:
                  self._ispawn(func, item)
          finally:
              # Drop the instance references once iteration is over (or failed).
              self.__dict__.pop('spawn', None)
              self.__dict__.pop('func', None)
              self.__dict__.pop('iterable', None)

      def _on_result(self, greenlet):
          # This method can either be called in the hub greenlet (if the
          # queue is unbounded) or its own greenlet. If it's called in
          # its own greenlet, the calls to put() may block and switch
          # greenlets, which in turn could mutate our state. So any
          # state on this object that we need to look at, notably
          # self.count, we need to capture or mutate *before* we put.
          # (Note that right now we're not bounding the queue, but we may
          # choose to do so in the future so this implementation will be left in case.)
          self.count -= 1
          count = self.count
          finished = self.finished
          ready = self.ready()
          put_finished = False

          if ready and count <= 0 and not finished:
              # We are done spawning and this was the last outstanding worker:
              # claim the responsibility for queueing the terminator.
              finished = self.finished = True
              put_finished = True

          if greenlet.successful():
              self.queue.put(self._iqueue_value_for_success(greenlet))
          else:
              self.queue.put(self._iqueue_value_for_failure(greenlet))

          if put_finished:
              self.queue.put(self._iqueue_value_for_finished())

      def _on_finish(self, _self):
          # Linked to this greenlet itself; runs when _run has ended.
          if self.finished:
              return

          if not self.successful():
              # Iterating or spawning failed: hand our own failure to the reader.
              self.finished = True
              self.queue.put(self._iqueue_value_for_self_failure())
              return

          if self.count <= 0:
              # No workers outstanding, so nothing else will queue the terminator.
              self.finished = True
              self.queue.put(self._iqueue_value_for_finished())

      def _iqueue_value_for_success(self, greenlet):
          return greenlet.value

      def _iqueue_value_for_failure(self, greenlet):
          return Failure(greenlet.exception, getattr(greenlet, '_raise_exception'))

      def _iqueue_value_for_finished(self):
          return Failure(StopIteration)

      def _iqueue_value_for_self_failure(self):
          return Failure(self.exception, self._raise_exception)
  class IMap(IMapUnordered):
      """
      A specialization of :class:`IMapUnordered` that returns items
      in the order in which they were generated, not
      the order in which they finish.

      We do this by storing tuples ``(order, value)`` in the queue,
      not just ``value``.
      """

      def __init__(self, *args, **kwargs):
          # Results that arrived ahead of their turn, kept sorted by index.
          self.waiting = []  # QQQ maybe deque will work faster there?
          # Index of the next result to hand to the reader.
          self.index = 0
          # Highest index assigned so far (-1 means none yet).
          self.maxindex = -1
          IMapUnordered.__init__(self, *args, **kwargs)

      def _inext(self):
          while True:
              if self.waiting and self.waiting[0][0] <= self.index:
                  # The result we need was already received out of order.
                  _, value = self.waiting.pop(0)
              else:
                  index, value = self.queue.get()
                  if index > self.index:
                      # Too early for this one; park it and keep waiting.
                      insort_right(self.waiting, (index, value))
                      continue
              self.index += 1
              return value

      def _ispawn(self, func, item):
          g = IMapUnordered._ispawn(self, func, item)
          # Tag the worker with its position so its result can be ordered.
          self.maxindex += 1
          g.index = self.maxindex
          return g

      def _iqueue_value_for_success(self, greenlet):
          return (greenlet.index, IMapUnordered._iqueue_value_for_success(self, greenlet))

      def _iqueue_value_for_failure(self, greenlet):
          return (greenlet.index, IMapUnordered._iqueue_value_for_failure(self, greenlet))

      def _iqueue_value_for_finished(self):
          # The terminator takes the next index so it sorts after every result.
          self.maxindex += 1
          return (self.maxindex, IMapUnordered._iqueue_value_for_finished(self))

      def _iqueue_value_for_self_failure(self):
          self.maxindex += 1
          return (self.maxindex, IMapUnordered._iqueue_value_for_self_failure(self))
  class GroupMappingMixin(object):
      # Internal, non-public API class.
      # Provides mixin methods for implementing mapping pools. Subclasses must define:

      # - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
      # and `kwargs`, potentially asynchronously. Return a value with a `get` method that
      # blocks until the results of func are available, and a `link` method.

      # - self._apply_immediately(): should the function passed to apply be called immediately,
      # synchronously?

      # - self._apply_async_use_greenlet(): Should apply_async directly call
      # Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block

      # - self._apply_async_cb_spawn(callback, result): Run the given callback function, possibly
      # asynchronously, possibly synchronously.

      def apply_cb(self, func, args=None, kwds=None, callback=None):
          """
          :meth:`apply` the given *func(\\*args, \\*\\*kwds)*, and, if a *callback* is given, run it with the
          results of *func* (unless an exception was raised.)

          The *callback* may be called synchronously or asynchronously. If called
          asynchronously, it will not be tracked by this group. (:class:`Group` and :class:`Pool`
          call it asynchronously in a new greenlet; :class:`~gevent.threadpool.ThreadPool` calls
          it synchronously in the current greenlet.)

          :return: The result of *func*.
          """
          result = self.apply(func, args, kwds)
          if callback is not None:
              self._apply_async_cb_spawn(callback, result)
          return result

      def apply_async(self, func, args=None, kwds=None, callback=None):
          """
          A variant of the :meth:`apply` method which returns a :class:`~.Greenlet` object.

          When the returned greenlet gets to run, it *will* call :meth:`apply`,
          passing in *func*, *args* and *kwds*.

          If *callback* is specified, then it should be a callable which
          accepts a single argument. When the result becomes ready
          callback is applied to it (unless the call failed).

          This method will never block, even if this group is full (that is,
          even if :meth:`spawn` would block, this method will not).

          .. caution:: The returned greenlet may or may not be tracked
             as part of this group, so :meth:`joining <join>` this group is
             not a reliable way to wait for the results to be available or
             for the returned greenlet to run; instead, join the returned
             greenlet.

          .. tip:: Because :class:`~.ThreadPool` objects do not track greenlets, the returned
             greenlet will never be a part of it. To reduce overhead and improve performance,
             :class:`Group` and :class:`Pool` may choose to track the returned
             greenlet. These are implementation details that may change.
          """
          if args is None:
              args = ()
          if kwds is None:
              kwds = {}
          if self._apply_async_use_greenlet():
              # cannot call self.spawn() directly because it will block
              # XXX: This is always the case for ThreadPool, but for Group/Pool
              # of greenlets, this is only the case when they are full...hence
              # the weasely language about "may or may not be tracked". Should we make
              # Group/Pool always return true as well so it's never tracked by any
              # implementation? That would simplify that logic, but could increase
              # the total number of greenlets in the system and add a layer of
              # overhead for the simple cases when the pool isn't full.
              return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)

          greenlet = self.spawn(func, *args, **kwds)
          if callback is not None:
              # pass_value only invokes the callback when the greenlet succeeded.
              greenlet.link(pass_value(callback))
          return greenlet

      def apply(self, func, args=None, kwds=None):
          """
          Rough equivalent of the :func:`apply()` builtin function blocking until
          the result is ready and returning it.

          The ``func`` will *usually*, but not *always*, be run in a way
          that allows the current greenlet to switch out (for example,
          in a new greenlet or thread, depending on implementation). But
          if the current greenlet or thread is already one that was
          spawned by this pool, the pool may choose to immediately run
          the `func` synchronously.

          Any exception ``func`` raises will be propagated to the caller of ``apply`` (that is,
          this method will raise the exception that ``func`` raised).
          """
          if args is None:
              args = ()
          if kwds is None:
              kwds = {}
          if self._apply_immediately():
              return func(*args, **kwds)
          return self.spawn(func, *args, **kwds).get()

      def map(self, func, iterable):
          """Return a list made by applying the *func* to each element of
          the iterable.

          .. seealso:: :meth:`imap`
          """
          return list(self.imap(func, iterable))

      def map_cb(self, func, iterable, callback=None):
          """
          Run :meth:`map` and, if *callback* is not None, call it synchronously
          with the resulting list.

          :return: The list produced by :meth:`map`.
          """
          result = self.map(func, iterable)
          if callback is not None:
              callback(result)
          return result

      def map_async(self, func, iterable, callback=None):
          """
          A variant of the map() method which returns a Greenlet object that is executing
          the map function.

          If callback is specified then it should be a callable which accepts a
          single argument.
          """
          return Greenlet.spawn(self.map_cb, func, iterable, callback)

      def __imap(self, cls, func, *iterables, **kwargs):
          # Python 2 doesn't support the syntax that lets us mix varargs and
          # a named kwarg, so we have to unpack manually
          maxsize = kwargs.pop('maxsize', None)
          if kwargs:
              raise TypeError("Unsupported keyword arguments")
          # The iterables are zipped, so each item is a tuple that the iterator
          # must unpack into positional arguments (hence _zipped=True).
          return cls.spawn(func, izip(*iterables), spawn=self.spawn,
                           _zipped=True, maxsize=maxsize)

      def imap(self, func, *iterables, **kwargs):
          """
          imap(func, *iterables, maxsize=None) -> iterable

          An equivalent of :func:`itertools.imap`, operating in parallel.
          The *func* is applied to each element yielded from each
          iterable in *iterables* in turn, collecting the result.

          If this object has a bound on the number of active greenlets it can
          contain (such as :class:`Pool`), then at most that number of tasks will operate
          in parallel.

          :keyword int maxsize: If given and not-None, specifies the maximum number of
              finished results that will be allowed to accumulate awaiting the reader;
              more than that number of results will cause map function greenlets to begin
              to block. This is most useful if there is a great disparity in the speed of
              the mapping code and the consumer and the results consume a great deal of resources.

              .. note:: This is separate from any bound on the number of active parallel
                 tasks, though they may have some interaction (for example, limiting the
                 number of parallel tasks to the smallest bound).

              .. note:: Using a bound is slightly more computationally expensive than not using a bound.

              .. tip:: The :meth:`imap_unordered` method makes much better
                  use of this parameter. Some additional, unspecified,
                  number of objects may be required to be kept in memory
                  to maintain order by this function.

          :return: An iterable object.

          .. versionchanged:: 1.1b3
              Added the *maxsize* keyword parameter.
          .. versionchanged:: 1.1a1
              Accept multiple *iterables* to iterate in parallel.
          """
          return self.__imap(IMap, func, *iterables, **kwargs)

      def imap_unordered(self, func, *iterables, **kwargs):
          """
          imap_unordered(func, *iterables, maxsize=None) -> iterable

          The same as :meth:`imap` except that the ordering of the results
          from the returned iterator should be considered in arbitrary
          order.

          This is lighter weight than :meth:`imap` and should be preferred if order
          doesn't matter.

          .. seealso:: :meth:`imap` for more details.
          """
          return self.__imap(IMapUnordered, func, *iterables, **kwargs)
  class Group(GroupMappingMixin):
      """
      Maintain a group of greenlets that are still running, without
      limiting their number.

      Links to each item and removes it upon notification.

      Groups can be iterated to discover what greenlets they are tracking,
      they can be tested to see if they contain a greenlet, and they know the
      number (len) of greenlets they are tracking. If they are not tracking any
      greenlets, they are False in a boolean context.
      """

      #: The type of Greenlet object we will :meth:`spawn`. This can be changed
      #: on an instance or in a subclass.
      greenlet_class = Greenlet

      def __init__(self, *args):
          """
          Create a group, optionally tracking an initial collection of greenlets.

          :param args: At most one positional argument: an iterable of
              greenlets to start tracking.
          """
          assert len(args) <= 1, args
          self.greenlets = set(*args)
          # each item we kill we place in dying, to avoid killing the same greenlet twice
          self.dying = set()
          self._empty_event = Event()
          # Link from the set we just built, not from args[0]: if the caller
          # passed a one-shot iterator it is already exhausted by set(). Iterate
          # a copy in case a link callback runs while we are still linking.
          for greenlet in list(self.greenlets):
              greenlet.rawlink(self._discard)
          # The event means "the group is empty", so it must only start out set
          # when no initial greenlets were given; otherwise join() would return
          # immediately while greenlets are still being tracked.
          if not self.greenlets:
              self._empty_event.set()

      def __repr__(self):
          return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self.greenlets)

      def __len__(self):
          """
          Answer how many greenlets we are tracking. Note that if we are empty,
          we are False in a boolean context.
          """
          return len(self.greenlets)

      def __contains__(self, item):
          """
          Answer if we are tracking the given greenlet.
          """
          return item in self.greenlets

      def __iter__(self):
          """
          Iterate across all the greenlets we are tracking, in no particular order.
          """
          return iter(self.greenlets)

      def add(self, greenlet):
          """
          Begin tracking the greenlet.

          If this group is :meth:`full`, then this method may block
          until it is possible to track the greenlet.
          """
          try:
              rawlink = greenlet.rawlink
          except AttributeError:
              pass  # non-Greenlet greenlet, like MAIN
          else:
              rawlink(self._discard)
          self.greenlets.add(greenlet)
          self._empty_event.clear()

      def _discard(self, greenlet):
          # Link callback: forget the greenlet and signal waiters once empty.
          self.greenlets.discard(greenlet)
          self.dying.discard(greenlet)
          if not self.greenlets:
              self._empty_event.set()

      def discard(self, greenlet):
          """
          Stop tracking the greenlet.
          """
          self._discard(greenlet)
          try:
              unlink = greenlet.unlink
          except AttributeError:
              pass  # non-Greenlet greenlet, like MAIN
          else:
              unlink(self._discard)

      def start(self, greenlet):
          """
          Start the un-started *greenlet* and add it to the collection of greenlets
          this group is monitoring.
          """
          self.add(greenlet)
          greenlet.start()

      def spawn(self, *args, **kwargs):
          """
          Begin a new greenlet with the given arguments (which are passed
          to the greenlet constructor) and add it to the collection of greenlets
          this group is monitoring.

          :return: The newly started greenlet.
          """
          greenlet = self.greenlet_class(*args, **kwargs)
          self.start(greenlet)
          return greenlet

      # def close(self):
      #     """Prevents any more tasks from being submitted to the pool"""
      #     self.add = RaiseException("This %s has been closed" % self.__class__.__name__)

      def join(self, timeout=None, raise_error=False):
          """
          Wait for this group to become empty *at least once*.

          If there are no greenlets in the group, returns immediately.

          .. note:: By the time the waiting code (the caller of this
             method) regains control, a greenlet may have been added to
             this group, and so this object may no longer be empty. (That
             is, ``group.join(); assert len(group) == 0`` is not
             guaranteed to hold.) This method only guarantees that the group
             reached a ``len`` of 0 at some point.

          :keyword bool raise_error: If True (*not* the default), if any
              greenlet that finished while the join was in progress raised
              an exception, that exception will be raised to the caller of
              this method. If multiple greenlets raised exceptions, which
              one gets re-raised is not determined. Only greenlets currently
              in the group when this method is called are guaranteed to
              be checked for exceptions.

          :return bool: A value indicating whether this group became empty.
             If the timeout is specified and the group did not become empty
             during that timeout, then this will be a false value. Otherwise
             it will be a true value.

          .. versionchanged:: 1.2a1
             Add the return value.
          """
          # Snapshot the members up front: they are removed from the group as
          # they finish, so they could not be inspected afterwards.
          greenlets = list(self.greenlets) if raise_error else ()
          result = self._empty_event.wait(timeout=timeout)

          for greenlet in greenlets:
              if greenlet.exception is not None:
                  if hasattr(greenlet, '_raise_exception'):
                      greenlet._raise_exception()
                  raise greenlet.exception

          return result

      def kill(self, exception=GreenletExit, block=True, timeout=None):
          """
          Kill all greenlets being tracked by this group.

          :keyword exception: The exception to kill the greenlets with.
          :keyword bool block: If true, keep killing and waiting until the
              group is empty (or the timeout expires); otherwise make a single
              non-blocking pass.
          :keyword timeout: Passed to ``Timeout._start_new_or_dummy`` to bound
              the whole operation.
          """
          timer = Timeout._start_new_or_dummy(timeout)
          try:
              while self.greenlets:
                  # Iterate a copy: the set shrinks as greenlets die.
                  for greenlet in list(self.greenlets):
                      if greenlet in self.dying:
                          continue
                      try:
                          kill = greenlet.kill
                      except AttributeError:
                          _kill(greenlet, exception)
                      else:
                          kill(exception, block=False)
                      self.dying.add(greenlet)
                  if not block:
                      break
                  joinall(self.greenlets)
          except Timeout as ex:
              # Only swallow our own timer; any other Timeout is the caller's.
              if ex is not timer:
                  raise
          finally:
              timer.cancel()

      def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
          """
          If the given *greenlet* is running and being tracked by this group,
          kill it.
          """
          if greenlet not in self.dying and greenlet in self.greenlets:
              greenlet.kill(exception, block=False)
              self.dying.add(greenlet)
              if block:
                  greenlet.join(timeout)

      def full(self):
          """
          Return a value indicating whether this group can track more greenlets.

          In this implementation, because there are no limits on the number of
          tracked greenlets, this will always return a ``False`` value.
          """
          return False

      def wait_available(self, timeout=None):
          """
          Block until it is possible to :meth:`spawn` a new greenlet.

          In this implementation, because there are no limits on the number
          of tracked greenlets, this will always return immediately.
          """
          pass

      # MappingMixin methods

      def _apply_immediately(self):
          # If apply() is called from one of our own
          # worker greenlets, don't spawn a new one---if we're full, that
          # could deadlock.
          return getcurrent() in self

      def _apply_async_cb_spawn(self, callback, result):
          Greenlet.spawn(callback, result)

      def _apply_async_use_greenlet(self):
          # cannot call self.spawn() because it will block, so
          # use a fresh, untracked greenlet that when run will
          # (indirectly) call self.spawn() for us.
          return self.full()
  class Failure(object):
      """
      A queued marker for an exception that should be raised to a reader.

      ``exc`` is the exception (class or instance) to raise. If
      ``raise_exception`` is given, it is a zero-argument callable that
      :meth:`raise_exc` invokes in preference to raising ``exc`` directly.
      """

      __slots__ = ['exc', '_raise_exception']

      def __init__(self, exc, raise_exception=None):
          self.exc = exc
          self._raise_exception = raise_exception

      def raise_exc(self):
          """Invoke the stored raise hook if there is one, else raise ``exc``."""
          if self._raise_exception:
              self._raise_exception()
          else:
              raise self.exc
  class Pool(Group):
      """
      A :class:`Group` whose membership can be capped: :meth:`add` acquires a
      slot from an internal semaphore before tracking a greenlet, and the slot
      is released again when the greenlet is discarded.
      """

      def __init__(self, size=None, greenlet_class=None):
          """
          Create a new pool.

          A pool is like a group, but the maximum number of members
          is governed by the *size* parameter.

          :keyword int size: If given, this non-negative integer is the
              maximum count of active greenlets that will be allowed in
              this pool. A few values have special significance:

              * ``None`` (the default) places no limit on the number of
                greenlets. This is useful when you need to track, but not limit,
                greenlets, as with :class:`gevent.pywsgi.WSGIServer`. A :class:`Group`
                may be a more efficient way to achieve the same effect.
              * ``0`` creates a pool that can never have any active greenlets. Attempting
                to spawn in this pool will block forever. This is only useful
                if an application uses :meth:`wait_available` with a timeout and checks
                :meth:`free_count` before attempting to spawn.
          :keyword greenlet_class: If given and not-None, overrides
              :attr:`greenlet_class` for this instance.
          :raises ValueError: If *size* is negative.
          """
          if size is not None and size < 0:
              raise ValueError('size must not be negative: %r' % (size, ))
          Group.__init__(self)
          self.size = size
          if greenlet_class is not None:
              self.greenlet_class = greenlet_class
          if size is None:
              factory = DummySemaphore
          else:
              factory = Semaphore
          self._semaphore = factory(size)

      def wait_available(self, timeout=None):
          """
          Wait until it's possible to spawn a greenlet in this pool.

          :param float timeout: If given, only wait the specified number
              of seconds.

          .. warning:: If the pool was initialized with a size of 0, this
             method will block forever unless a timeout is given.

          :return: A number indicating how many new greenlets can be put into
             the pool without blocking.

          .. versionchanged:: 1.1a3
              Added the ``timeout`` parameter.
          """
          return self._semaphore.wait(timeout=timeout)

      def full(self):
          """
          Return a boolean indicating whether this pool is full, that is,
          whether it has no room left for new members. (True if there is
          no room, False if there is.)
          """
          return self.free_count() <= 0

      def free_count(self):
          """
          Return a number indicating *approximately* how many more members
          can be added to this pool.
          """
          if self.size is None:
              # Unbounded: any positive number says "there is room".
              return 1
          return max(0, self.size - len(self))

      def add(self, greenlet):
          """
          Begin tracking the given greenlet, blocking until space is available.

          .. seealso:: :meth:`Group.add`
          """
          self._semaphore.acquire()
          try:
              Group.add(self, greenlet)
          except:
              # Deliberately bare: whatever went wrong, give the slot back
              # before propagating so the pool does not lose capacity.
              self._semaphore.release()
              raise

      def _discard(self, greenlet):
          Group._discard(self, greenlet)
          # Return the slot taken in add().
          self._semaphore.release()
  class pass_value(object):
      """
      Wrap *callback* so it can be used as a link: when called with a source
      object, the callback receives ``source.value``, but only if
      ``source.successful()`` is true.

      Hashing, equality, ``str``/``repr`` and unknown attribute lookups are
      all delegated to the wrapped callback.
      """

      __slots__ = ['callback']

      def __init__(self, callback):
          self.callback = callback

      def __call__(self, source):
          if source.successful():
              self.callback(source.value)

      def __hash__(self):
          return hash(self.callback)

      def __eq__(self, other):
          # Compare against another wrapper's callback, or against a bare callable.
          return self.callback == getattr(other, 'callback', other)

      def __str__(self):
          return str(self.callback)

      def __repr__(self):
          return repr(self.callback)

      def __getattr__(self, item):
          # Only reached when normal lookup fails; if that happens for
          # 'callback' itself the slot is unset, and delegating would recurse.
          assert item != 'callback'
          return getattr(self.callback, item)