promises.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223
  1. """Promise implementation."""
  2. from __future__ import absolute_import, unicode_literals
  3. import sys
  4. from collections import deque
  5. from weakref import ref
  6. from .abstract import Thenable
  7. from .five import python_2_unicode_compatible, reraise
  8. __all__ = ['promise']
  9. @Thenable.register
  10. @python_2_unicode_compatible
  11. class promise(object):
  12. """Promise of future evaluation.
  13. This is a special implementation of promises in that it can
  14. be used both for "promise of a value" and lazy evaluation.
  15. The biggest upside for this is that everything in a promise can also be
  16. a promise, e.g. filters, callbacks and errbacks can all be promises.
  17. Usage examples:
  18. .. code-block:: python
  19. >>> from __future__ import print_statement # noqa
  20. >>> p = promise()
  21. >>> p.then(promise(print, ('OK',))) # noqa
  22. >>> p.on_error = promise(print, ('ERROR',)) # noqa
  23. >>> p(20)
  24. OK, 20
  25. >>> p.then(promise(print, ('hello',))) # noqa
  26. hello, 20
  27. >>> p.throw(KeyError('foo'))
  28. ERROR, KeyError('foo')
  29. >>> p2 = promise()
  30. >>> p2.then(print) # noqa
  31. >>> p2.cancel()
  32. >>> p(30)
  33. Example:
  34. .. code-block:: python
  35. from vine import promise, wrap
  36. class Protocol(object):
  37. def __init__(self):
  38. self.buffer = []
  39. def receive_message(self):
  40. return self.read_header().then(
  41. self.read_body).then(
  42. wrap(self.prepare_body))
  43. def read(self, size, callback=None):
  44. callback = callback or promise()
  45. tell_eventloop_to_read(size, callback)
  46. return callback
  47. def read_header(self, callback=None):
  48. return self.read(4, callback)
  49. def read_body(self, header, callback=None):
  50. body_size, = unpack('>L', header)
  51. return self.read(body_size, callback)
  52. def prepare_body(self, value):
  53. self.buffer.append(value)
  54. """
  55. if not hasattr(sys, 'pypy_version_info'): # pragma: no cover
  56. __slots__ = (
  57. 'fun', 'args', 'kwargs', 'ready', 'failed',
  58. 'value', 'reason', '_svpending', '_lvpending',
  59. 'on_error', 'cancelled', 'weak', '__weakref__',
  60. )
  61. def __init__(self, fun=None, args=None, kwargs=None,
  62. callback=None, on_error=None, weak=False):
  63. self.weak = weak
  64. self.fun = ref(fun) if self.weak else fun
  65. self.args = args or ()
  66. self.kwargs = kwargs or {}
  67. self.ready = False
  68. self.failed = False
  69. self.value = None
  70. self.reason = None
  71. # Optimization
  72. # Most promises will only have one callback, so we optimize for this
  73. # case by using a list only when there are multiple callbacks.
  74. # s(calar) pending / l(ist) pending
  75. self._svpending = None
  76. self._lvpending = None
  77. self.on_error = on_error
  78. self.cancelled = False
  79. if callback is not None:
  80. self.then(callback)
  81. if self.fun:
  82. assert self.fun and callable(fun)
  83. def __repr__(self):
  84. return ('<{0} --> {1!r}>' if self.fun else '<{0}>').format(
  85. '{0}@0x{1:x}'.format(type(self).__name__, id(self)), self.fun,
  86. )
  87. def cancel(self):
  88. self.cancelled = True
  89. try:
  90. if self._svpending is not None:
  91. self._svpending.cancel()
  92. if self._lvpending is not None:
  93. for pending in self._lvpending:
  94. pending.cancel()
  95. if isinstance(self.on_error, Thenable):
  96. self.on_error.cancel()
  97. finally:
  98. self._svpending = self._lvpending = self.on_error = None
  99. def __call__(self, *args, **kwargs):
  100. retval = None
  101. if self.cancelled:
  102. return
  103. final_args = self.args + args if args else self.args
  104. final_kwargs = dict(self.kwargs, **kwargs) if kwargs else self.kwargs
  105. # self.fun may be a weakref
  106. fun = self._fun_is_alive(self.fun)
  107. if fun is not None:
  108. try:
  109. retval = fun(*final_args, **final_kwargs)
  110. self.value = (ca, ck) = (retval,), {}
  111. except Exception:
  112. return self.throw()
  113. else:
  114. self.value = (ca, ck) = final_args, final_kwargs
  115. self.ready = True
  116. svpending = self._svpending
  117. if svpending is not None:
  118. try:
  119. svpending(*ca, **ck)
  120. finally:
  121. self._svpending = None
  122. else:
  123. lvpending = self._lvpending
  124. try:
  125. while lvpending:
  126. p = lvpending.popleft()
  127. p(*ca, **ck)
  128. finally:
  129. self._lvpending = None
  130. return retval
  131. def _fun_is_alive(self, fun):
  132. return fun() if self.weak else self.fun
  133. def then(self, callback, on_error=None):
  134. if not isinstance(callback, Thenable):
  135. callback = promise(callback, on_error=on_error)
  136. if self.cancelled:
  137. callback.cancel()
  138. return callback
  139. if self.failed:
  140. callback.throw(self.reason)
  141. elif self.ready:
  142. args, kwargs = self.value
  143. callback(*args, **kwargs)
  144. if self._lvpending is None:
  145. svpending = self._svpending
  146. if svpending is not None:
  147. self._svpending, self._lvpending = None, deque([svpending])
  148. else:
  149. self._svpending = callback
  150. return callback
  151. self._lvpending.append(callback)
  152. return callback
  153. def throw1(self, exc=None):
  154. if not self.cancelled:
  155. exc = exc if exc is not None else sys.exc_info()[1]
  156. self.failed, self.reason = True, exc
  157. if self.on_error:
  158. self.on_error(*self.args + (exc,), **self.kwargs)
  159. def throw(self, exc=None, tb=None, propagate=True):
  160. if not self.cancelled:
  161. current_exc = sys.exc_info()[1]
  162. exc = exc if exc is not None else current_exc
  163. try:
  164. self.throw1(exc)
  165. svpending = self._svpending
  166. if svpending is not None:
  167. try:
  168. svpending.throw1(exc)
  169. finally:
  170. self._svpending = None
  171. else:
  172. lvpending = self._lvpending
  173. try:
  174. while lvpending:
  175. lvpending.popleft().throw1(exc)
  176. finally:
  177. self._lvpending = None
  178. finally:
  179. if self.on_error is None and propagate:
  180. if tb is None and (exc is None or exc is current_exc):
  181. raise
  182. reraise(type(exc), exc, tb)
  183. @property
  184. def listeners(self):
  185. if self._lvpending:
  186. return self._lvpending
  187. return [self._svpending]