12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052 |
- # Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
- """
- Event-loop hub.
- """
- from __future__ import absolute_import
- # XXX: FIXME: Refactor to make this smaller
- # pylint:disable=too-many-lines
- from functools import partial as _functools_partial
- import os
- import sys
- import traceback
- from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
- __all__ = [
- 'getcurrent',
- 'GreenletExit',
- 'spawn_raw',
- 'sleep',
- 'kill',
- 'signal',
- 'reinit',
- 'get_hub',
- 'Hub',
- 'Waiter',
- ]
- from gevent._compat import string_types
- from gevent._compat import xrange
- from gevent._util import _NONE
- from gevent._util import readproperty
if sys.version_info[0] > 2:
    import _thread as thread # python 2 pylint:disable=import-error
else:
    import thread # pylint:disable=import-error

# These must be the "real" native thread versions,
# not monkey-patched.
threadlocal = thread._local


class _threadlocal(threadlocal):
    """Thread-local storage holding the hub type, the hub and a reusable loop."""

    def __init__(self):
        # Giving every thread explicit ``None`` defaults lets callers test
        # ``is None`` instead of catching AttributeError, which keeps the
        # code cleaner and possibly solves some corner cases (like #687).
        #
        # The base initializer is named explicitly: the module-level name
        # ``_threadlocal`` is rebound to an instance right below, so it
        # cannot be used to refer to this class from inside the method.
        threadlocal.__init__(self)
        self.hub = None
        self.loop = None
        self.Hub = None


# Replace the class by its single, module-wide instance.
_threadlocal = _threadlocal()

get_ident = thread.get_ident
# Identity of the thread that executed this module body.
MAIN_THREAD = get_ident()
class LoopExit(Exception):
    """
    Exception thrown when the hub finishes running.

    A normal application never throws or catches this explicitly. The
    internal implementation of functions like :func:`join` and
    :func:`joinall` may catch it, but user code generally should not.

    .. caution::
       Errors in application programming can also lead to this exception
       being raised. Some examples include (but are not limited to):

       - greenlets deadlocking on a lock;
       - using a socket or other gevent object with native thread
         affinity from a different thread
    """
class BlockingSwitchOutError(AssertionError):
    """Raised by ``Hub.switch_out``: a blocking function was called in an event loop callback."""
class InvalidSwitchError(AssertionError):
    """Raised by ``Hub.wait`` when the waiting greenlet is resumed with an unexpected value."""
class ConcurrentObjectUseError(AssertionError):
    """
    Raised when an object is used (waited on) by two greenlets
    independently, meaning the object was entered into a blocking
    state by one greenlet and then another while still blocking in the
    first one.
    """
def spawn_raw(function, *args, **kwargs):
    """
    Create a new :class:`greenlet.greenlet` object and schedule it to
    run ``function(*args, **kwargs)``.

    The returned raw :class:`~greenlet.greenlet` lacks the useful methods
    of :class:`gevent.Greenlet`. Applications should typically prefer
    :func:`~gevent.spawn`, but this function may occasionally be useful
    as an optimization if there are many greenlets involved.

    :raises TypeError: If *function* is not callable.

    .. versionchanged:: 1.1a3
        Verify that ``function`` is callable, raising a TypeError if not.
        Previously, the spawned greenlet would have failed the first time
        it was switched to.
    .. versionchanged:: 1.1rc2
        Accept keyword arguments for ``function`` as previously (incorrectly)
        documented. Note that this may incur an additional expense.
    """
    if not callable(function):
        raise TypeError("function must be callable")
    hub = get_hub()

    # The callback class object that we use to run this doesn't
    # accept kwargs (and those objects are heavily used, as well as being
    # implemented twice in core.ppyx and corecffi.py), so when keyword
    # arguments are present everything is bound up front with a partial
    # and nothing is left to hand to the callback.
    if kwargs:
        function = _functools_partial(function, *args, **kwargs)
        args = ()

    g = RawGreenlet(function, hub)
    hub.loop.run_callback(g.switch, *args)
    return g
def sleep(seconds=0, ref=True):
    """
    Put the current greenlet to sleep for at least *seconds*.

    *seconds* may be specified as an integer, or a float if fractional
    seconds are desired.

    .. tip:: In the current implementation, a value of 0 (the default)
       means to yield execution to any other runnable greenlets, but
       this greenlet may be scheduled again before the event loop
       cycles (in an extreme case, a greenlet that repeatedly sleeps
       with 0 can prevent greenlets that are ready to do I/O from
       being scheduled for some (small) period of time); a value greater
       than 0, on the other hand, will delay running this greenlet until
       the next iteration of the loop.

    If *ref* is False, the greenlet running ``sleep()`` will not prevent
    :func:`gevent.wait` from exiting.

    .. seealso:: :func:`idle`
    """
    hub = get_hub()
    loop = hub.loop
    if seconds <= 0:
        # Plain yield: queue a callback that resumes us, then block on it.
        yielder = Waiter()
        loop.run_callback(yielder.switch)
        yielder.get()
        return
    # Positive delay: block on a timer watcher.
    hub.wait(loop.timer(seconds, ref=ref))
def idle(priority=0):
    """
    Cause the calling greenlet to wait until the event loop is idle.

    Idle is defined as having no other events of the same or higher
    *priority* pending. That is, as long as sockets, timeouts or even
    signals of the same or higher priority are being processed, the loop
    is not idle.

    A falsy *priority* (the default ``0``) leaves the watcher's priority
    untouched.

    .. seealso:: :func:`sleep`
    """
    hub = get_hub()
    idle_watcher = hub.loop.idle()
    if priority:
        idle_watcher.priority = priority
    hub.wait(idle_watcher)
def kill(greenlet, exception=GreenletExit):
    """
    Kill greenlet asynchronously. The current greenlet is not unscheduled.

    .. note::

        :meth:`Greenlet.kill` does the same and more (and the same caveats
        listed there apply here). However, the MAIN greenlet - the one
        that exists initially - does not have a ``kill()`` method, and
        neither do any created with :func:`spawn_raw`, so you have to use
        this function.

    .. caution:: Use care when killing greenlets. If they are not prepared
       for exceptions, this could result in corrupted state.

    .. versionchanged:: 1.1a2
        If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls
        it. This prevents a greenlet from being switched to for the first
        time after it's been killed but not yet executed.
    """
    if greenlet.dead:
        # Nothing left to kill.
        return

    if not hasattr(greenlet, 'kill'):
        # Raw greenlet: schedule the exception to be thrown into it.
        get_hub().loop.run_callback(greenlet.throw, exception)
        return

    # Dealing with gevent.greenlet.Greenlet. Use its own method, especially
    # to avoid allowing one to be switched to for the first time
    # after it's been killed.
    greenlet.kill(exception=exception, block=False)
class signal(object):
    """
    Call the *handler* with the *args* and *kwargs* when the process
    receives the signal *signalnum*.

    The *handler* will be run in a new greenlet when the signal is delivered.

    This returns an object with the useful method ``cancel``, which, when called,
    will prevent future deliveries of *signalnum* from calling *handler*.

    :raises TypeError: If *handler* is not callable.

    .. note::

        This may not operate correctly with SIGCHLD if libev child watchers
        are used (as they are by default with os.fork).

    .. versionchanged:: 1.2a1
       The ``handler`` argument is required to be callable at construction time.
    """

    # XXX: This is manually documented in gevent.rst while it is aliased in
    # the gevent module.

    # Class used to run the handler; resolved lazily to ``gevent.Greenlet``
    # by the constructor when left as ``None``.
    greenlet_class = None

    def __init__(self, signalnum, handler, *args, **kwargs):
        if not callable(handler):
            raise TypeError("signal handler must be callable.")

        self.hub = get_hub()
        self.watcher = self.hub.loop.signal(signalnum, ref=False)
        self.watcher.start(self._start)
        self.handler = handler
        self.args = args
        self.kwargs = kwargs
        if self.greenlet_class is None:
            # Imported here rather than at module level.
            from gevent import Greenlet
            self.greenlet_class = Greenlet

    def _get_ref(self):
        return self.watcher.ref

    def _set_ref(self, value):
        self.watcher.ref = value

    #: Proxy for the ``ref`` attribute of the underlying signal watcher.
    ref = property(_get_ref, _set_ref)
    del _get_ref, _set_ref

    def cancel(self):
        """Stop the watcher so *handler* is not called for future deliveries."""
        self.watcher.stop()

    def _start(self):
        """Watcher callback: run :meth:`handle` in a fresh greenlet."""
        try:
            greenlet = self.greenlet_class(self.handle)
            greenlet.switch()
        except: # pylint:disable=bare-except
            # This used to call the non-existent ``sys._exc_info()``, which
            # raised AttributeError here and hid the original failure from
            # the hub's error handling.
            self.hub.handle_error(None, *sys.exc_info())

    def handle(self):
        """Invoke the handler, reporting any exception to the hub."""
        try:
            self.handler(*self.args, **self.kwargs)
        except: # pylint:disable=bare-except
            self.hub.handle_error(None, *sys.exc_info())
def reinit():
    """
    Prepare the gevent hub to run in a new (forked) process.

    This should be called *immediately* after :func:`os.fork` in the
    child process. This is done automatically by
    :func:`gevent.os.fork` or if the :mod:`os` module has been
    monkey-patched. If this function is not called in a forked
    process, symptoms may include hanging of functions like
    :func:`socket.getaddrinfo`, and the hub's threadpool is unlikely
    to work.

    If the current thread has no hub yet, this function does nothing.

    .. note:: Registered fork watchers may or may not run before
       this function (and thus ``gevent.os.fork``) return. If they have
       not run, they will run "soon", after an iteration of the event loop.
       You can force this by inserting a few small (but non-zero) calls to
       :func:`sleep` after fork returns. (As of gevent 1.1 and before, fork
       watchers will not have run, but this may change in the future.)

    .. note:: This function may be removed in a future major release
       if the fork process can be more smoothly managed.

    .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
       and libev watchers in the child process.
    """
    hub = _get_hub()
    if hub is None:
        return

    # The loop reinit function in turn calls libev's ev_loop_fork
    # function. Note that we reinit the existing loop, not destroy it.
    # See https://github.com/gevent/gevent/issues/200.
    hub.loop.reinit()

    # libev's fork watchers are slow to fire because they only fire
    # at the beginning of a loop; due to our use of callbacks that
    # run at the end of the loop, that may be too late. The
    # threadpool and resolvers depend on the fork handlers being
    # run (specifically, the threadpool will fail in the forked
    # child if there were any threads in it, which there will be
    # if the resolver_thread was in use (the default) before the
    # fork.)
    #
    # If the forked process wants to use the threadpool or
    # resolver immediately (in a queued callback), it would hang.
    #
    # The below is a workaround. Fortunately, both of these
    # methods are idempotent and can be called multiple times
    # following a fork if they suddenly started working, or were
    # already working on some platforms. Other threadpools and fork handlers
    # will be called at an arbitrary time later ('soon')
    threadpool = hub.threadpool
    if hasattr(threadpool, '_on_fork'):
        threadpool._on_fork()

    # resolver_ares also has a fork watcher that's not firing
    resolver = hub.resolver
    if hasattr(resolver, '_on_fork'):
        resolver._on_fork()

    # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
    # pass around before returning to this greenlet. That will allow any
    # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
    # we do this, certain tests that heavily mix threads and forking,
    # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
    # why.
    #sleep(0.00001)
    #sleep(0.00001)
def get_hub_class():
    """Return the type of hub to use for the current thread.

    If the current thread has no hub type recorded yet, :class:`Hub`
    ('gevent.hub.Hub') is recorded for it and returned.
    """
    configured = _threadlocal.Hub
    if configured is not None:
        return configured
    _threadlocal.Hub = Hub
    return Hub
def get_hub(*args, **kwargs):
    """
    Return the hub for the current thread.

    If a hub does not exist in the current thread, a new one is
    created of the type returned by :func:`get_hub_class`; *args* and
    *kwargs* are passed to that type and are ignored when a hub already
    exists.
    """
    existing = _threadlocal.hub
    if existing is not None:
        return existing

    hub_factory = get_hub_class()
    created = hub_factory(*args, **kwargs)
    _threadlocal.hub = created
    return created
def _get_hub():
    """Return the current thread's hub without creating one.

    The result is ``None`` if no hub has been created yet.
    """
    current_hub = _threadlocal.hub
    return current_hub
def set_hub(hub):
    """Record *hub* as the hub of the current thread."""
    setattr(_threadlocal, 'hub', hub)
def _import(path):
    """
    Resolve *path* to an object.

    *path* may be one of:

    - a list of candidates: each is tried in order and the first one that
      resolves without :exc:`ImportError` is returned (the error of the
      last candidate propagates);
    - any non-string object: returned unchanged;
    - a string of the form ``[path/][package.]module.class``. The optional
      ``path/`` prefix is a directory put at the front of ``sys.path``
      for the duration of the import.

    :raises ImportError: If the list is empty, the string is malformed,
        the module cannot be imported, or an attribute is missing.
    """
    # pylint:disable=too-many-branches
    if isinstance(path, list):
        if not path:
            raise ImportError('Cannot import from empty list: %r' % (path, ))

        for item in path[:-1]:
            try:
                return _import(item)
            except ImportError:
                pass

        return _import(path[-1])

    if not isinstance(path, string_types):
        return path

    if '/' in path:
        package_path, dotted = path.rsplit('/', 1)
    else:
        package_path, dotted = None, path

    # Validate the dotted part only: a dot that appears solely in the
    # directory prefix (``some.dir/module``) used to slip through and fail
    # later with a ValueError instead of an ImportError.
    if '.' not in dotted:
        raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path)

    module = dotted.rsplit('.', 1)[0]

    if package_path is not None:
        # Modify the list in place so other references to sys.path stay valid.
        sys.path.insert(0, package_path)
    try:
        # __import__ of a dotted name returns the top-level package, so
        # walk every remaining component as an attribute.
        x = __import__(module)
        for attr in dotted.split('.')[1:]:
            oldx = x
            x = getattr(x, attr, _NONE)
            if x is _NONE:
                raise ImportError('Cannot import %r from %r' % (attr, oldx))
        return x
    finally:
        if package_path is not None:
            try:
                sys.path.remove(package_path)
            except ValueError:
                # The entry may already have been removed by imported code.
                pass
def config(default, envvar):
    """
    Return the setting named by the environment variable *envvar*.

    An unset or empty variable yields *default*. A string result (from
    the environment or from *default*) is split on commas into a list;
    any other kind of *default* is returned as is.
    """
    value = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
    if not isinstance(value, string_types):
        return value
    return value.split(',')
def resolver_config(default, envvar):
    """
    Like :func:`config`, but additionally expand the short resolver
    aliases found in ``_resolvers`` to their full dotted names. Entries
    that are not aliases are kept unchanged.
    """
    names = config(default, envvar)
    return [_resolvers.get(name, name) for name in names]


# Short aliases accepted for resolver names.
_resolvers = dict(
    ares='gevent.resolver_ares.Resolver',
    thread='gevent.resolver_thread.Resolver',
    block='gevent.socket.BlockingResolver',
)

# Dotted name of the loop class used by default.
_DEFAULT_LOOP_CLASS = 'gevent.core.loop'
class Hub(RawGreenlet):
    """A greenlet that runs the event loop.

    It is created automatically by :func:`get_hub`.

    **Switching**

    Every time this greenlet (i.e., the event loop) is switched *to*, if
    the current greenlet has a ``switch_out`` method, it will be called. This
    allows a greenlet to take some cleanup actions before yielding control. This method
    should not call any gevent blocking functions.
    """

    #: If instances of these classes are raised into the event loop,
    #: they will be propagated out to the main greenlet (where they will
    #: usually be caught by Python itself)
    SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)

    #: Instances of these classes are not considered to be errors and
    #: do not get logged/printed when raised by the event loop.
    NOT_ERROR = (GreenletExit, SystemExit)

    loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
    # For the standard class, go ahead and import it when this class
    # is defined. This is no loss of generality because the envvar is
    # only read when this class is defined, and we know that the
    # standard class will be available. This can solve problems with
    # the class being imported from multiple threads at once, leading
    # to one of the imports failing. Only do this for the object we
    # need in the constructor, as the rest of the factories are
    # themselves handled lazily. See #687. (People using a custom loop_class
    # can probably manage to get_hub() from the main thread or otherwise import
    # that loop_class themselves.)
    if loop_class == [_DEFAULT_LOOP_CLASS]:
        loop_class = [_import(loop_class)]

    resolver_class = ['gevent.resolver_thread.Resolver',
                      'gevent.resolver_ares.Resolver',
                      'gevent.socket.BlockingResolver']
    #: The class or callable object, or the name of a factory function or class,
    #: that will be used to create :attr:`resolver`. By default, configured according to
    #: :doc:`dns`. If a list, a list of objects in preference order.
    resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
    threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
    backend = config(None, 'GEVENT_BACKEND')
    threadpool_size = 10

    # using pprint.pformat can override custom __repr__ methods on dict/list
    # subclasses, which can be a security concern
    format_context = 'pprint.saferepr'

    # Class-level fallbacks for the lazily created helpers. destroy() and
    # the property deleters ``del`` the instance attributes; without these
    # defaults any later access (including repr() of a destroyed hub)
    # raised AttributeError.
    _resolver = None
    _threadpool = None

    def __init__(self, loop=None, default=None):
        """
        :param loop: Either an existing loop object (anything with a ``run``
            attribute), which is used as is, or a value passed as ``flags``
            to the loop class. When ``None``, :attr:`backend` is used as
            the flags.
        :param default: Passed to the loop class. When left as ``None``
            outside of the main thread it becomes ``False``.
        :raises TypeError: If a loop object is given together with *default*.
        """
        RawGreenlet.__init__(self)
        if hasattr(loop, 'run'):
            if default is not None:
                raise TypeError("Unexpected argument: default")
            self.loop = loop
        elif _threadlocal.loop is not None:
            # Reuse a loop instance previously set by
            # destroying a hub without destroying the associated
            # loop. See #237 and #238.
            self.loop = _threadlocal.loop
        else:
            if default is None and get_ident() != MAIN_THREAD:
                default = False
            loop_class = _import(self.loop_class)
            if loop is None:
                loop = self.backend
            self.loop = loop_class(flags=loop, default=default)
        self._resolver = None
        self._threadpool = None
        self.format_context = _import(self.format_context)

    def __repr__(self):
        if self.loop is None:
            info = 'destroyed'
        else:
            try:
                info = self.loop._format()
            except Exception as ex: # pylint:disable=broad-except
                info = str(ex) or repr(ex) or 'error'
        result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
        if self._resolver is not None:
            result += ' resolver=%r' % self._resolver
        if self._threadpool is not None:
            result += ' threadpool=%r' % self._threadpool
        return result + '>'

    def handle_error(self, context, type, value, tb):
        """
        Called by the event loop when an error occurs. The arguments
        type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.

        Applications can set a property on the hub with this same signature
        to override the error handling provided by this class.

        Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
        to :meth:`handle_system_error`.

        :param context: If this is ``None``, indicates a system error that
            should generally result in exiting the loop and being thrown to the
            parent greenlet.
        """
        if isinstance(value, str):
            # Cython can raise errors where the value is a plain string
            # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
            value = type(value)
        if not issubclass(type, self.NOT_ERROR):
            self.print_exception(context, type, value, tb)
        if context is None or issubclass(type, self.SYSTEM_ERROR):
            self.handle_system_error(type, value)

    def handle_system_error(self, type, value):
        """
        Throw the error into the parent greenlet.

        When called from a greenlet other than the hub or its parent
        (and while the loop still exists), a callback is scheduled first
        so that the calling greenlet is switched back to if the parent
        handles the error.
        """
        current = getcurrent()
        if current is self or current is self.parent or self.loop is None:
            self.parent.throw(type, value)
        else:
            # in case system error was handled and life goes on
            # switch back to this greenlet as well
            cb = None
            try:
                cb = self.loop.run_callback(current.switch)
            except: # pylint:disable=bare-except
                traceback.print_exc(file=self.exception_stream)
            try:
                self.parent.throw(type, value)
            finally:
                if cb is not None:
                    cb.stop()

    @readproperty
    def exception_stream(self):
        """
        The stream to which exceptions will be written.
        Defaults to ``sys.stderr`` unless assigned to.

        .. versionadded:: 1.2a1
        """
        # Unwrap any FileObjectThread we have thrown around sys.stderr
        # (because it can't be used in the hub). Tricky because we are
        # called in error situations when it's not safe to import.
        stderr = sys.stderr
        if type(stderr).__name__ == 'FileObjectThread':
            stderr = stderr.io # pylint:disable=no-member
        return stderr

    def print_exception(self, context, type, value, tb):
        """
        Write the exception, a timestamp and (when *context* is not
        ``None``) a "<context> failed with <type>" line to
        :attr:`exception_stream`.
        """
        # Python 3 does not gracefully handle None value or tb in
        # traceback.print_exception() as previous versions did.
        # pylint:disable=no-member
        errstream = self.exception_stream

        if value is None:
            errstream.write('%s\n' % type.__name__)
        else:
            traceback.print_exception(type, value, tb, file=errstream)
        del tb

        try:
            import time
            errstream.write(time.ctime())
            errstream.write(' ' if context is not None else '\n')
        except: # pylint:disable=bare-except
            # Possible not safe to import under certain
            # error conditions in Python 2
            pass

        if context is not None:
            if not isinstance(context, str):
                try:
                    context = self.format_context(context)
                except: # pylint:disable=bare-except
                    # Fall back to the plain repr if the formatter itself fails.
                    traceback.print_exc(file=self.exception_stream)
                    context = repr(context)
            errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))

    def switch(self):
        """Switch to the hub, first calling the current greenlet's ``switch_out`` if it has one."""
        switch_out = getattr(getcurrent(), 'switch_out', None)
        if switch_out is not None:
            switch_out()
        return RawGreenlet.switch(self)

    def switch_out(self):
        """Always raise: the hub itself must never switch to the hub."""
        raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')

    def wait(self, watcher):
        """
        Wait until the *watcher* (which should not be started) is ready.

        The current greenlet will be unscheduled during this time.

        :raises InvalidSwitchError: If the greenlet is resumed with
            anything other than the marker object passed to the watcher.

        .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
            :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
            :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
            :class:`gevent.core.child`, :class:`gevent.core.stat`
        """
        waiter = Waiter()
        # A fresh object identifies the switch that came from this watcher.
        unique = object()
        watcher.start(waiter.switch, unique)
        try:
            result = waiter.get()
            if result is not unique:
                raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
        finally:
            watcher.stop()

    def cancel_wait(self, watcher, error):
        """
        Cancel an in-progress call to :meth:`wait` by throwing the given *error*
        in the waiting greenlet.
        """
        if watcher.callback is not None:
            self.loop.run_callback(self._cancel_wait, watcher, error)

    def _cancel_wait(self, watcher, error):
        # Runs as a loop callback. The watcher's callback is expected to be
        # a bound method; its ``__self__`` receives the error.
        if watcher.active:
            switch = watcher.callback
            if switch is not None:
                greenlet = getattr(switch, '__self__', None)
                if greenlet is not None:
                    greenlet.throw(error)

    def run(self):
        """
        Entry-point to running the loop. This method is called automatically
        when the hub greenlet is scheduled; do not call it directly.

        :raises LoopExit: If the loop finishes running. This means
           that there are no other scheduled greenlets, and no active
           watchers or servers. In some situations, this indicates a
           programming error.
        """
        assert self is getcurrent(), 'Do not call Hub.run() directly'
        while True:
            loop = self.loop
            loop.error_handler = self
            try:
                loop.run()
            finally:
                loop.error_handler = None # break the refcount cycle
            self.parent.throw(LoopExit('This operation would block forever', self))
        # this function must never return, as it will cause switch() in the parent greenlet
        # to return an unexpected value
        # It is still possible to kill this greenlet with throw. However, in that case
        # switching to it is no longer safe, as switch will return immediately

    def join(self, timeout=None):
        """Wait for the event loop to finish. Exits only when there are
        no more spawned greenlets, started servers, active timeouts or watchers.

        If *timeout* is provided, wait no longer than the specified number of seconds.

        Returns True if exited because the loop finished execution.
        Returns False if exited because of timeout expired.
        """
        assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
        if self.dead:
            return True

        waiter = Waiter()

        if timeout is not None:
            timeout = self.loop.timer(timeout, ref=False)
            timeout.start(waiter.switch)

        try:
            try:
                waiter.get()
            except LoopExit:
                return True
        finally:
            if timeout is not None:
                timeout.stop()
        return False

    def destroy(self, destroy_loop=None):
        """
        Close the resolver, kill the threadpool and detach this hub from
        the current thread.

        :param destroy_loop: Whether to destroy the loop as well. When
            ``None`` (the default) the loop is destroyed unless it is the
            default loop. A loop that is kept is stored so that the next
            hub created in this thread can reuse it.
        """
        if self._resolver is not None:
            self._resolver.close()
            del self._resolver
        if self._threadpool is not None:
            self._threadpool.kill()
            del self._threadpool
        if destroy_loop is None:
            destroy_loop = not self.loop.default
        if destroy_loop:
            if _threadlocal.loop is self.loop:
                # Don't let anyone try to reuse this
                _threadlocal.loop = None
            self.loop.destroy()
        else:
            # Store in case another hub is created for this
            # thread.
            _threadlocal.loop = self.loop

        self.loop = None
        if _threadlocal.hub is self:
            _threadlocal.hub = None

    def _get_resolver(self):
        # Created on first access; stays ``None`` if resolver_class is ``None``.
        if self._resolver is None:
            if self.resolver_class is not None:
                self.resolver_class = _import(self.resolver_class)
                self._resolver = self.resolver_class(hub=self)
        return self._resolver

    def _set_resolver(self, value):
        self._resolver = value

    def _del_resolver(self):
        del self._resolver

    resolver = property(_get_resolver, _set_resolver, _del_resolver)

    def _get_threadpool(self):
        # Created on first access; stays ``None`` if threadpool_class is ``None``.
        if self._threadpool is None:
            if self.threadpool_class is not None:
                self.threadpool_class = _import(self.threadpool_class)
                self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
        return self._threadpool

    def _set_threadpool(self, value):
        self._threadpool = value

    def _del_threadpool(self):
        del self._threadpool

    threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
class Waiter(object):
    """
    A low level communication utility for greenlets.

    Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:

    * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
    * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
    * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
      will store the value/exception. The following :meth:`get` will return the value/raise the exception.

    The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
    The :meth:`get` method must be called from a greenlet other than :class:`Hub`.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, 'hello from Waiter')
        >>> result.get() # blocks for 0.1 seconds
        'hello from Waiter'

    If switch is called before the greenlet gets a chance to call :meth:`get` then
    :class:`Waiter` stores the value.

        >>> result = Waiter()
        >>> timer = get_hub().loop.timer(0.1)
        >>> timer.start(result.switch, 'hi from Waiter')
        >>> sleep(0.2)
        >>> result.get() # returns immediately without blocking
        'hi from Waiter'

    .. warning::

        This is a limited and dangerous way to communicate between
        greenlets. It can easily leave a greenlet unscheduled forever
        if used incorrectly. Consider using safer classes such as
        :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
        or :class:`gevent.queue.Queue`.
    """

    # ``_exception`` encodes the state: ``_NONE`` means nothing is stored,
    # ``None`` means a value is stored in ``value``, anything else is the
    # tuple of arguments that was passed to throw().
    __slots__ = ['hub', 'greenlet', 'value', '_exception']

    def __init__(self, hub=None):
        if hub is None:
            self.hub = get_hub()
        else:
            self.hub = hub
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def clear(self):
        """Forget the waiting greenlet and any stored value or exception."""
        self.greenlet = None
        self.value = None
        self._exception = _NONE

    def __str__(self):
        if self._exception is _NONE:
            return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
        if self._exception is None:
            return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
        return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)

    def ready(self):
        """Return true if and only if it holds a value or an exception"""
        return self._exception is not _NONE

    def successful(self):
        """Return true if and only if it is ready and holds a value"""
        return self._exception is None

    @property
    def exc_info(self):
        "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
        if self._exception is not _NONE:
            return self._exception

    def switch(self, value=None):
        """Switch to the greenlet if one's available. Otherwise store the value."""
        greenlet = self.greenlet
        if greenlet is None:
            self.value = value
            self._exception = None
        else:
            assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
            switch = greenlet.switch
            try:
                switch(value)
            except: # pylint:disable=bare-except
                self.hub.handle_error(switch, *sys.exc_info())

    def switch_args(self, *args):
        """Like :meth:`switch`, delivering the positional arguments as one tuple."""
        return self.switch(args)

    def throw(self, *throw_args):
        """Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
        greenlet = self.greenlet
        if greenlet is None:
            self._exception = throw_args
        else:
            # The message previously named Waiter.switch (copy/paste slip).
            assert getcurrent() is self.hub, "Can only use Waiter.throw method from the Hub greenlet"
            throw = greenlet.throw
            try:
                throw(*throw_args)
            except: # pylint:disable=bare-except
                self.hub.handle_error(throw, *sys.exc_info())

    def get(self):
        """If a value/an exception is stored, return/raise it. Otherwise block until switch() or throw() is called.

        :raises ConcurrentObjectUseError: If another greenlet is already
            blocked in :meth:`get` on this waiter.
        """
        if self._exception is not _NONE:
            if self._exception is None:
                return self.value
            else:
                getcurrent().throw(*self._exception)
        else:
            if self.greenlet is not None:
                raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
            self.greenlet = getcurrent()
            try:
                return self.hub.switch()
            finally:
                self.greenlet = None

    def __call__(self, source):
        """Deliver the outcome of *source*: its ``value``, or its ``exception`` if that is not ``None``."""
        if source.exception is None:
            self.switch(source.value)
        else:
            self.throw(source.exception)

    # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
    # and unwraps it in wait() thus checking that switch() was indeed called
class _MultipleWaiter(Waiter):
    """
    An internal extension of Waiter that can be used if multiple objects
    must be waited on, and there is a chance that in between waits greenlets
    might be switched out. All greenlets that switch to this waiter
    will have their value returned.

    This does not handle exceptions or throw methods.
    """
    __slots__ = ['_values']

    def __init__(self, *args, **kwargs):
        Waiter.__init__(self, *args, **kwargs)
        # Values delivered through switch() and not yet handed out by get().
        #
        # we typically expect a relatively small number of these to be outstanding.
        # since we pop from the left, a deque might be slightly
        # more efficient, but since we're in the hub we avoid imports if
        # we can help it to better support monkey-patching, and delaying the import
        # here can be impractical (see https://github.com/gevent/gevent/issues/652)
        self._values = []

    def switch(self, value): # pylint:disable=signature-differs
        # Queue the real value; the base class is only told to wake up.
        self._values.append(value)
        Waiter.switch(self, True)

    def get(self):
        pending = self._values
        if not pending:
            # Nothing queued yet: wait for a switch, then reset the base
            # state so this waiter can be waited on again.
            Waiter.get(self)
            Waiter.clear(self)
        return pending.pop(0)
def iwait(objects, timeout=None, count=None):
    """
    Yield each of *objects* as it becomes ready.

    Iteration ends once all of them (or *count* of them) have been yielded,
    or once *timeout* has expired.

    :param objects: A sequence (supporting :func:`len`) containing objects
        implementing the wait protocol (rawlink() and unlink()). If ``None``,
        the result of joining the hub (with *timeout*) is yielded instead.
    :keyword float timeout: If given, specifies a maximum number of seconds
        to wait. If the timeout expires before the desired waited-for objects
        are available, iteration stops immediately.
    :keyword int count: If not ``None``, the maximum number of objects to
        wait for. If ``None`` (the default), all objects are waited for.

    .. seealso:: :func:`wait`

    .. versionchanged:: 1.1a1
       Add the *count* parameter.
    .. versionchanged:: 1.1a2
       No longer raise :exc:`LoopExit` if our caller switches greenlets
       in between items yielded by this function.
    """
    # QQQ would be nice to support iterable here that can be generated slowly (why?)
    if objects is None:
        yield get_hub().join(timeout=timeout)
        return

    total = len(objects)
    limit = total if count is None else min(count, total)
    multi = _MultipleWaiter()
    notify = multi.switch
    timed = timeout is not None

    if timed:
        timer = get_hub().loop.timer(timeout, priority=-1)
        # When the timer fires it delivers _NONE, which marks expiry below.
        timer.start(notify, _NONE)

    try:
        for waitable in objects:
            waitable.rawlink(notify)

        for _ in xrange(limit):
            ready = multi.get()
            multi.clear()
            if ready is _NONE:
                # The timer fired before enough objects became ready.
                return
            yield ready
    finally:
        if timed:
            timer.stop()
        for waitable in objects:
            unlink = getattr(waitable, 'unlink', None)
            if not unlink:
                continue
            try:
                unlink(notify)
            except: # pylint:disable=bare-except
                # Best effort: report the failure and keep unlinking the rest.
                traceback.print_exc()
def wait(objects=None, timeout=None, count=None):
    """
    Wait for ``objects`` to become ready or for event loop to finish.

    If ``objects`` is provided, it must be a list containing objects
    implementing the wait protocol (rawlink() and unlink() methods):

    - :class:`gevent.Greenlet` instance
    - :class:`gevent.event.Event` instance
    - :class:`gevent.lock.Semaphore` instance
    - :class:`gevent.subprocess.Popen` instance

    If ``objects`` is ``None`` (the default), ``wait()`` blocks until
    the current event loop has nothing to do (or until ``timeout`` passes):

    - all greenlets have finished
    - all servers were stopped
    - all event loop watchers were stopped.

    If ``count`` is ``None`` (the default), wait for all ``objects``
    to become ready. If ``count`` is a number, wait for (up to) ``count``
    objects to become ready. (For example, if count is ``1`` then the
    function exits when any object in the list is ready).

    If ``timeout`` is provided, it specifies the maximum number of
    seconds ``wait()`` will block.

    When ``objects`` is given, returns the list of ready objects, in the
    order in which they were ready; otherwise returns the result of joining
    the hub.

    .. seealso:: :func:`iwait`
    """
    if objects is not None:
        ready = iwait(objects, timeout, count)
        return list(ready)
    return get_hub().join(timeout=timeout)
class linkproxy(object):
    """
    A one-shot callable that invokes ``callback(obj)``.

    Whatever arguments the proxy itself is called with are ignored. Both
    references are dropped before the callback runs.
    """

    __slots__ = ['callback', 'obj']

    def __init__(self, callback, obj):
        self.callback = callback
        self.obj = obj

    def __call__(self, *args):
        target, subject = self.callback, self.obj
        # Release the references before invoking the callback.
        self.callback = self.obj = None
        target(subject)
|