1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- """Abstract classes."""
- from __future__ import absolute_import, unicode_literals
- import abc
- from collections import Callable
- from .five import with_metaclass
- __all__ = ['Thenable']
- @with_metaclass(abc.ABCMeta)
- class Thenable(Callable): # pragma: no cover
- """Object that supports ``.then()``."""
- __slots__ = ()
- @abc.abstractmethod
- def then(self, on_success, on_error=None):
- raise NotImplementedError()
- @abc.abstractmethod
- def throw(self, exc=None, tb=None, propagate=True):
- raise NotImplementedError()
- @abc.abstractmethod
- def cancel(self):
- raise NotImplementedError()
- @classmethod
- def __subclasshook__(cls, C):
- if cls is Thenable:
- if any('then' in B.__dict__ for B in C.__mro__):
- return True
- return NotImplemented
- @classmethod
- def register(cls, other):
- # overide to return other so `register` can be used as a decorator
- type(cls).register(cls, other)
- return other
- @Thenable.register
- class ThenableProxy(object):
- """Proxy to object that supports ``.then()``."""
- def _set_promise_target(self, p):
- self._p = p
- def then(self, on_success, on_error=None):
- return self._p.then(on_success, on_error)
- def cancel(self):
- return self._p.cancel()
- def throw1(self, exc=None):
- return self._p.throw1(exc)
- def throw(self, exc=None, tb=None, propagate=True):
- return self._p.throw(exc, tb=tb, propagate=propagate)
- @property
- def cancelled(self):
- return self._p.cancelled
- @property
- def ready(self):
- return self._p.ready
- @property
- def failed(self):
- return self._p.failed
|