future.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from __future__ import absolute_import
  2. import functools
  3. import logging
  4. log = logging.getLogger(__name__)
  5. class Future(object):
  6. error_on_callbacks = False # and errbacks
  7. def __init__(self):
  8. self.is_done = False
  9. self.value = None
  10. self.exception = None
  11. self._callbacks = []
  12. self._errbacks = []
  13. def succeeded(self):
  14. return self.is_done and not bool(self.exception)
  15. def failed(self):
  16. return self.is_done and bool(self.exception)
  17. def retriable(self):
  18. try:
  19. return self.exception.retriable
  20. except AttributeError:
  21. return False
  22. def success(self, value):
  23. assert not self.is_done, 'Future is already complete'
  24. self.value = value
  25. self.is_done = True
  26. if self._callbacks:
  27. self._call_backs('callback', self._callbacks, self.value)
  28. return self
  29. def failure(self, e):
  30. assert not self.is_done, 'Future is already complete'
  31. self.exception = e if type(e) is not type else e()
  32. assert isinstance(self.exception, BaseException), (
  33. 'future failed without an exception')
  34. self.is_done = True
  35. self._call_backs('errback', self._errbacks, self.exception)
  36. return self
  37. def add_callback(self, f, *args, **kwargs):
  38. if args or kwargs:
  39. f = functools.partial(f, *args, **kwargs)
  40. if self.is_done and not self.exception:
  41. self._call_backs('callback', [f], self.value)
  42. else:
  43. self._callbacks.append(f)
  44. return self
  45. def add_errback(self, f, *args, **kwargs):
  46. if args or kwargs:
  47. f = functools.partial(f, *args, **kwargs)
  48. if self.is_done and self.exception:
  49. self._call_backs('errback', [f], self.exception)
  50. else:
  51. self._errbacks.append(f)
  52. return self
  53. def add_both(self, f, *args, **kwargs):
  54. self.add_callback(f, *args, **kwargs)
  55. self.add_errback(f, *args, **kwargs)
  56. return self
  57. def chain(self, future):
  58. self.add_callback(future.success)
  59. self.add_errback(future.failure)
  60. return self
  61. def _call_backs(self, back_type, backs, value):
  62. for f in backs:
  63. try:
  64. f(value)
  65. except Exception as e:
  66. log.exception('Error processing %s', back_type)
  67. if self.error_on_callbacks:
  68. raise e