123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- """Synchronization primitives."""
- from __future__ import absolute_import, unicode_literals
- from .abstract import Thenable
- from .promises import promise
- __all__ = ['barrier']
class barrier(object):
    """Barrier.

    Synchronization primitive to call a callback after a list
    of promises have been fulfilled.

    Example:

    .. code-block:: python

        # Request supports the .then() method.
        p1 = http.Request('http://a')
        p2 = http.Request('http://b')
        p3 = http.Request('http://c')
        requests = [p1, p2, p3]

        def all_done():
            pass  # all requests complete

        # .then() returns None, so keep a reference to the barrier.
        b = barrier(requests)
        b.then(all_done)

        # oops, we forgot we want another request
        b.add(http.Request('http://d'))

    Note that you cannot add new promises to a barrier after
    the barrier is fulfilled.

    Arguments:
        promises: Optional collection of objects supporting ``.then()``.
            The barrier registers itself as the callback of each one.
        args: Positional arguments passed to the inner promise when the
            barrier fires (defaults to ``()``).
        kwargs: Keyword arguments passed to the inner promise when the
            barrier fires (defaults to ``{}``).
        callback: Optional callback registered through :meth:`then`.
        size: Number of calls to wait for.  When falsy it is taken from
            ``promises.__len__()`` if that is available.
    """

    def __init__(self, promises=None, args=None, kwargs=None,
                 callback=None, size=None):
        self.p = promise()
        self.args = args or ()
        self.kwargs = kwargs or {}
        self._value = 0  # number of times the barrier has been called
        self.size = size or 0
        if not self.size and promises:
            # iter(l) calls len(l) so generator wrappers
            # can only return NotImplemented in the case the
            # generator is not fully consumed yet.
            # __len__ is called directly because len() raises TypeError
            # for a non-integer result such as NotImplemented.
            plen = promises.__len__()
            if plen is not NotImplemented:
                self.size = plen
        self.ready = self.failed = False
        self.reason = None
        self.cancelled = False
        # Stay unfinalized while registering so nothing fires early.
        self.finalized = False

        for prom in promises or []:
            self.add_noincr(prom)
        self.finalized = bool(promises or self.size)
        if callback:
            self.then(callback)

    def __call__(self, *args, **kwargs):
        """Count one fulfilled promise; fire once ``size`` is reached.

        The call arguments are ignored: the inner promise is always
        invoked with the ``args``/``kwargs`` given to the constructor.
        Calls made after the barrier is ready or cancelled are no-ops.
        """
        if not self.ready and not self.cancelled:
            self._value += 1
            if self.finalized and self._value >= self.size:
                self.ready = True
                self.p(*self.args, **self.kwargs)

    def finalize(self):
        """Mark the barrier as finalized.

        If enough calls have already been counted, the inner promise is
        fired immediately; otherwise it fires from a later call.
        """
        if not self.finalized and self._value >= self.size:
            # Mark the barrier ready before firing, exactly as __call__
            # does, so a later call cannot fire the callbacks a second
            # time and add() rejects promises once the barrier is full.
            self.ready = True
            self.p(*self.args, **self.kwargs)
        self.finalized = True

    def cancel(self):
        """Cancel the barrier and its inner promise."""
        self.cancelled = True
        self.p.cancel()

    def add_noincr(self, p):
        """Register the barrier as callback of ``p`` without growing size.

        Does nothing when the barrier is cancelled.

        Raises:
            ValueError: If the barrier has already fired.
        """
        if not self.cancelled:
            if self.ready:
                raise ValueError('Cannot add promise to full barrier')
            p.then(self)

    def add(self, p):
        """Add promise ``p`` and wait for one more call.

        Does nothing when the barrier is cancelled.

        Raises:
            ValueError: If the barrier has already fired.
        """
        if not self.cancelled:
            self.add_noincr(p)
            self.size += 1

    def then(self, callback, errback=None):
        """Register ``callback``/``errback`` on the inner promise.

        Returns ``None``, so calls cannot be chained on the barrier.
        """
        self.p.then(callback, errback)

    def throw(self, *args, **kwargs):
        """Forward to ``throw`` of the inner promise unless cancelled."""
        if not self.cancelled:
            self.p.throw(*args, **kwargs)
    throw1 = throw  # alias: same behavior as throw()
# Register barrier with Thenable through its ``register`` hook, so the class
# is associated with Thenable without inheriting from it.
# NOTE(review): presumably ABC-style virtual-subclass registration --
# confirm against the Thenable definition in ``.abstract``.
- Thenable.register(barrier) # noqa: E305
|