hub.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052
  1. # Copyright (c) 2009-2015 Denis Bilenko. See LICENSE for details.
  2. """
  3. Event-loop hub.
  4. """
  5. from __future__ import absolute_import
  6. # XXX: FIXME: Refactor to make this smaller
  7. # pylint:disable=too-many-lines
  8. from functools import partial as _functools_partial
  9. import os
  10. import sys
  11. import traceback
  12. from greenlet import greenlet as RawGreenlet, getcurrent, GreenletExit
  13. __all__ = [
  14. 'getcurrent',
  15. 'GreenletExit',
  16. 'spawn_raw',
  17. 'sleep',
  18. 'kill',
  19. 'signal',
  20. 'reinit',
  21. 'get_hub',
  22. 'Hub',
  23. 'Waiter',
  24. ]
  25. from gevent._compat import string_types
  26. from gevent._compat import xrange
  27. from gevent._util import _NONE
  28. from gevent._util import readproperty
  29. if sys.version_info[0] <= 2:
  30. import thread # pylint:disable=import-error
  31. else:
  32. import _thread as thread # python 2 pylint:disable=import-error
  33. # These must be the "real" native thread versions,
  34. # not monkey-patched.
  35. threadlocal = thread._local
  36. class _threadlocal(threadlocal):
  37. def __init__(self):
  38. # Use a class with an initializer so that we can test
  39. # for 'is None' instead of catching AttributeError, making
  40. # the code cleaner and possibly solving some corner cases
  41. # (like #687)
  42. threadlocal.__init__(self)
  43. self.Hub = None
  44. self.loop = None
  45. self.hub = None
  46. _threadlocal = _threadlocal()
  47. get_ident = thread.get_ident
  48. MAIN_THREAD = get_ident()
  49. class LoopExit(Exception):
  50. """
  51. Exception thrown when the hub finishes running.
  52. In a normal application, this is never thrown or caught
  53. explicitly. The internal implementation of functions like
  54. :func:`join` and :func:`joinall` may catch it, but user code
  55. generally should not.
  56. .. caution::
  57. Errors in application programming can also lead to this exception being
  58. raised. Some examples include (but are not limited too):
  59. - greenlets deadlocking on a lock;
  60. - using a socket or other gevent object with native thread
  61. affinity from a different thread
  62. """
  63. pass
  64. class BlockingSwitchOutError(AssertionError):
  65. pass
  66. class InvalidSwitchError(AssertionError):
  67. pass
  68. class ConcurrentObjectUseError(AssertionError):
  69. # raised when an object is used (waited on) by two greenlets
  70. # independently, meaning the object was entered into a blocking
  71. # state by one greenlet and then another while still blocking in the
  72. # first one
  73. pass
  74. def spawn_raw(function, *args, **kwargs):
  75. """
  76. Create a new :class:`greenlet.greenlet` object and schedule it to
  77. run ``function(*args, **kwargs)``.
  78. This returns a raw :class:`~greenlet.greenlet` which does not have all the useful
  79. methods that :class:`gevent.Greenlet` has. Typically, applications
  80. should prefer :func:`~gevent.spawn`, but this method may
  81. occasionally be useful as an optimization if there are many
  82. greenlets involved.
  83. .. versionchanged:: 1.1b1
  84. If *function* is not callable, immediately raise a :exc:`TypeError`
  85. instead of spawning a greenlet that will raise an uncaught TypeError.
  86. .. versionchanged:: 1.1rc2
  87. Accept keyword arguments for ``function`` as previously (incorrectly)
  88. documented. Note that this may incur an additional expense.
  89. .. versionchanged:: 1.1a3
  90. Verify that ``function`` is callable, raising a TypeError if not. Previously,
  91. the spawned greenlet would have failed the first time it was switched to.
  92. """
  93. if not callable(function):
  94. raise TypeError("function must be callable")
  95. hub = get_hub()
  96. # The callback class object that we use to run this doesn't
  97. # accept kwargs (and those objects are heavily used, as well as being
  98. # implemented twice in core.ppyx and corecffi.py) so do it with a partial
  99. if kwargs:
  100. function = _functools_partial(function, *args, **kwargs)
  101. g = RawGreenlet(function, hub)
  102. hub.loop.run_callback(g.switch)
  103. else:
  104. g = RawGreenlet(function, hub)
  105. hub.loop.run_callback(g.switch, *args)
  106. return g
  107. def sleep(seconds=0, ref=True):
  108. """
  109. Put the current greenlet to sleep for at least *seconds*.
  110. *seconds* may be specified as an integer, or a float if fractional
  111. seconds are desired.
  112. .. tip:: In the current implementation, a value of 0 (the default)
  113. means to yield execution to any other runnable greenlets, but
  114. this greenlet may be scheduled again before the event loop
  115. cycles (in an extreme case, a greenlet that repeatedly sleeps
  116. with 0 can prevent greenlets that are ready to do I/O from
  117. being scheduled for some (small) period of time); a value greater than
  118. 0, on the other hand, will delay running this greenlet until
  119. the next iteration of the loop.
  120. If *ref* is False, the greenlet running ``sleep()`` will not prevent :func:`gevent.wait`
  121. from exiting.
  122. .. seealso:: :func:`idle`
  123. """
  124. hub = get_hub()
  125. loop = hub.loop
  126. if seconds <= 0:
  127. waiter = Waiter()
  128. loop.run_callback(waiter.switch)
  129. waiter.get()
  130. else:
  131. hub.wait(loop.timer(seconds, ref=ref))
  132. def idle(priority=0):
  133. """
  134. Cause the calling greenlet to wait until the event loop is idle.
  135. Idle is defined as having no other events of the same or higher
  136. *priority* pending. That is, as long as sockets, timeouts or even
  137. signals of the same or higher priority are being processed, the loop
  138. is not idle.
  139. .. seealso:: :func:`sleep`
  140. """
  141. hub = get_hub()
  142. watcher = hub.loop.idle()
  143. if priority:
  144. watcher.priority = priority
  145. hub.wait(watcher)
  146. def kill(greenlet, exception=GreenletExit):
  147. """
  148. Kill greenlet asynchronously. The current greenlet is not unscheduled.
  149. .. note::
  150. The method :meth:`Greenlet.kill` method does the same and
  151. more (and the same caveats listed there apply here). However, the MAIN
  152. greenlet - the one that exists initially - does not have a
  153. ``kill()`` method, and neither do any created with :func:`spawn_raw`,
  154. so you have to use this function.
  155. .. caution:: Use care when killing greenlets. If they are not prepared for
  156. exceptions, this could result in corrupted state.
  157. .. versionchanged:: 1.1a2
  158. If the ``greenlet`` has a :meth:`kill <Greenlet.kill>` method, calls it. This prevents a
  159. greenlet from being switched to for the first time after it's been
  160. killed but not yet executed.
  161. """
  162. if not greenlet.dead:
  163. if hasattr(greenlet, 'kill'):
  164. # dealing with gevent.greenlet.Greenlet. Use it, especially
  165. # to avoid allowing one to be switched to for the first time
  166. # after it's been killed
  167. greenlet.kill(exception=exception, block=False)
  168. else:
  169. get_hub().loop.run_callback(greenlet.throw, exception)
  170. class signal(object):
  171. """
  172. Call the *handler* with the *args* and *kwargs* when the process
  173. receives the signal *signalnum*.
  174. The *handler* will be run in a new greenlet when the signal is delivered.
  175. This returns an object with the useful method ``cancel``, which, when called,
  176. will prevent future deliveries of *signalnum* from calling *handler*.
  177. .. note::
  178. This may not operate correctly with SIGCHLD if libev child watchers
  179. are used (as they are by default with os.fork).
  180. .. versionchanged:: 1.2a1
  181. The ``handler`` argument is required to be callable at construction time.
  182. """
  183. # XXX: This is manually documented in gevent.rst while it is aliased in
  184. # the gevent module.
  185. greenlet_class = None
  186. def __init__(self, signalnum, handler, *args, **kwargs):
  187. if not callable(handler):
  188. raise TypeError("signal handler must be callable.")
  189. self.hub = get_hub()
  190. self.watcher = self.hub.loop.signal(signalnum, ref=False)
  191. self.watcher.start(self._start)
  192. self.handler = handler
  193. self.args = args
  194. self.kwargs = kwargs
  195. if self.greenlet_class is None:
  196. from gevent import Greenlet
  197. self.greenlet_class = Greenlet
  198. def _get_ref(self):
  199. return self.watcher.ref
  200. def _set_ref(self, value):
  201. self.watcher.ref = value
  202. ref = property(_get_ref, _set_ref)
  203. del _get_ref, _set_ref
  204. def cancel(self):
  205. self.watcher.stop()
  206. def _start(self):
  207. try:
  208. greenlet = self.greenlet_class(self.handle)
  209. greenlet.switch()
  210. except: # pylint:disable=bare-except
  211. self.hub.handle_error(None, *sys._exc_info()) # pylint:disable=no-member
  212. def handle(self):
  213. try:
  214. self.handler(*self.args, **self.kwargs)
  215. except: # pylint:disable=bare-except
  216. self.hub.handle_error(None, *sys.exc_info())
  217. def reinit():
  218. """
  219. Prepare the gevent hub to run in a new (forked) process.
  220. This should be called *immediately* after :func:`os.fork` in the
  221. child process. This is done automatically by
  222. :func:`gevent.os.fork` or if the :mod:`os` module has been
  223. monkey-patched. If this function is not called in a forked
  224. process, symptoms may include hanging of functions like
  225. :func:`socket.getaddrinfo`, and the hub's threadpool is unlikely
  226. to work.
  227. .. note:: Registered fork watchers may or may not run before
  228. this function (and thus ``gevent.os.fork``) return. If they have
  229. not run, they will run "soon", after an iteration of the event loop.
  230. You can force this by inserting a few small (but non-zero) calls to :func:`sleep`
  231. after fork returns. (As of gevent 1.1 and before, fork watchers will
  232. not have run, but this may change in the future.)
  233. .. note:: This function may be removed in a future major release
  234. if the fork process can be more smoothly managed.
  235. .. warning:: See remarks in :func:`gevent.os.fork` about greenlets
  236. and libev watchers in the child process.
  237. """
  238. # The loop reinit function in turn calls libev's ev_loop_fork
  239. # function.
  240. hub = _get_hub()
  241. if hub is not None:
  242. # Note that we reinit the existing loop, not destroy it.
  243. # See https://github.com/gevent/gevent/issues/200.
  244. hub.loop.reinit()
  245. # libev's fork watchers are slow to fire because the only fire
  246. # at the beginning of a loop; due to our use of callbacks that
  247. # run at the end of the loop, that may be too late. The
  248. # threadpool and resolvers depend on the fork handlers being
  249. # run (specifically, the threadpool will fail in the forked
  250. # child if there were any threads in it, which there will be
  251. # if the resolver_thread was in use (the default) before the
  252. # fork.)
  253. #
  254. # If the forked process wants to use the threadpool or
  255. # resolver immediately (in a queued callback), it would hang.
  256. #
  257. # The below is a workaround. Fortunately, both of these
  258. # methods are idempotent and can be called multiple times
  259. # following a fork if the suddenly started working, or were
  260. # already working on some platforms. Other threadpools and fork handlers
  261. # will be called at an arbitrary time later ('soon')
  262. if hasattr(hub.threadpool, '_on_fork'):
  263. hub.threadpool._on_fork()
  264. # resolver_ares also has a fork watcher that's not firing
  265. if hasattr(hub.resolver, '_on_fork'):
  266. hub.resolver._on_fork()
  267. # TODO: We'd like to sleep for a non-zero amount of time to force the loop to make a
  268. # pass around before returning to this greenlet. That will allow any
  269. # user-provided fork watchers to run. (Two calls are necessary.) HOWEVER, if
  270. # we do this, certain tests that heavily mix threads and forking,
  271. # like 2.7/test_threading:test_reinit_tls_after_fork, fail. It's not immediately clear
  272. # why.
  273. #sleep(0.00001)
  274. #sleep(0.00001)
  275. def get_hub_class():
  276. """Return the type of hub to use for the current thread.
  277. If there's no type of hub for the current thread yet, 'gevent.hub.Hub' is used.
  278. """
  279. hubtype = _threadlocal.Hub
  280. if hubtype is None:
  281. hubtype = _threadlocal.Hub = Hub
  282. return hubtype
  283. def get_hub(*args, **kwargs):
  284. """
  285. Return the hub for the current thread.
  286. If a hub does not exist in the current thread, a new one is
  287. created of the type returned by :func:`get_hub_class`.
  288. """
  289. hub = _threadlocal.hub
  290. if hub is None:
  291. hubtype = get_hub_class()
  292. hub = _threadlocal.hub = hubtype(*args, **kwargs)
  293. return hub
  294. def _get_hub():
  295. """Return the hub for the current thread.
  296. Return ``None`` if no hub has been created yet.
  297. """
  298. return _threadlocal.hub
  299. def set_hub(hub):
  300. _threadlocal.hub = hub
  301. def _import(path):
  302. # pylint:disable=too-many-branches
  303. if isinstance(path, list):
  304. if not path:
  305. raise ImportError('Cannot import from empty list: %r' % (path, ))
  306. for item in path[:-1]:
  307. try:
  308. return _import(item)
  309. except ImportError:
  310. pass
  311. return _import(path[-1])
  312. if not isinstance(path, string_types):
  313. return path
  314. if '.' not in path:
  315. raise ImportError("Cannot import %r (required format: [path/][package.]module.class)" % path)
  316. if '/' in path:
  317. package_path, path = path.rsplit('/', 1)
  318. sys.path = [package_path] + sys.path
  319. else:
  320. package_path = None
  321. try:
  322. module, item = path.rsplit('.', 1)
  323. x = __import__(module)
  324. for attr in path.split('.')[1:]:
  325. oldx = x
  326. x = getattr(x, attr, _NONE)
  327. if x is _NONE:
  328. raise ImportError('Cannot import %r from %r' % (attr, oldx))
  329. return x
  330. finally:
  331. try:
  332. sys.path.remove(package_path)
  333. except ValueError:
  334. pass
  335. def config(default, envvar):
  336. result = os.environ.get(envvar) or default # absolute import gets confused pylint: disable=no-member
  337. if isinstance(result, string_types):
  338. return result.split(',')
  339. return result
  340. def resolver_config(default, envvar):
  341. result = config(default, envvar)
  342. return [_resolvers.get(x, x) for x in result]
  343. _resolvers = {'ares': 'gevent.resolver_ares.Resolver',
  344. 'thread': 'gevent.resolver_thread.Resolver',
  345. 'block': 'gevent.socket.BlockingResolver'}
  346. _DEFAULT_LOOP_CLASS = 'gevent.core.loop'
  347. class Hub(RawGreenlet):
  348. """A greenlet that runs the event loop.
  349. It is created automatically by :func:`get_hub`.
  350. **Switching**
  351. Every time this greenlet (i.e., the event loop) is switched *to*, if
  352. the current greenlet has a ``switch_out`` method, it will be called. This
  353. allows a greenlet to take some cleanup actions before yielding control. This method
  354. should not call any gevent blocking functions.
  355. """
  356. #: If instances of these classes are raised into the event loop,
  357. #: they will be propagated out to the main greenlet (where they will
  358. #: usually be caught by Python itself)
  359. SYSTEM_ERROR = (KeyboardInterrupt, SystemExit, SystemError)
  360. #: Instances of these classes are not considered to be errors and
  361. #: do not get logged/printed when raised by the event loop.
  362. NOT_ERROR = (GreenletExit, SystemExit)
  363. loop_class = config(_DEFAULT_LOOP_CLASS, 'GEVENT_LOOP')
  364. # For the standard class, go ahead and import it when this class
  365. # is defined. This is no loss of generality because the envvar is
  366. # only read when this class is defined, and we know that the
  367. # standard class will be available. This can solve problems with
  368. # the class being imported from multiple threads at once, leading
  369. # to one of the imports failing. Only do this for the object we
  370. # need in the constructor, as the rest of the factories are
  371. # themselves handled lazily. See #687. (People using a custom loop_class
  372. # can probably manage to get_hub() from the main thread or otherwise import
  373. # that loop_class themselves.)
  374. if loop_class == [_DEFAULT_LOOP_CLASS]:
  375. loop_class = [_import(loop_class)]
  376. resolver_class = ['gevent.resolver_thread.Resolver',
  377. 'gevent.resolver_ares.Resolver',
  378. 'gevent.socket.BlockingResolver']
  379. #: The class or callable object, or the name of a factory function or class,
  380. #: that will be used to create :attr:`resolver`. By default, configured according to
  381. #: :doc:`dns`. If a list, a list of objects in preference order.
  382. resolver_class = resolver_config(resolver_class, 'GEVENT_RESOLVER')
  383. threadpool_class = config('gevent.threadpool.ThreadPool', 'GEVENT_THREADPOOL')
  384. backend = config(None, 'GEVENT_BACKEND')
  385. threadpool_size = 10
  386. # using pprint.pformat can override custom __repr__ methods on dict/list
  387. # subclasses, which can be a security concern
  388. format_context = 'pprint.saferepr'
  389. def __init__(self, loop=None, default=None):
  390. RawGreenlet.__init__(self)
  391. if hasattr(loop, 'run'):
  392. if default is not None:
  393. raise TypeError("Unexpected argument: default")
  394. self.loop = loop
  395. elif _threadlocal.loop is not None:
  396. # Reuse a loop instance previously set by
  397. # destroying a hub without destroying the associated
  398. # loop. See #237 and #238.
  399. self.loop = _threadlocal.loop
  400. else:
  401. if default is None and get_ident() != MAIN_THREAD:
  402. default = False
  403. loop_class = _import(self.loop_class)
  404. if loop is None:
  405. loop = self.backend
  406. self.loop = loop_class(flags=loop, default=default)
  407. self._resolver = None
  408. self._threadpool = None
  409. self.format_context = _import(self.format_context)
  410. def __repr__(self):
  411. if self.loop is None:
  412. info = 'destroyed'
  413. else:
  414. try:
  415. info = self.loop._format()
  416. except Exception as ex: # pylint:disable=broad-except
  417. info = str(ex) or repr(ex) or 'error'
  418. result = '<%s at 0x%x %s' % (self.__class__.__name__, id(self), info)
  419. if self._resolver is not None:
  420. result += ' resolver=%r' % self._resolver
  421. if self._threadpool is not None:
  422. result += ' threadpool=%r' % self._threadpool
  423. return result + '>'
  424. def handle_error(self, context, type, value, tb):
  425. """
  426. Called by the event loop when an error occurs. The arguments
  427. type, value, and tb are the standard tuple returned by :func:`sys.exc_info`.
  428. Applications can set a property on the hub with this same signature
  429. to override the error handling provided by this class.
  430. Errors that are :attr:`system errors <SYSTEM_ERROR>` are passed
  431. to :meth:`handle_system_error`.
  432. :param context: If this is ``None``, indicates a system error that
  433. should generally result in exiting the loop and being thrown to the
  434. parent greenlet.
  435. """
  436. if isinstance(value, str):
  437. # Cython can raise errors where the value is a plain string
  438. # e.g., AttributeError, "_semaphore.Semaphore has no attr", <traceback>
  439. value = type(value)
  440. if not issubclass(type, self.NOT_ERROR):
  441. self.print_exception(context, type, value, tb)
  442. if context is None or issubclass(type, self.SYSTEM_ERROR):
  443. self.handle_system_error(type, value)
  444. def handle_system_error(self, type, value):
  445. current = getcurrent()
  446. if current is self or current is self.parent or self.loop is None:
  447. self.parent.throw(type, value)
  448. else:
  449. # in case system error was handled and life goes on
  450. # switch back to this greenlet as well
  451. cb = None
  452. try:
  453. cb = self.loop.run_callback(current.switch)
  454. except: # pylint:disable=bare-except
  455. traceback.print_exc(file=self.exception_stream)
  456. try:
  457. self.parent.throw(type, value)
  458. finally:
  459. if cb is not None:
  460. cb.stop()
  461. @readproperty
  462. def exception_stream(self):
  463. """
  464. The stream to which exceptions will be written.
  465. Defaults to ``sys.stderr`` unless assigned to.
  466. .. versionadded:: 1.2a1
  467. """
  468. # Unwrap any FileObjectThread we have thrown around sys.stderr
  469. # (because it can't be used in the hub). Tricky because we are
  470. # called in error situations when it's not safe to import.
  471. stderr = sys.stderr
  472. if type(stderr).__name__ == 'FileObjectThread':
  473. stderr = stderr.io # pylint:disable=no-member
  474. return stderr
  475. def print_exception(self, context, type, value, tb):
  476. # Python 3 does not gracefully handle None value or tb in
  477. # traceback.print_exception() as previous versions did.
  478. # pylint:disable=no-member
  479. errstream = self.exception_stream
  480. if value is None:
  481. errstream.write('%s\n' % type.__name__)
  482. else:
  483. traceback.print_exception(type, value, tb, file=errstream)
  484. del tb
  485. try:
  486. import time
  487. errstream.write(time.ctime())
  488. errstream.write(' ' if context is not None else '\n')
  489. except: # pylint:disable=bare-except
  490. # Possible not safe to import under certain
  491. # error conditions in Python 2
  492. pass
  493. if context is not None:
  494. if not isinstance(context, str):
  495. try:
  496. context = self.format_context(context)
  497. except: # pylint:disable=bare-except
  498. traceback.print_exc(file=self.exception_stream)
  499. context = repr(context)
  500. errstream.write('%s failed with %s\n\n' % (context, getattr(type, '__name__', 'exception'), ))
  501. def switch(self):
  502. switch_out = getattr(getcurrent(), 'switch_out', None)
  503. if switch_out is not None:
  504. switch_out()
  505. return RawGreenlet.switch(self)
  506. def switch_out(self):
  507. raise BlockingSwitchOutError('Impossible to call blocking function in the event loop callback')
  508. def wait(self, watcher):
  509. """
  510. Wait until the *watcher* (which should not be started) is ready.
  511. The current greenlet will be unscheduled during this time.
  512. .. seealso:: :class:`gevent.core.io`, :class:`gevent.core.timer`,
  513. :class:`gevent.core.signal`, :class:`gevent.core.idle`, :class:`gevent.core.prepare`,
  514. :class:`gevent.core.check`, :class:`gevent.core.fork`, :class:`gevent.core.async`,
  515. :class:`gevent.core.child`, :class:`gevent.core.stat`
  516. """
  517. waiter = Waiter()
  518. unique = object()
  519. watcher.start(waiter.switch, unique)
  520. try:
  521. result = waiter.get()
  522. if result is not unique:
  523. raise InvalidSwitchError('Invalid switch into %s: %r (expected %r)' % (getcurrent(), result, unique))
  524. finally:
  525. watcher.stop()
  526. def cancel_wait(self, watcher, error):
  527. """
  528. Cancel an in-progress call to :meth:`wait` by throwing the given *error*
  529. in the waiting greenlet.
  530. """
  531. if watcher.callback is not None:
  532. self.loop.run_callback(self._cancel_wait, watcher, error)
  533. def _cancel_wait(self, watcher, error):
  534. if watcher.active:
  535. switch = watcher.callback
  536. if switch is not None:
  537. greenlet = getattr(switch, '__self__', None)
  538. if greenlet is not None:
  539. greenlet.throw(error)
  540. def run(self):
  541. """
  542. Entry-point to running the loop. This method is called automatically
  543. when the hub greenlet is scheduled; do not call it directly.
  544. :raises LoopExit: If the loop finishes running. This means
  545. that there are no other scheduled greenlets, and no active
  546. watchers or servers. In some situations, this indicates a
  547. programming error.
  548. """
  549. assert self is getcurrent(), 'Do not call Hub.run() directly'
  550. while True:
  551. loop = self.loop
  552. loop.error_handler = self
  553. try:
  554. loop.run()
  555. finally:
  556. loop.error_handler = None # break the refcount cycle
  557. self.parent.throw(LoopExit('This operation would block forever', self))
  558. # this function must never return, as it will cause switch() in the parent greenlet
  559. # to return an unexpected value
  560. # It is still possible to kill this greenlet with throw. However, in that case
  561. # switching to it is no longer safe, as switch will return immediatelly
  562. def join(self, timeout=None):
  563. """Wait for the event loop to finish. Exits only when there are
  564. no more spawned greenlets, started servers, active timeouts or watchers.
  565. If *timeout* is provided, wait no longer for the specified number of seconds.
  566. Returns True if exited because the loop finished execution.
  567. Returns False if exited because of timeout expired.
  568. """
  569. assert getcurrent() is self.parent, "only possible from the MAIN greenlet"
  570. if self.dead:
  571. return True
  572. waiter = Waiter()
  573. if timeout is not None:
  574. timeout = self.loop.timer(timeout, ref=False)
  575. timeout.start(waiter.switch)
  576. try:
  577. try:
  578. waiter.get()
  579. except LoopExit:
  580. return True
  581. finally:
  582. if timeout is not None:
  583. timeout.stop()
  584. return False
  585. def destroy(self, destroy_loop=None):
  586. if self._resolver is not None:
  587. self._resolver.close()
  588. del self._resolver
  589. if self._threadpool is not None:
  590. self._threadpool.kill()
  591. del self._threadpool
  592. if destroy_loop is None:
  593. destroy_loop = not self.loop.default
  594. if destroy_loop:
  595. if _threadlocal.loop is self.loop:
  596. # Don't let anyone try to reuse this
  597. _threadlocal.loop = None
  598. self.loop.destroy()
  599. else:
  600. # Store in case another hub is created for this
  601. # thread.
  602. _threadlocal.loop = self.loop
  603. self.loop = None
  604. if _threadlocal.hub is self:
  605. _threadlocal.hub = None
  606. def _get_resolver(self):
  607. if self._resolver is None:
  608. if self.resolver_class is not None:
  609. self.resolver_class = _import(self.resolver_class)
  610. self._resolver = self.resolver_class(hub=self)
  611. return self._resolver
  612. def _set_resolver(self, value):
  613. self._resolver = value
  614. def _del_resolver(self):
  615. del self._resolver
  616. resolver = property(_get_resolver, _set_resolver, _del_resolver)
  617. def _get_threadpool(self):
  618. if self._threadpool is None:
  619. if self.threadpool_class is not None:
  620. self.threadpool_class = _import(self.threadpool_class)
  621. self._threadpool = self.threadpool_class(self.threadpool_size, hub=self)
  622. return self._threadpool
  623. def _set_threadpool(self, value):
  624. self._threadpool = value
  625. def _del_threadpool(self):
  626. del self._threadpool
  627. threadpool = property(_get_threadpool, _set_threadpool, _del_threadpool)
  628. class Waiter(object):
  629. """
  630. A low level communication utility for greenlets.
  631. Waiter is a wrapper around greenlet's ``switch()`` and ``throw()`` calls that makes them somewhat safer:
  632. * switching will occur only if the waiting greenlet is executing :meth:`get` method currently;
  633. * any error raised in the greenlet is handled inside :meth:`switch` and :meth:`throw`
  634. * if :meth:`switch`/:meth:`throw` is called before the receiver calls :meth:`get`, then :class:`Waiter`
  635. will store the value/exception. The following :meth:`get` will return the value/raise the exception.
  636. The :meth:`switch` and :meth:`throw` methods must only be called from the :class:`Hub` greenlet.
  637. The :meth:`get` method must be called from a greenlet other than :class:`Hub`.
  638. >>> result = Waiter()
  639. >>> timer = get_hub().loop.timer(0.1)
  640. >>> timer.start(result.switch, 'hello from Waiter')
  641. >>> result.get() # blocks for 0.1 seconds
  642. 'hello from Waiter'
  643. If switch is called before the greenlet gets a chance to call :meth:`get` then
  644. :class:`Waiter` stores the value.
  645. >>> result = Waiter()
  646. >>> timer = get_hub().loop.timer(0.1)
  647. >>> timer.start(result.switch, 'hi from Waiter')
  648. >>> sleep(0.2)
  649. >>> result.get() # returns immediately without blocking
  650. 'hi from Waiter'
  651. .. warning::
  652. This is a limited and dangerous way to communicate between
  653. greenlets. It can easily leave a greenlet unscheduled forever
  654. if used incorrectly. Consider using safer classes such as
  655. :class:`gevent.event.Event`, :class:`gevent.event.AsyncResult`,
  656. or :class:`gevent.queue.Queue`.
  657. """
  658. __slots__ = ['hub', 'greenlet', 'value', '_exception']
  659. def __init__(self, hub=None):
  660. if hub is None:
  661. self.hub = get_hub()
  662. else:
  663. self.hub = hub
  664. self.greenlet = None
  665. self.value = None
  666. self._exception = _NONE
  667. def clear(self):
  668. self.greenlet = None
  669. self.value = None
  670. self._exception = _NONE
  671. def __str__(self):
  672. if self._exception is _NONE:
  673. return '<%s greenlet=%s>' % (type(self).__name__, self.greenlet)
  674. if self._exception is None:
  675. return '<%s greenlet=%s value=%r>' % (type(self).__name__, self.greenlet, self.value)
  676. return '<%s greenlet=%s exc_info=%r>' % (type(self).__name__, self.greenlet, self.exc_info)
  677. def ready(self):
  678. """Return true if and only if it holds a value or an exception"""
  679. return self._exception is not _NONE
  680. def successful(self):
  681. """Return true if and only if it is ready and holds a value"""
  682. return self._exception is None
  683. @property
  684. def exc_info(self):
  685. "Holds the exception info passed to :meth:`throw` if :meth:`throw` was called. Otherwise ``None``."
  686. if self._exception is not _NONE:
  687. return self._exception
  688. def switch(self, value=None):
  689. """Switch to the greenlet if one's available. Otherwise store the value."""
  690. greenlet = self.greenlet
  691. if greenlet is None:
  692. self.value = value
  693. self._exception = None
  694. else:
  695. assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
  696. switch = greenlet.switch
  697. try:
  698. switch(value)
  699. except: # pylint:disable=bare-except
  700. self.hub.handle_error(switch, *sys.exc_info())
  701. def switch_args(self, *args):
  702. return self.switch(args)
  703. def throw(self, *throw_args):
  704. """Switch to the greenlet with the exception. If there's no greenlet, store the exception."""
  705. greenlet = self.greenlet
  706. if greenlet is None:
  707. self._exception = throw_args
  708. else:
  709. assert getcurrent() is self.hub, "Can only use Waiter.switch method from the Hub greenlet"
  710. throw = greenlet.throw
  711. try:
  712. throw(*throw_args)
  713. except: # pylint:disable=bare-except
  714. self.hub.handle_error(throw, *sys.exc_info())
  715. def get(self):
  716. """If a value/an exception is stored, return/raise it. Otherwise until switch() or throw() is called."""
  717. if self._exception is not _NONE:
  718. if self._exception is None:
  719. return self.value
  720. else:
  721. getcurrent().throw(*self._exception)
  722. else:
  723. if self.greenlet is not None:
  724. raise ConcurrentObjectUseError('This Waiter is already used by %r' % (self.greenlet, ))
  725. self.greenlet = getcurrent()
  726. try:
  727. return self.hub.switch()
  728. finally:
  729. self.greenlet = None
  730. def __call__(self, source):
  731. if source.exception is None:
  732. self.switch(source.value)
  733. else:
  734. self.throw(source.exception)
  735. # can also have a debugging version, that wraps the value in a tuple (self, value) in switch()
  736. # and unwraps it in wait() thus checking that switch() was indeed called
  737. class _MultipleWaiter(Waiter):
  738. """
  739. An internal extension of Waiter that can be used if multiple objects
  740. must be waited on, and there is a chance that in between waits greenlets
  741. might be switched out. All greenlets that switch to this waiter
  742. will have their value returned.
  743. This does not handle exceptions or throw methods.
  744. """
  745. __slots__ = ['_values']
  746. def __init__(self, *args, **kwargs):
  747. Waiter.__init__(self, *args, **kwargs)
  748. # we typically expect a relatively small number of these to be outstanding.
  749. # since we pop from the left, a deque might be slightly
  750. # more efficient, but since we're in the hub we avoid imports if
  751. # we can help it to better support monkey-patching, and delaying the import
  752. # here can be impractical (see https://github.com/gevent/gevent/issues/652)
  753. self._values = list()
  754. def switch(self, value): # pylint:disable=signature-differs
  755. self._values.append(value)
  756. Waiter.switch(self, True)
  757. def get(self):
  758. if not self._values:
  759. Waiter.get(self)
  760. Waiter.clear(self)
  761. return self._values.pop(0)
  762. def iwait(objects, timeout=None, count=None):
  763. """
  764. Iteratively yield *objects* as they are ready, until all (or *count*) are ready
  765. or *timeout* expired.
  766. :param objects: A sequence (supporting :func:`len`) containing objects
  767. implementing the wait protocol (rawlink() and unlink()).
  768. :keyword int count: If not `None`, then a number specifying the maximum number
  769. of objects to wait for. If ``None`` (the default), all objects
  770. are waited for.
  771. :keyword float timeout: If given, specifies a maximum number of seconds
  772. to wait. If the timeout expires before the desired waited-for objects
  773. are available, then this method returns immediately.
  774. .. seealso:: :func:`wait`
  775. .. versionchanged:: 1.1a1
  776. Add the *count* parameter.
  777. .. versionchanged:: 1.1a2
  778. No longer raise :exc:`LoopExit` if our caller switches greenlets
  779. in between items yielded by this function.
  780. """
  781. # QQQ would be nice to support iterable here that can be generated slowly (why?)
  782. if objects is None:
  783. yield get_hub().join(timeout=timeout)
  784. return
  785. count = len(objects) if count is None else min(count, len(objects))
  786. waiter = _MultipleWaiter()
  787. switch = waiter.switch
  788. if timeout is not None:
  789. timer = get_hub().loop.timer(timeout, priority=-1)
  790. timer.start(switch, _NONE)
  791. try:
  792. for obj in objects:
  793. obj.rawlink(switch)
  794. for _ in xrange(count):
  795. item = waiter.get()
  796. waiter.clear()
  797. if item is _NONE:
  798. return
  799. yield item
  800. finally:
  801. if timeout is not None:
  802. timer.stop()
  803. for aobj in objects:
  804. unlink = getattr(aobj, 'unlink', None)
  805. if unlink:
  806. try:
  807. unlink(switch)
  808. except: # pylint:disable=bare-except
  809. traceback.print_exc()
  810. def wait(objects=None, timeout=None, count=None):
  811. """
  812. Wait for ``objects`` to become ready or for event loop to finish.
  813. If ``objects`` is provided, it must be a list containing objects
  814. implementing the wait protocol (rawlink() and unlink() methods):
  815. - :class:`gevent.Greenlet` instance
  816. - :class:`gevent.event.Event` instance
  817. - :class:`gevent.lock.Semaphore` instance
  818. - :class:`gevent.subprocess.Popen` instance
  819. If ``objects`` is ``None`` (the default), ``wait()`` blocks until
  820. the current event loop has nothing to do (or until ``timeout`` passes):
  821. - all greenlets have finished
  822. - all servers were stopped
  823. - all event loop watchers were stopped.
  824. If ``count`` is ``None`` (the default), wait for all ``objects``
  825. to become ready.
  826. If ``count`` is a number, wait for (up to) ``count`` objects to become
  827. ready. (For example, if count is ``1`` then the function exits
  828. when any object in the list is ready).
  829. If ``timeout`` is provided, it specifies the maximum number of
  830. seconds ``wait()`` will block.
  831. Returns the list of ready objects, in the order in which they were
  832. ready.
  833. .. seealso:: :func:`iwait`
  834. """
  835. if objects is None:
  836. return get_hub().join(timeout=timeout)
  837. return list(iwait(objects, timeout, count))
  838. class linkproxy(object):
  839. __slots__ = ['callback', 'obj']
  840. def __init__(self, callback, obj):
  841. self.callback = callback
  842. self.obj = obj
  843. def __call__(self, *args):
  844. callback = self.callback
  845. obj = self.obj
  846. self.callback = None
  847. self.obj = None
  848. callback(obj)