event.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. # Copyright (c) 2009-2016 Denis Bilenko, gevent contributors. See LICENSE for details.
  2. """Basic synchronization primitives: Event and AsyncResult"""
  3. from __future__ import print_function
  4. import sys
  5. from gevent.hub import get_hub, getcurrent, _NONE
  6. from gevent._compat import reraise
  7. from gevent.hub import InvalidSwitchError
  8. from gevent.timeout import Timeout
  9. from gevent._tblib import dump_traceback, load_traceback
  10. __all__ = ['Event', 'AsyncResult']
  class _AbstractLinkable(object):
      # Shared implementation of the linking/notification protocol used by
      # both repeatable events and one-time events (AsyncResult).
      # Subclasses must provide ``ready()``.

      # Class-level default; an instance attribute shadows it while a
      # notification callback is scheduled (see _check_and_notify).
      _notifier = None

      def __init__(self):
          # Historical note: AsyncResult used to preserve the order of
          # notifications while Event did not; this implementation does not
          # preserve it. (Event also only called each callback once per set(),
          # whereas AsyncResult permitted duplicate callbacks.)
          # HOWEVER, gevent.queue.Queue does guarantee the order of getters
          # relative to putters, and some documentation describes gevent as
          # "deterministic" (the same program run twice produces results in
          # the same order, as long as no I/O is involved). That could be an
          # argument for keeping order; an OrderedDict (2.7+) would be an easy
          # way to get ordering together with uniqueness.
          self._links = set()
          self.hub = get_hub()

      def ready(self):
          """Tell whether this object is ready; subclasses must override."""
          raise NotImplementedError()

      def _check_and_notify(self):
          # Schedule _notify_links on the hub's loop when we are ready, have
          # at least one link, and no notifier is currently active.
          if self.ready() and self._links and not self._notifier:
              self._notifier = self.hub.loop.run_callback(self._notify_links)

      def rawlink(self, callback):
          """
          Register a callback to call when this object is ready.

          *callback* will be called in the :class:`Hub <gevent.hub.Hub>`, so it must not use blocking gevent API.
          *callback* will be passed one argument: this instance.

          :raises TypeError: If *callback* is not callable.
          """
          if not callable(callback):
              raise TypeError('Expected callable: %r' % (callback, ))
          self._links.add(callback)
          self._check_and_notify()

      def unlink(self, callback):
          """Remove the callback set by :meth:`rawlink`; unknown callbacks are ignored."""
          self._links.discard(callback)

      def _notify_links(self):
          # Invoke the notification callbacks. We iterate over a snapshot
          # rather than self._links itself because callbacks may add or remove
          # links while we run. Only links present in the snapshot are
          # considered; a link added by a callback waits for the next
          # activation of this method.
          # The snapshot is taken here (not when the callback was scheduled):
          # links removed in the meantime are simply absent, and links added
          # in the meantime are picked up.
          pending = set(self._links)
          for callback in pending:
              # Re-check membership on every iteration (not in the ``for``
              # statement) because an earlier callback(self) call may have
              # altered self._links.
              if callback not in self._links:
                  continue
              try:
                  callback(self)
              except: # pylint:disable=bare-except
                  # Report to the hub and keep notifying the remaining links.
                  self.hub.handle_error((callback, self), *sys.exc_info())
              if getattr(callback, 'auto_unlink', None):
                  # Lets a function be unlinked without holding a reference
                  # to itself *inside* itself, which would be a cycle.
                  self.unlink(callback)
          # Save a little memory by letting the notifier be collected;
          # bool(self._notifier) would become False once we return anyway.
          del pending
          del self._notifier

      def _wait_core(self, timeout, catch=Timeout):
          # Core of the wait implementation: link the current greenlet's
          # switch, then switch to the hub until notified or timed out.
          # If *catch* is (), an elapsed timeout is allowed to propagate.
          # Returns True if the wait succeeded without timing out, False if
          # our own timer fired (and was caught).
          resume = getcurrent().switch
          self.rawlink(resume)
          try:
              timer = Timeout._start_new_or_dummy(timeout)
              try:
                  switched_with = self.hub.switch()
                  if switched_with is not self: # pragma: no cover
                      raise InvalidSwitchError('Invalid switch into Event.wait(): %r' % (switched_with, ))
                  return True
              except catch as err:
                  if err is timer:
                      # test_set_and_clear and test_timeout in test_threading
                      # rely on the exact return values, not just truthiness.
                      return False
                  # Some other timeout (not ours): let it propagate.
                  raise
              finally:
                  timer.cancel()
          finally:
              self.unlink(resume)

      def _wait_return_value(self, waited, wait_success):
          # Hook for subclasses to compute what _wait() returns; the default
          # ignores both arguments.
          # pylint:disable=unused-argument
          return None

      def _wait(self, timeout=None):
          # Return immediately if already ready; otherwise block in
          # _wait_core and report whether the wait succeeded.
          if self.ready():
              return self._wait_return_value(False, False)
          succeeded = self._wait_core(timeout)
          return self._wait_return_value(True, succeeded)
  class Event(_AbstractLinkable):
      """A synchronization primitive that allows one greenlet to wake up one or more others.
      It has the same interface as :class:`threading.Event` but works across greenlets.

      An event object manages an internal flag that can be set to true with the
      :meth:`set` method and reset to false with the :meth:`clear` method. The :meth:`wait` method
      blocks until the flag is true.

      .. note::
          The order and timing in which waiting greenlets are awakened is not determined.
          As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in an
          undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
          (those not waiting to be awakened) may run between the current greenlet yielding and
          the waiting greenlets being awakened. These details may change in the future.
      """

      # The internal flag; class-level default means "clear".
      _flag = False

      def __str__(self):
          state = 'set' if self._flag else 'clear'
          return '<%s %s _links[%s]>' % (self.__class__.__name__, state, len(self._links))

      def is_set(self):
          """Return true if and only if the internal flag is true."""
          return self._flag

      isSet = is_set # makes it a better drop-in replacement for threading.Event
      ready = is_set # makes it compatible with AsyncResult and Greenlet (for example in wait())

      def set(self):
          """
          Set the internal flag to true.

          All greenlets waiting for it to become true are awakened in
          some order at some time in the future. Greenlets that call
          :meth:`wait` once the flag is true will not block at all
          (until :meth:`clear` is called).
          """
          self._flag = True
          self._check_and_notify()

      def clear(self):
          """
          Reset the internal flag to false.

          Subsequently, threads calling :meth:`wait` will block until
          :meth:`set` is called to set the internal flag to true again.
          """
          self._flag = False

      def _wait_return_value(self, waited, wait_success):
          # To avoid the race condition outlined in
          # http://bugs.python.org/issue13502: when we actually had to wait,
          # report whether the wait itself succeeded rather than the flag's
          # current state (it may have been cleared again in the meantime).
          if waited:
              return wait_success
          # We did not wait, so simply echo the flag, which should be true.
          flag = self._flag
          assert flag, "if we didn't wait we should already be set"
          return flag

      def wait(self, timeout=None):
          """
          Block until the internal flag is true.

          If the internal flag is true on entry, return immediately. Otherwise,
          block until another thread (greenlet) calls :meth:`set` to set the flag to true,
          or until the optional timeout occurs.

          When the *timeout* argument is present and not ``None``, it should be a
          floating point number specifying a timeout for the operation in seconds
          (or fractions thereof).

          :return: This method returns true if and only if the internal flag has been set to
              true, either before the wait call or after the wait starts, so it will
              always return ``True`` except if a timeout is given and the operation
              times out.

          .. versionchanged:: 1.1
              The return value represents the flag during the elapsed wait, not
              just after it elapses. This solves a race condition if one greenlet
              sets and then clears the flag without switching, while other greenlets
              are waiting. When the waiters wake up, this will return True; previously,
              they would still wake up, but the return value would be False. This is most
              noticeable when the *timeout* is present.
          """
          return self._wait(timeout)

      def _reset_internal_locks(self): # pragma: no cover
          # Intentionally a no-op, kept for compatibility with threading.Event
          # (only relevant with patch_all(Event=True); by default Event is not
          # patched). Without it the threading module reports:
          # Exception AttributeError: AttributeError("'Event' object has no attribute '_reset_internal_locks'",)
          # in <module 'threading' from '/usr/lib/python2.7/threading.pyc'> ignored
          pass
  class AsyncResult(_AbstractLinkable):
      """A one-time event that stores a value or an exception.

      Like :class:`Event` it wakes up all the waiters when :meth:`set` or :meth:`set_exception`
      is called. Waiters may receive the passed value or exception by calling :meth:`get`
      instead of :meth:`wait`. An :class:`AsyncResult` instance cannot be reset.

      To pass a value call :meth:`set`. Calls to :meth:`get` (those that are currently blocking as well as
      those made in the future) will return the value:

          >>> result = AsyncResult()
          >>> result.set(100)
          >>> result.get()
          100

      To pass an exception call :meth:`set_exception`. This will cause :meth:`get` to raise that exception:

          >>> result = AsyncResult()
          >>> result.set_exception(RuntimeError('failure'))
          >>> result.get()
          Traceback (most recent call last):
           ...
          RuntimeError: failure

      :class:`AsyncResult` implements :meth:`__call__` and thus can be used as :meth:`link` target:

          >>> import gevent
          >>> result = AsyncResult()
          >>> gevent.spawn(lambda : 1/0).link(result)
          >>> try:
          ...     result.get()
          ... except ZeroDivisionError:
          ...     print('ZeroDivisionError')
          ZeroDivisionError

      .. note::
          The order and timing in which waiting greenlets are awakened is not determined.
          As an implementation note, in gevent 1.1 and 1.0, waiting greenlets are awakened in an
          undetermined order sometime *after* the current greenlet yields to the event loop. Other greenlets
          (those not waiting to be awakened) may run between the current greenlet yielding and
          the waiting greenlets being awakened. These details may change in the future.

      .. versionchanged:: 1.1
         The exact order in which waiting greenlets are awakened is not the same
         as in 1.0.
      .. versionchanged:: 1.1
         Callbacks :meth:`linked <rawlink>` to this object are required to be hashable, and duplicates are
         merged.
      """

      # Sentinel meaning "no value has been set yet" (None is a legal value).
      _value = _NONE
      # Empty until set_exception() stores a (type, value, dumped traceback) tuple.
      _exc_info = ()
      _notifier = None

      @property
      def _exception(self):
          # The stored exception instance, or _NONE if no exception was set.
          return self._exc_info[1] if self._exc_info else _NONE

      @property
      def value(self):
          """
          Holds the value passed to :meth:`set` if :meth:`set` was called. Otherwise,
          ``None``
          """
          return self._value if self._value is not _NONE else None

      @property
      def exc_info(self):
          """
          The three-tuple of exception information if :meth:`set_exception` was called.
          Otherwise an empty tuple.
          """
          if self._exc_info:
              # The traceback was stored through dump_traceback(); convert it
              # back with load_traceback() on the way out.
              return (self._exc_info[0], self._exc_info[1], load_traceback(self._exc_info[2]))
          return ()

      def __str__(self):
          parts = ['<%s ' % (self.__class__.__name__, )]
          if self.ready():
              # Wrap in a one-tuple so that tuple values are formatted as a
              # single argument instead of being unpacked by ``%``.
              parts.append('value=%r ' % (self.value, ))
              exc = self._exception
              if exc is not None and exc is not _NONE:
                  parts.append('exception=%r ' % (exc, ))
          else:
              # Only report 'unset' when neither a value nor an exception
              # has been stored.
              parts.append('unset ')
          parts.append(' _links[%s]>' % len(self._links))
          return ''.join(parts)

      def ready(self):
          """Return true if and only if it holds a value or an exception"""
          # bool() so callers get a real boolean rather than the internal
          # exc_info tuple when an exception is stored.
          return bool(self._exc_info) or self._value is not _NONE

      def successful(self):
          """Return true if and only if it is ready and holds a value"""
          return self._value is not _NONE

      @property
      def exception(self):
          """Holds the exception instance passed to :meth:`set_exception` if :meth:`set_exception` was called.
          Otherwise ``None``."""
          if self._exc_info:
              return self._exc_info[1]
          return None

      def set(self, value=None):
          """Store the value and wake up any waiters.

          All greenlets blocking on :meth:`get` or :meth:`wait` are awakened.
          Subsequent calls to :meth:`wait` and :meth:`get` will not block at all.
          """
          self._value = value
          self._check_and_notify()

      def set_exception(self, exception, exc_info=None):
          """Store the exception and wake up any waiters.

          All greenlets blocking on :meth:`get` or :meth:`wait` are awakened.
          Subsequent calls to :meth:`wait` and :meth:`get` will not block at all.

          :keyword tuple exc_info: If given, a standard three-tuple of type, value, :class:`traceback`
              as returned by :func:`sys.exc_info`. This will be used when the exception
              is re-raised to propagate the correct traceback.
          """
          if exc_info:
              self._exc_info = (exc_info[0], exc_info[1], dump_traceback(exc_info[2]))
          else:
              self._exc_info = (type(exception), exception, dump_traceback(None))
          self._check_and_notify()

      def _raise_exception(self):
          # Re-raise the stored exception using the public exc_info triple.
          reraise(*self.exc_info)

      def get(self, block=True, timeout=None):
          """Return the stored value or raise the exception.

          If this instance already holds a value or an exception, return or raise it immediately.
          Otherwise, block until another greenlet calls :meth:`set` or :meth:`set_exception` or
          until the optional timeout occurs.

          When the *timeout* argument is present and not ``None``, it should be a
          floating point number specifying a timeout for the operation in seconds
          (or fractions thereof). If the *timeout* elapses, the *Timeout* exception will
          be raised.

          :keyword bool block: If set to ``False`` and this instance is not ready,
              immediately raise a :class:`Timeout` exception.
          """
          if self._value is not _NONE:
              return self._value
          if self._exc_info:
              return self._raise_exception()

          if not block:
              # Not ready and not blocking, so immediately timeout
              raise Timeout()

          # Wait, letting an elapsed timeout propagate (catch=()).
          self._wait_core(timeout, ())

          # by definition we are now ready
          return self.get(block=False)

      def get_nowait(self):
          """
          Return the value or raise the exception without blocking.

          If this object is not yet :meth:`ready <ready>`, raise
          :class:`gevent.Timeout` immediately.
          """
          return self.get(block=False)

      def _wait_return_value(self, waited, wait_success):
          # pylint:disable=unused-argument
          # Always return the value. Since this is a one-shot event,
          # no race condition should reset it.
          return self.value

      def wait(self, timeout=None):
          """Block until the instance is ready.

          If this instance already holds a value, it is returned immediately. If this
          instance already holds an exception, ``None`` is returned immediately.

          Otherwise, block until another greenlet calls :meth:`set` or :meth:`set_exception`
          (at which point either the value or ``None`` will be returned, respectively),
          or until the optional timeout expires (at which point ``None`` will also be
          returned).

          When the *timeout* argument is present and not ``None``, it should be a
          floating point number specifying a timeout for the operation in seconds
          (or fractions thereof).

          .. note:: If a timeout is given and expires, ``None`` will be returned
              (no timeout exception will be raised).
          """
          return self._wait(timeout)

      # link protocol
      def __call__(self, source):
          # Copy the outcome of *source* (an object exposing successful(),
          # value and exception) into this result.
          if source.successful():
              self.set(source.value)
          else:
              self.set_exception(source.exception, getattr(source, 'exc_info', None))

      # Methods to make us more like concurrent.futures.Future

      def result(self, timeout=None):
          """Equivalent to ``get(timeout=timeout)``."""
          return self.get(timeout=timeout)

      set_result = set

      def done(self):
          """Equivalent to :meth:`ready`."""
          return self.ready()

      # we don't support cancelling

      def cancel(self):
          """Cancelling is not supported; always return ``False``."""
          return False

      def cancelled(self):
          """Cancelling is not supported; always return ``False``."""
          return False

      # exception is a method, we use it as a property