future.py 1.8 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. """Future-returning APIs for tornado coroutines.
  2. .. seealso::
  3. :mod:`zmq.asyncio`
  4. """
  5. # Copyright (c) PyZMQ Developers.
  6. # Distributed under the terms of the Modified BSD License.
  7. import zmq as _zmq
  8. from zmq._future import _AsyncPoller, _AsyncSocket
  9. from tornado.concurrent import Future
  10. from tornado.ioloop import IOLoop
  11. class CancelledError(Exception):
  12. pass
  13. class _TornadoFuture(Future):
  14. """Subclass Tornado Future, reinstating cancellation."""
  15. def cancel(self):
  16. if self.done():
  17. return False
  18. self.set_exception(CancelledError())
  19. return True
  20. def cancelled(self):
  21. return self.done() and isinstance(self.exception(), CancelledError)
  22. # mixin for tornado/asyncio compatibility
  23. class _AsyncTornado(object):
  24. _Future = _TornadoFuture
  25. _READ = IOLoop.READ
  26. _WRITE = IOLoop.WRITE
  27. def _default_loop(self):
  28. return IOLoop.current()
  29. class Poller(_AsyncTornado, _AsyncPoller):
  30. def _watch_raw_socket(self, loop, socket, evt, f):
  31. """Schedule callback for a raw socket"""
  32. loop.add_handler(socket, lambda *args: f(), evt)
  33. def _unwatch_raw_sockets(self, loop, *sockets):
  34. """Unschedule callback for a raw socket"""
  35. for socket in sockets:
  36. loop.remove_handler(socket)
  37. class Socket(_AsyncTornado, _AsyncSocket):
  38. _poller_class = Poller
  39. Poller._socket_class = Socket
  40. class Context(_zmq.Context):
  41. # avoid sharing instance with base Context class
  42. _instance = None
  43. io_loop = None
  44. @staticmethod
  45. def _socket_class(self, socket_type):
  46. return Socket(self, socket_type, io_loop=self.io_loop)
  47. def __init__(self, *args, **kwargs):
  48. io_loop = kwargs.pop('io_loop', None)
  49. super(Context, self).__init__(*args, **kwargs)
  50. self.io_loop = io_loop or IOLoop.current()