12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759 |
- # Copyright (c) 2009-2011 Denis Bilenko. See LICENSE for details.
- """
- Managing greenlets in a group.
- The :class:`Group` class in this module abstracts a group of running
- greenlets. When a greenlet dies, it's automatically removed from the
- group. All running greenlets in a group can be waited on with
- :meth:`Group.join`, or all running greenlets can be killed with
- :meth:`Group.kill`.
- The :class:`Pool` class, which is a subclass of :class:`Group`,
- provides a way to limit concurrency: its :meth:`spawn <Pool.spawn>`
- method blocks if the number of greenlets in the pool has already
- reached the limit, until there is a free slot.
- """
- from bisect import insort_right
- try:
- from itertools import izip
- except ImportError:
- # Python 3
- izip = zip
- from gevent.hub import GreenletExit, getcurrent, kill as _kill
- from gevent.greenlet import joinall, Greenlet
- from gevent.timeout import Timeout
- from gevent.event import Event
- from gevent.lock import Semaphore, DummySemaphore
- __all__ = ['Group', 'Pool']
class IMapUnordered(Greenlet):
    """
    An iterator of map results.

    Results are placed on an internal queue as each spawned task finishes
    and are handed to the consumer from that queue by :meth:`next`.
    """

    _zipped = False

    def __init__(self, func, iterable, spawn=None, maxsize=None, _zipped=False):
        """
        Prepare to apply *func* to every item of *iterable*.

        :param func: The callable to run for each item.
        :param iterable: The source of items. It is consumed when this
            greenlet runs (see :meth:`_run`).
        :keyword spawn: If given and not-None, the callable used to start the
            task for each item; it is stored as ``self.spawn``.
        :keyword int maxsize: If given and not-None, specifies the maximum number of
            finished results that will be allowed to accumulate awaiting the reader;
            more than that number of results will cause map function greenlets to begin
            to block. This is most useful if there is a great disparity in the speed of
            the mapping code and the consumer and the results consume a great deal of resources.
            Using a bound is more computationally expensive than not using a bound.
        :keyword bool _zipped: If true, each item is unpacked into positional
            arguments (``func(*item)``) instead of being passed as one argument.

        .. versionchanged:: 1.1b3
            Added the *maxsize* parameter.
        """
        from gevent.queue import Queue
        Greenlet.__init__(self)
        if spawn is not None:
            self.spawn = spawn
        if _zipped:
            self._zipped = _zipped
        self.func = func
        self.iterable = iterable
        self.queue = Queue()
        if maxsize:
            # Bounding the queue is not enough if we want to keep from
            # accumulating objects; the result value will be around as
            # the greenlet's result, blocked on self.queue.put(), and
            # we'll go on to spawn another greenlet, which in turn can
            # create the result. So we need a semaphore to prevent a
            # greenlet from exiting while the queue is full so that we
            # don't spawn the next greenlet (assuming that self.spawn
            # is of course bounded). (Alternatively we could have the
            # greenlet itself do the insert into the pool, but that
            # takes some rework).
            #
            # Given the use of a semaphore at this level, sizing the queue becomes
            # redundant, and that lets us avoid having to use self.link() instead
            # of self.rawlink() to avoid having blocking methods called in the
            # hub greenlet.
            factory = Semaphore
        else:
            factory = DummySemaphore
        self._result_semaphore = factory(maxsize)

        # Number of spawned tasks whose result has not been queued yet.
        self.count = 0
        # Set once the end-of-iteration (or self-failure) marker is queued.
        self.finished = False
        # If the queue size is unbounded, then we want to call all
        # the links (_on_finish and _on_result) directly in the hub greenlet
        # for efficiency. However, if the queue is bounded, we can't do that if
        # the queue might block (because if there's no waiter the hub can switch to,
        # the queue simply raises Full). Therefore, in that case, we use
        # the safer, somewhat-slower (because it spawns a greenlet) link() methods.
        # This means that _on_finish and _on_result can be called and interleaved in any order
        # if the call to self.queue.put() blocks.
        # Note that right now we're not bounding the queue, instead using a semaphore.
        self.rawlink(self._on_finish)

    def __iter__(self):
        return self

    def next(self):
        """
        Return the next available result.

        :raises StopIteration: When the finished marker is dequeued.
        :raises Exception: Whatever a task (or this greenlet itself) raised,
            when a failure marker is dequeued.
        """
        # Free one result slot before blocking on the queue.
        self._result_semaphore.release()
        value = self._inext()
        if isinstance(value, Failure):
            # Go through the Failure so that a stored re-raiser, if any, is
            # used instead of a bare ``raise exc``.
            value.raise_exc()
        return value
    __next__ = next

    def _inext(self):
        # Fetch the next raw queue entry; subclasses override this.
        return self.queue.get()

    def _ispawn(self, func, item):
        # Take a result slot, start the task for *item* and arrange for its
        # outcome to be queued when it finishes.
        self._result_semaphore.acquire()
        self.count += 1
        g = self.spawn(func, item) if not self._zipped else self.spawn(func, *item)
        g.rawlink(self._on_result)
        return g

    def _run(self):  # pylint:disable=method-hidden
        try:
            func = self.func
            for item in self.iterable:
                self._ispawn(func, item)
        finally:
            # Drop the instance-level references now that spawning is over.
            self.__dict__.pop('spawn', None)
            self.__dict__.pop('func', None)
            self.__dict__.pop('iterable', None)

    def _on_result(self, greenlet):
        # This method can either be called in the hub greenlet (if the
        # queue is unbounded) or its own greenlet. If it's called in
        # its own greenlet, the calls to put() may block and switch
        # greenlets, which in turn could mutate our state. So any
        # state on this object that we need to look at, notably
        # self.count, we need to capture or mutate *before* we put.
        # (Note that right now we're not bounding the queue, but we may
        # choose to do so in the future so this implementation will be left in case.)
        self.count -= 1
        count = self.count
        finished = self.finished
        ready = self.ready()
        put_finished = False

        if ready and count <= 0 and not finished:
            finished = self.finished = True
            put_finished = True

        if greenlet.successful():
            self.queue.put(self._iqueue_value_for_success(greenlet))
        else:
            self.queue.put(self._iqueue_value_for_failure(greenlet))

        if put_finished:
            self.queue.put(self._iqueue_value_for_finished())

    def _on_finish(self, _self):
        # Link callback for this greenlet itself (registered in __init__).
        if self.finished:
            return

        if not self.successful():
            self.finished = True
            self.queue.put(self._iqueue_value_for_self_failure())
            return

        if self.count <= 0:
            self.finished = True
            self.queue.put(self._iqueue_value_for_finished())

    def _iqueue_value_for_success(self, greenlet):
        return greenlet.value

    def _iqueue_value_for_failure(self, greenlet):
        # Not every task object necessarily has ``_raise_exception``; fall
        # back to None so Failure.raise_exc() raises the bare exception.
        return Failure(greenlet.exception, getattr(greenlet, '_raise_exception', None))

    def _iqueue_value_for_finished(self):
        return Failure(StopIteration)

    def _iqueue_value_for_self_failure(self):
        return Failure(self.exception, self._raise_exception)
class IMap(IMapUnordered):
    """
    A specialization of :class:`IMapUnordered` that returns items in the
    order in which they were generated, not the order in which they finish.

    This is done by storing ``(order, value)`` tuples in the queue rather
    than bare values.
    """

    def __init__(self, *args, **kwargs):
        # Out-of-order results, kept sorted by their index.
        self.waiting = []  # QQQ maybe deque will work faster there?
        # Index of the next result to hand to the consumer.
        self.index = 0
        # Highest index assigned so far.
        self.maxindex = -1
        IMapUnordered.__init__(self, *args, **kwargs)

    def _inext(self):
        pending = self.waiting
        while True:
            # Prefer a buffered result if the earliest one is due.
            if pending and pending[0][0] <= self.index:
                value = pending.pop(0)[1]
                break
            position, value = self.queue.get()
            if position <= self.index:
                break
            # Arrived early: park it until its turn comes.
            insort_right(pending, (position, value))
        self.index += 1
        return value

    def _ispawn(self, func, item):
        task = IMapUnordered._ispawn(self, func, item)
        self.maxindex += 1
        task.index = self.maxindex
        return task

    def _iqueue_value_for_success(self, greenlet):
        payload = IMapUnordered._iqueue_value_for_success(self, greenlet)
        return (greenlet.index, payload)

    def _iqueue_value_for_failure(self, greenlet):
        payload = IMapUnordered._iqueue_value_for_failure(self, greenlet)
        return (greenlet.index, payload)

    def _iqueue_value_for_finished(self):
        # The marker takes the next index so it sorts after every result.
        self.maxindex += 1
        marker = IMapUnordered._iqueue_value_for_finished(self)
        return (self.maxindex, marker)

    def _iqueue_value_for_self_failure(self):
        self.maxindex += 1
        marker = IMapUnordered._iqueue_value_for_self_failure(self)
        return (self.maxindex, marker)
- class GroupMappingMixin(object):
- # Internal, non-public API class.
- # Provides mixin methods for implementing mapping pools. Subclasses must define:
- # - self.spawn(func, *args, **kwargs): a function that runs `func` with `args`
- # and `awargs`, potentially asynchronously. Return a value with a `get` method that
- # blocks until the results of func are available, and a `link` method.
- # - self._apply_immediately(): should the function passed to apply be called immediately,
- # synchronously?
- # - self._apply_async_use_greenlet(): Should apply_async directly call
- # Greenlet.spawn(), bypassing self.spawn? Return true when self.spawn would block
- # - self._apply_async_cb_spawn(callback, result): Run the given callback function, possiblly
- # asynchronously, possibly synchronously.
- def apply_cb(self, func, args=None, kwds=None, callback=None):
- """
- :meth:`apply` the given *func(\\*args, \\*\\*kwds)*, and, if a *callback* is given, run it with the
- results of *func* (unless an exception was raised.)
- The *callback* may be called synchronously or asynchronously. If called
- asynchronously, it will not be tracked by this group. (:class:`Group` and :class:`Pool`
- call it asynchronously in a new greenlet; :class:`~gevent.threadpool.ThreadPool` calls
- it synchronously in the current greenlet.)
- """
- result = self.apply(func, args, kwds)
- if callback is not None:
- self._apply_async_cb_spawn(callback, result)
- return result
- def apply_async(self, func, args=None, kwds=None, callback=None):
- """
- A variant of the :meth:`apply` method which returns a :class:`~.Greenlet` object.
- When the returned greenlet gets to run, it *will* call :meth:`apply`,
- passing in *func*, *args* and *kwds*.
- If *callback* is specified, then it should be a callable which
- accepts a single argument. When the result becomes ready
- callback is applied to it (unless the call failed).
- This method will never block, even if this group is full (that is,
- even if :meth:`spawn` would block, this method will not).
- .. caution:: The returned greenlet may or may not be tracked
- as part of this group, so :meth:`joining <join>` this group is
- not a reliable way to wait for the results to be available or
- for the returned greenlet to run; instead, join the returned
- greenlet.
- .. tip:: Because :class:`~.ThreadPool` objects do not track greenlets, the returned
- greenlet will never be a part of it. To reduce overhead and improve performance,
- :class:`Group` and :class:`Pool` may choose to track the returned
- greenlet. These are implementation details that may change.
- """
- if args is None:
- args = ()
- if kwds is None:
- kwds = {}
- if self._apply_async_use_greenlet():
- # cannot call self.spawn() directly because it will block
- # XXX: This is always the case for ThreadPool, but for Group/Pool
- # of greenlets, this is only the case when they are full...hence
- # the weasely language about "may or may not be tracked". Should we make
- # Group/Pool always return true as well so it's never tracked by any
- # implementation? That would simplify that logic, but could increase
- # the total number of greenlets in the system and add a layer of
- # overhead for the simple cases when the pool isn't full.
- return Greenlet.spawn(self.apply_cb, func, args, kwds, callback)
- greenlet = self.spawn(func, *args, **kwds)
- if callback is not None:
- greenlet.link(pass_value(callback))
- return greenlet
- def apply(self, func, args=None, kwds=None):
- """
- Rough quivalent of the :func:`apply()` builtin function blocking until
- the result is ready and returning it.
- The ``func`` will *usually*, but not *always*, be run in a way
- that allows the current greenlet to switch out (for example,
- in a new greenlet or thread, depending on implementation). But
- if the current greenlet or thread is already one that was
- spawned by this pool, the pool may choose to immediately run
- the `func` synchronously.
- Any exception ``func`` raises will be propagated to the caller of ``apply`` (that is,
- this method will raise the exception that ``func`` raised).
- """
- if args is None:
- args = ()
- if kwds is None:
- kwds = {}
- if self._apply_immediately():
- return func(*args, **kwds)
- return self.spawn(func, *args, **kwds).get()
- def map(self, func, iterable):
- """Return a list made by applying the *func* to each element of
- the iterable.
- .. seealso:: :meth:`imap`
- """
- return list(self.imap(func, iterable))
- def map_cb(self, func, iterable, callback=None):
- result = self.map(func, iterable)
- if callback is not None:
- callback(result)
- return result
- def map_async(self, func, iterable, callback=None):
- """
- A variant of the map() method which returns a Greenlet object that is executing
- the map function.
- If callback is specified then it should be a callable which accepts a
- single argument.
- """
- return Greenlet.spawn(self.map_cb, func, iterable, callback)
- def __imap(self, cls, func, *iterables, **kwargs):
- # Python 2 doesn't support the syntax that lets us mix varargs and
- # a named kwarg, so we have to unpack manually
- maxsize = kwargs.pop('maxsize', None)
- if kwargs:
- raise TypeError("Unsupported keyword arguments")
- return cls.spawn(func, izip(*iterables), spawn=self.spawn,
- _zipped=True, maxsize=maxsize)
- def imap(self, func, *iterables, **kwargs):
- """
- imap(func, *iterables, maxsize=None) -> iterable
- An equivalent of :func:`itertools.imap`, operating in parallel.
- The *func* is applied to each element yielded from each
- iterable in *iterables* in turn, collecting the result.
- If this object has a bound on the number of active greenlets it can
- contain (such as :class:`Pool`), then at most that number of tasks will operate
- in parallel.
- :keyword int maxsize: If given and not-None, specifies the maximum number of
- finished results that will be allowed to accumulate awaiting the reader;
- more than that number of results will cause map function greenlets to begin
- to block. This is most useful if there is a great disparity in the speed of
- the mapping code and the consumer and the results consume a great deal of resources.
- .. note:: This is separate from any bound on the number of active parallel
- tasks, though they may have some interaction (for example, limiting the
- number of parallel tasks to the smallest bound).
- .. note:: Using a bound is slightly more computationally expensive than not using a bound.
- .. tip:: The :meth:`imap_unordered` method makes much better
- use of this parameter. Some additional, unspecified,
- number of objects may be required to be kept in memory
- to maintain order by this function.
- :return: An iterable object.
- .. versionchanged:: 1.1b3
- Added the *maxsize* keyword parameter.
- .. versionchanged:: 1.1a1
- Accept multiple *iterables* to iterate in parallel.
- """
- return self.__imap(IMap, func, *iterables, **kwargs)
- def imap_unordered(self, func, *iterables, **kwargs):
- """
- imap_unordered(func, *iterables, maxsize=None) -> iterable
- The same as :meth:`imap` except that the ordering of the results
- from the returned iterator should be considered in arbitrary
- order.
- This is lighter weight than :meth:`imap` and should be preferred if order
- doesn't matter.
- .. seealso:: :meth:`imap` for more details.
- """
- return self.__imap(IMapUnordered, func, *iterables, **kwargs)
class Group(GroupMappingMixin):
    """
    Maintain a group of greenlets that are still running, without
    limiting their number.

    Links to each item and removes it upon notification.

    Groups can be iterated to discover what greenlets they are tracking,
    they can be tested to see if they contain a greenlet, and they know the
    number (len) of greenlets they are tracking. If they are not tracking any
    greenlets, they are False in a boolean context.
    """

    #: The type of Greenlet object we will :meth:`spawn`. This can be changed
    #: on an instance or in a subclass.
    greenlet_class = Greenlet

    def __init__(self, *args):
        """
        Create a group, optionally pre-populated.

        :param args: At most one positional argument: an iterable of
            greenlets to begin tracking immediately.
        """
        assert len(args) <= 1, args
        self.greenlets = set(*args)
        # each item we kill we place in dying, to avoid killing the same greenlet twice
        self.dying = set()
        self._empty_event = Event()
        if self.greenlets:
            # Link from the set we just built rather than re-iterating
            # args[0], which may be a one-shot iterator already consumed by
            # set(). The event stays clear so join() waits for these
            # greenlets, matching what add() does.
            for greenlet in list(self.greenlets):
                greenlet.rawlink(self._discard)
        else:
            self._empty_event.set()

    def __repr__(self):
        return '<%s at 0x%x %s>' % (self.__class__.__name__, id(self), self.greenlets)

    def __len__(self):
        """
        Answer how many greenlets we are tracking. Note that if we are empty,
        we are False in a boolean context.
        """
        return len(self.greenlets)

    def __contains__(self, item):
        """
        Answer if we are tracking the given greenlet.
        """
        return item in self.greenlets

    def __iter__(self):
        """
        Iterate across all the greenlets we are tracking, in no particular order.
        """
        return iter(self.greenlets)

    def add(self, greenlet):
        """
        Begin tracking the greenlet.

        If this group is :meth:`full`, then this method may block
        until it is possible to track the greenlet.
        """
        try:
            rawlink = greenlet.rawlink
        except AttributeError:
            pass  # non-Greenlet greenlet, like MAIN
        else:
            rawlink(self._discard)
        self.greenlets.add(greenlet)
        self._empty_event.clear()

    def _discard(self, greenlet):
        # Forget *greenlet* and signal emptiness if it was the last one.
        self.greenlets.discard(greenlet)
        self.dying.discard(greenlet)
        if not self.greenlets:
            self._empty_event.set()

    def discard(self, greenlet):
        """
        Stop tracking the greenlet.
        """
        self._discard(greenlet)
        try:
            unlink = greenlet.unlink
        except AttributeError:
            pass  # non-Greenlet greenlet, like MAIN
        else:
            unlink(self._discard)

    def start(self, greenlet):
        """
        Start the un-started *greenlet* and add it to the collection of greenlets
        this group is monitoring.
        """
        self.add(greenlet)
        greenlet.start()

    def spawn(self, *args, **kwargs):
        """
        Begin a new greenlet with the given arguments (which are passed
        to the greenlet constructor) and add it to the collection of greenlets
        this group is monitoring.

        :return: The newly started greenlet.
        """
        greenlet = self.greenlet_class(*args, **kwargs)
        self.start(greenlet)
        return greenlet

    def join(self, timeout=None, raise_error=False):
        """
        Wait for this group to become empty *at least once*.

        If there are no greenlets in the group, returns immediately.

        .. note:: By the time the waiting code (the caller of this
           method) regains control, a greenlet may have been added to
           this group, and so this object may no longer be empty. (That
           is, ``group.join(); assert len(group) == 0`` is not
           guaranteed to hold.) This method only guarantees that the group
           reached a ``len`` of 0 at some point.

        :keyword bool raise_error: If True (*not* the default), if any
            greenlet that finished while the join was in progress raised
            an exception, that exception will be raised to the caller of
            this method. If multiple greenlets raised exceptions, which
            one gets re-raised is not determined. Only greenlets currently
            in the group when this method is called are guaranteed to
            be checked for exceptions.

        :return bool: A value indicating whether this group became empty.
           If the timeout is specified and the group did not become empty
           during that timeout, then this will be a false value. Otherwise
           it will be a true value.

        .. versionchanged:: 1.2a1
           Add the return value.
        """
        # Snapshot first: members remove themselves as they finish.
        greenlets = list(self.greenlets) if raise_error else ()
        result = self._empty_event.wait(timeout=timeout)

        for greenlet in greenlets:
            # add() accepts non-Greenlet objects, which have no ``exception``
            # attribute; treat them as having raised nothing.
            exception = getattr(greenlet, 'exception', None)
            if exception is not None:
                if hasattr(greenlet, '_raise_exception'):
                    greenlet._raise_exception()
                raise exception

        return result

    def kill(self, exception=GreenletExit, block=True, timeout=None):
        """
        Kill all greenlets being tracked by this group.

        :keyword exception: The exception to raise in each greenlet.
        :keyword bool block: If true (the default), keep killing and joining
            until the group is empty; otherwise make a single pass and return.
        :keyword timeout: If given, stop waiting when it expires; the
            resulting timeout is swallowed rather than raised.
        """
        timer = Timeout._start_new_or_dummy(timeout)
        try:
            while self.greenlets:
                for greenlet in list(self.greenlets):
                    if greenlet in self.dying:
                        continue
                    try:
                        kill = greenlet.kill
                    except AttributeError:
                        # No kill() method; use the hub-level kill helper.
                        _kill(greenlet, exception)
                    else:
                        kill(exception, block=False)
                    self.dying.add(greenlet)
                if not block:
                    break
                joinall(self.greenlets)
        except Timeout as ex:
            # Only our own timer is expected here; others propagate.
            if ex is not timer:
                raise
        finally:
            timer.cancel()

    def killone(self, greenlet, exception=GreenletExit, block=True, timeout=None):
        """
        If the given *greenlet* is running and being tracked by this group,
        kill it.
        """
        if greenlet not in self.dying and greenlet in self.greenlets:
            greenlet.kill(exception, block=False)
            self.dying.add(greenlet)
            if block:
                greenlet.join(timeout)

    def full(self):
        """
        Return a value indicating whether this group can track more greenlets.

        In this implementation, because there are no limits on the number of
        tracked greenlets, this will always return a ``False`` value.
        """
        return False

    def wait_available(self, timeout=None):
        """
        Block until it is possible to :meth:`spawn` a new greenlet.

        In this implementation, because there are no limits on the number
        of tracked greenlets, this will always return immediately.
        """
        pass

    # MappingMixin methods

    def _apply_immediately(self):
        # If apply() is called from one of our own
        # worker greenlets, don't spawn a new one---if we're full, that
        # could deadlock.
        return getcurrent() in self

    def _apply_async_cb_spawn(self, callback, result):
        Greenlet.spawn(callback, result)

    def _apply_async_use_greenlet(self):
        # cannot call self.spawn() because it will block, so
        # use a fresh, untracked greenlet that when run will
        # (indirectly) call self.spawn() for us.
        return self.full()
class Failure(object):
    """
    Queue marker that carries an exception in place of a result.

    ``exc`` is the exception (class or instance) to raise. The optional
    ``_raise_exception`` is a zero-argument callable that, when present and
    truthy, is invoked instead of raising ``exc`` directly.
    """

    __slots__ = ['exc', '_raise_exception']

    def __init__(self, exc, raise_exception=None):
        self.exc = exc
        self._raise_exception = raise_exception

    def raise_exc(self):
        """Invoke the stored re-raiser if there is one, else raise ``exc``."""
        reraiser = self._raise_exception
        if not reraiser:
            raise self.exc
        reraiser()
class Pool(Group):
    """
    A :class:`Group` whose number of members can be capped; see
    :meth:`__init__` for the meaning of *size*.
    """

    def __init__(self, size=None, greenlet_class=None):
        """
        Create a new pool.

        A pool is like a group, but the maximum number of members
        is governed by the *size* parameter.

        :keyword int size: If given, this non-negative integer is the
            maximum count of active greenlets that will be allowed in
            this pool. A few values have special significance:

            * ``None`` (the default) places no limit on the number of
              greenlets. This is useful when you need to track, but not limit,
              greenlets, as with :class:`gevent.pywsgi.WSGIServer`. A :class:`Group`
              may be a more efficient way to achieve the same effect.
            * ``0`` creates a pool that can never have any active greenlets. Attempting
              to spawn in this pool will block forever. This is only useful
              if an application uses :meth:`wait_available` with a timeout and checks
              :meth:`free_count` before attempting to spawn.
        :keyword greenlet_class: If given and not-None, overrides
            :attr:`greenlet_class` for this instance.
        :raises ValueError: If *size* is negative.
        """
        if size is not None and size < 0:
            raise ValueError('size must not be negative: %r' % (size, ))
        Group.__init__(self)
        self.size = size
        if greenlet_class is not None:
            self.greenlet_class = greenlet_class
        if size is None:
            factory = DummySemaphore
        else:
            factory = Semaphore
        self._semaphore = factory(size)

    def wait_available(self, timeout=None):
        """
        Wait until it's possible to spawn a greenlet in this pool.

        :param float timeout: If given, only wait the specified number
            of seconds.

        .. warning:: If the pool was initialized with a size of 0, this
           method will block forever unless a timeout is given.

        :return: A number indicating how many new greenlets can be put into
           the pool without blocking.

        .. versionchanged:: 1.1a3
            Added the ``timeout`` parameter.
        """
        return self._semaphore.wait(timeout=timeout)

    def full(self):
        """
        Return a boolean indicating whether this pool is full.

        :return: True if there is no room for new members (that is,
            :meth:`free_count` is zero), False if there is room.
        """
        return self.free_count() <= 0

    def free_count(self):
        """
        Return a number indicating *approximately* how many more members
        can be added to this pool.
        """
        if self.size is None:
            return 1
        return max(0, self.size - len(self))

    def add(self, greenlet):
        """
        Begin tracking the given greenlet, blocking until space is available.

        .. seealso:: :meth:`Group.add`
        """
        self._semaphore.acquire()
        try:
            Group.add(self, greenlet)
        except:
            # Give the slot back on any failure, then propagate.
            self._semaphore.release()
            raise

    def _discard(self, greenlet):
        # A slot is acquired in add() only for greenlets that end up tracked,
        # so only give one back for a greenlet we were actually tracking.
        # Otherwise a repeated or stray discard() would over-release the
        # semaphore and let the pool grow past ``size``.
        was_tracked = greenlet in self.greenlets
        Group._discard(self, greenlet)
        if was_tracked:
            self._semaphore.release()
class pass_value(object):
    """
    Link adapter: when called with a finished source, passes
    ``source.value`` to the wrapped *callback*, but only if
    ``source.successful()`` is true.

    Instances hash and compare like the wrapped callback and forward
    unknown attribute lookups to it.
    """

    __slots__ = ['callback']

    def __init__(self, callback):
        self.callback = callback

    def __call__(self, source):
        if source.successful():
            self.callback(source.value)

    def __hash__(self):
        return hash(self.callback)

    def __eq__(self, other):
        # Compare against another wrapper's callback, or against a bare
        # callable directly.
        return self.callback == getattr(other, 'callback', other)

    def __ne__(self, other):
        # Python 2 does not derive != from ==; keep the two consistent.
        return not self == other

    def __str__(self):
        return str(self.callback)

    def __repr__(self):
        return repr(self.callback)

    def __getattr__(self, item):
        # __getattr__ only runs when normal lookup fails. If 'callback'
        # itself is missing (slot not yet set), report that as a normal
        # AttributeError instead of recursing through self.callback. An
        # assert would be stripped under -O and is the wrong exception type
        # for hasattr()/getattr(..., default).
        if item == 'callback':
            raise AttributeError(item)
        return getattr(self.callback, item)
|