synchronization.py 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. """Synchronization primitives."""
  2. from __future__ import absolute_import, unicode_literals
  3. from .abstract import Thenable
  4. from .promises import promise
  5. __all__ = ['barrier']
  6. class barrier(object):
  7. """Barrier.
  8. Synchronization primitive to call a callback after a list
  9. of promises have been fulfilled.
  10. Example:
  11. .. code-block:: python
  12. # Request supports the .then() method.
  13. p1 = http.Request('http://a')
  14. p2 = http.Request('http://b')
  15. p3 = http.Request('http://c')
  16. requests = [p1, p2, p3]
  17. def all_done():
  18. pass # all requests complete
  19. b = barrier(requests).then(all_done)
  20. # oops, we forgot we want another request
  21. b.add(http.Request('http://d'))
  22. Note that you cannot add new promises to a barrier after
  23. the barrier is fulfilled.
  24. """
  25. def __init__(self, promises=None, args=None, kwargs=None,
  26. callback=None, size=None):
  27. self.p = promise()
  28. self.args = args or ()
  29. self.kwargs = kwargs or {}
  30. self._value = 0
  31. self.size = size or 0
  32. if not self.size and promises:
  33. # iter(l) calls len(l) so generator wrappers
  34. # can only return NotImplemented in the case the
  35. # generator is not fully consumed yet.
  36. plen = promises.__len__()
  37. if plen is not NotImplemented:
  38. self.size = plen
  39. self.ready = self.failed = False
  40. self.reason = None
  41. self.cancelled = False
  42. self.finalized = False
  43. [self.add_noincr(p) for p in promises or []]
  44. self.finalized = bool(promises or self.size)
  45. if callback:
  46. self.then(callback)
  47. def __call__(self, *args, **kwargs):
  48. if not self.ready and not self.cancelled:
  49. self._value += 1
  50. if self.finalized and self._value >= self.size:
  51. self.ready = True
  52. self.p(*self.args, **self.kwargs)
  53. def finalize(self):
  54. if not self.finalized and self._value >= self.size:
  55. self.p(*self.args, **self.kwargs)
  56. self.finalized = True
  57. def cancel(self):
  58. self.cancelled = True
  59. self.p.cancel()
  60. def add_noincr(self, p):
  61. if not self.cancelled:
  62. if self.ready:
  63. raise ValueError('Cannot add promise to full barrier')
  64. p.then(self)
  65. def add(self, p):
  66. if not self.cancelled:
  67. self.add_noincr(p)
  68. self.size += 1
  69. def then(self, callback, errback=None):
  70. self.p.then(callback, errback)
  71. def throw(self, *args, **kwargs):
  72. if not self.cancelled:
  73. self.p.throw(*args, **kwargs)
  74. throw1 = throw
  75. Thenable.register(barrier) # noqa: E305