futures.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. """
  2. Support for Futures (asynchronously executed callables).
  3. If you're using Python 3.2 or newer, also see
  4. http://docs.python.org/3/library/concurrent.futures.html#future-objects
  5. Pyro - Python Remote Objects. Copyright by Irmen de Jong (irmen@razorvine.net).
  6. """
  7. import sys
  8. import time
  9. import functools
  10. import logging
  11. import Pyro4.util
  12. from Pyro4 import threadutil
  13. __all__ = ["Future", "FutureResult", "_ExceptionWrapper"]
  14. log = logging.getLogger("Pyro4.futures")
  15. class Future(object):
  16. """
  17. Holds a callable that will be executed asynchronously and provide its
  18. result value some time in the future.
  19. This is a more general implementation than the AsyncRemoteMethod, which
  20. only works with Pyro proxies (and provides a bit different syntax).
  21. This class has a few extra features as well (delay, canceling).
  22. """
  23. def __init__(self, somecallable):
  24. self.callable = somecallable
  25. self.chain = []
  26. self.exceptionhandler = None
  27. self.call_delay = 0
  28. self.cancelled = False
  29. self.completed = False
  30. def __call__(self, *args, **kwargs):
  31. """
  32. Start the future call with the provided arguments.
  33. Control flow returns immediately, with a FutureResult object.
  34. """
  35. if self.completed or not hasattr(self, "chain"):
  36. raise RuntimeError("the future has already been evaluated")
  37. if self.cancelled:
  38. raise RuntimeError("the future has been cancelled")
  39. chain = self.chain
  40. del self.chain # make it impossible to add new calls to the chain once we started executing it
  41. result = FutureResult() # notice that the call chain doesn't sit on the result object
  42. thread = threadutil.Thread(target=self.__asynccall, args=(result, chain, args, kwargs))
  43. thread.setDaemon(True)
  44. thread.start()
  45. return result
  46. def __asynccall(self, asyncresult, chain, args, kwargs):
  47. while self.call_delay > 0 and not self.cancelled:
  48. delay = min(self.call_delay, 2)
  49. time.sleep(delay)
  50. self.call_delay -= delay
  51. if self.cancelled:
  52. self.completed = True
  53. asyncresult.set_cancelled()
  54. return
  55. try:
  56. self.completed = True
  57. self.cancelled = False
  58. value = self.callable(*args, **kwargs)
  59. # now walk the callchain, passing on the previous value as first argument
  60. for call, args, kwargs in chain:
  61. call = functools.partial(call, value)
  62. value = call(*args, **kwargs)
  63. asyncresult.value = value
  64. except Exception as x:
  65. if self.exceptionhandler:
  66. self.exceptionhandler(x)
  67. asyncresult.value = _ExceptionWrapper(sys.exc_info()[1])
  68. def delay(self, seconds):
  69. """
  70. Delay the evaluation of the future for the given number of seconds.
  71. Return True if succesful otherwise False if the future has already been evaluated.
  72. """
  73. if self.completed:
  74. return False
  75. self.call_delay = seconds
  76. return True
  77. def cancel(self):
  78. """
  79. Cancels the execution of the future altogether.
  80. If the execution hasn't been started yet, the cancellation is succesful and returns True.
  81. Otherwise, it failed and returns False.
  82. """
  83. if self.completed:
  84. return False
  85. self.cancelled = True
  86. return True
  87. def then(self, call, *args, **kwargs):
  88. """
  89. Add a callable to the call chain, to be invoked when the results become available.
  90. The result of the current call will be used as the first argument for the next call.
  91. Optional extra arguments can be provided in args and kwargs.
  92. Returns self so you can easily chain then() calls.
  93. """
  94. self.chain.append((call, args, kwargs))
  95. return self
  96. def iferror(self, exceptionhandler):
  97. """
  98. Specify the exception handler to be invoked (with the exception object as only
  99. argument) when calculating the result raises an exception.
  100. If no exception handler is set, any exception raised in the async call will be silently ignored.
  101. Returns self so you can easily chain other calls.
  102. """
  103. self.exceptionhandler = exceptionhandler
  104. return self
  105. class FutureResult(object):
  106. """
  107. The result object for asynchronous Pyro calls.
  108. Unfortunatley it should be similar to the more general Future class but
  109. it is still somewhat limited (no delay, no canceling).
  110. """
  111. def __init__(self):
  112. self.__ready = threadutil.Event()
  113. self.callchain = []
  114. self.valueLock = threadutil.Lock()
  115. self.exceptionhandler = None
  116. def wait(self, timeout=None):
  117. """
  118. Wait for the result to become available, with optional timeout (in seconds).
  119. Returns True if the result is ready, or False if it still isn't ready.
  120. """
  121. result = self.__ready.wait(timeout)
  122. if result is None:
  123. # older pythons return None from wait()
  124. return self.__ready.isSet()
  125. return result
  126. @property
  127. def ready(self):
  128. """Boolean that contains the readiness of the async result"""
  129. return self.__ready.isSet()
  130. def get_value(self):
  131. self.__ready.wait()
  132. if isinstance(self.__value, _ExceptionWrapper):
  133. self.__value.raiseIt()
  134. else:
  135. return self.__value
  136. def set_value(self, value):
  137. with self.valueLock:
  138. self.__value = value
  139. # walk the call chain if the result is not an exception, otherwise invoke the errorhandler (if any)
  140. if isinstance(value, _ExceptionWrapper):
  141. if self.exceptionhandler:
  142. self.exceptionhandler(value.exception)
  143. else:
  144. for call, args, kwargs in self.callchain:
  145. call = functools.partial(call, self.__value)
  146. self.__value = call(*args, **kwargs)
  147. if isinstance(self.__value, _ExceptionWrapper):
  148. break
  149. self.callchain = []
  150. self.__ready.set()
  151. value = property(get_value, set_value, None, "The result value of the call. Reading it will block if not available yet.")
  152. def set_cancelled(self):
  153. self.set_value(_ExceptionWrapper(RuntimeError("future has been cancelled")))
  154. def then(self, call, *args, **kwargs):
  155. """
  156. Add a callable to the call chain, to be invoked when the results become available.
  157. The result of the current call will be used as the first argument for the next call.
  158. Optional extra arguments can be provided in args and kwargs.
  159. Returns self so you can easily chain then() calls.
  160. """
  161. with self.valueLock:
  162. if self.__ready.isSet():
  163. # value is already known, we need to process it immediately (can't use the call chain anymore)
  164. call = functools.partial(call, self.__value)
  165. self.__value = call(*args, **kwargs)
  166. else:
  167. # add the call to the call chain, it will be processed later when the result arrives
  168. self.callchain.append((call, args, kwargs))
  169. return self
  170. def iferror(self, exceptionhandler):
  171. """
  172. Specify the exception handler to be invoked (with the exception object as only
  173. argument) when asking for the result raises an exception.
  174. If no exception handler is set, any exception result will be silently ignored (unless
  175. you explicitly ask for the value). Returns self so you can easily chain other calls.
  176. """
  177. self.exceptionhandler = exceptionhandler
  178. return self
  179. class _ExceptionWrapper(object):
  180. """Class that wraps a remote exception. If this is returned, Pyro will
  181. re-throw the exception on the receiving side. Usually this is taken care of
  182. by a special response message flag, but in the case of batched calls this
  183. flag is useless and another mechanism was needed."""
  184. def __init__(self, exception):
  185. self.exception = exception
  186. def raiseIt(self):
  187. if sys.platform == "cli":
  188. Pyro4.util.fixIronPythonExceptionForPickle(self.exception, False)
  189. raise self.exception
  190. def __serialized_dict__(self):
  191. """serialized form as a dictionary"""
  192. return {
  193. "__class__": "Pyro4.futures._ExceptionWrapper",
  194. "exception": Pyro4.util.SerializerBase.class_to_dict(self.exception)
  195. }