_deprecated.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. # coding: utf-8
  2. """tornado IOLoop API with zmq compatibility
  3. If you have tornado ≥ 3.0, this is a subclass of tornado's IOLoop,
  4. otherwise we ship a minimal subset of tornado in zmq.eventloop.minitornado.
  5. The minimal shipped version of tornado's IOLoop does not include
  6. support for concurrent futures - this will only be available if you
  7. have tornado ≥ 3.0.
  8. """
  9. # Copyright (C) PyZMQ Developers
  10. # Distributed under the terms of the Modified BSD License.
  11. from __future__ import absolute_import, division, with_statement
  12. import os
  13. import time
  14. import warnings
  15. from zmq import (
  16. Poller,
  17. POLLIN, POLLOUT, POLLERR,
  18. ZMQError, ETERM,
  19. )
  20. try:
  21. import tornado
  22. tornado_version = tornado.version_info
  23. except (ImportError, AttributeError):
  24. tornado_version = ()
  25. from .minitornado.ioloop import PollIOLoop, PeriodicCallback
  26. from .minitornado.log import gen_log
  27. class DelayedCallback(PeriodicCallback):
  28. """Schedules the given callback to be called once.
  29. The callback is called once, after callback_time milliseconds.
  30. `start` must be called after the DelayedCallback is created.
  31. The timeout is calculated from when `start` is called.
  32. """
  33. def __init__(self, callback, callback_time, io_loop=None):
  34. # PeriodicCallback require callback_time to be positive
  35. warnings.warn("""DelayedCallback is deprecated.
  36. Use loop.add_timeout instead.""", DeprecationWarning)
  37. callback_time = max(callback_time, 1e-3)
  38. super(DelayedCallback, self).__init__(callback, callback_time, io_loop)
  39. def start(self):
  40. """Starts the timer."""
  41. self._running = True
  42. self._firstrun = True
  43. self._next_timeout = time.time() + self.callback_time / 1000.0
  44. self.io_loop.add_timeout(self._next_timeout, self._run)
  45. def _run(self):
  46. if not self._running: return
  47. self._running = False
  48. try:
  49. self.callback()
  50. except Exception:
  51. gen_log.error("Error in delayed callback", exc_info=True)
  52. class ZMQPoller(object):
  53. """A poller that can be used in the tornado IOLoop.
  54. This simply wraps a regular zmq.Poller, scaling the timeout
  55. by 1000, so that it is in seconds rather than milliseconds.
  56. """
  57. def __init__(self):
  58. self._poller = Poller()
  59. @staticmethod
  60. def _map_events(events):
  61. """translate IOLoop.READ/WRITE/ERROR event masks into zmq.POLLIN/OUT/ERR"""
  62. z_events = 0
  63. if events & IOLoop.READ:
  64. z_events |= POLLIN
  65. if events & IOLoop.WRITE:
  66. z_events |= POLLOUT
  67. if events & IOLoop.ERROR:
  68. z_events |= POLLERR
  69. return z_events
  70. @staticmethod
  71. def _remap_events(z_events):
  72. """translate zmq.POLLIN/OUT/ERR event masks into IOLoop.READ/WRITE/ERROR"""
  73. events = 0
  74. if z_events & POLLIN:
  75. events |= IOLoop.READ
  76. if z_events & POLLOUT:
  77. events |= IOLoop.WRITE
  78. if z_events & POLLERR:
  79. events |= IOLoop.ERROR
  80. return events
  81. def register(self, fd, events):
  82. return self._poller.register(fd, self._map_events(events))
  83. def modify(self, fd, events):
  84. return self._poller.modify(fd, self._map_events(events))
  85. def unregister(self, fd):
  86. return self._poller.unregister(fd)
  87. def poll(self, timeout):
  88. """poll in seconds rather than milliseconds.
  89. Event masks will be IOLoop.READ/WRITE/ERROR
  90. """
  91. z_events = self._poller.poll(1000*timeout)
  92. return [ (fd,self._remap_events(evt)) for (fd,evt) in z_events ]
  93. def close(self):
  94. pass
  95. class ZMQIOLoop(PollIOLoop):
  96. """ZMQ subclass of tornado's IOLoop
  97. Minor modifications, so that .current/.instance return self
  98. """
  99. _zmq_impl = ZMQPoller
  100. def initialize(self, impl=None, **kwargs):
  101. impl = self._zmq_impl() if impl is None else impl
  102. super(ZMQIOLoop, self).initialize(impl=impl, **kwargs)
  103. @classmethod
  104. def instance(cls, *args, **kwargs):
  105. """Returns a global `IOLoop` instance.
  106. Most applications have a single, global `IOLoop` running on the
  107. main thread. Use this method to get this instance from
  108. another thread. To get the current thread's `IOLoop`, use `current()`.
  109. """
  110. # install ZMQIOLoop as the active IOLoop implementation
  111. # when using tornado 3
  112. if tornado_version >= (3,):
  113. PollIOLoop.configure(cls)
  114. loop = PollIOLoop.instance(*args, **kwargs)
  115. if not isinstance(loop, cls):
  116. warnings.warn("IOLoop.current expected instance of %r, got %r" % (cls, loop),
  117. RuntimeWarning, stacklevel=2,
  118. )
  119. return loop
  120. @classmethod
  121. def current(cls, *args, **kwargs):
  122. """Returns the current thread’s IOLoop.
  123. """
  124. # install ZMQIOLoop as the active IOLoop implementation
  125. # when using tornado 3
  126. if tornado_version >= (3,):
  127. PollIOLoop.configure(cls)
  128. loop = PollIOLoop.current(*args, **kwargs)
  129. if not isinstance(loop, cls):
  130. warnings.warn("IOLoop.current expected instance of %r, got %r" % (cls, loop),
  131. RuntimeWarning, stacklevel=2,
  132. )
  133. return loop
  134. def start(self):
  135. try:
  136. super(ZMQIOLoop, self).start()
  137. except ZMQError as e:
  138. if e.errno == ETERM:
  139. # quietly return on ETERM
  140. pass
  141. else:
  142. raise
  143. if (3, 0) <= tornado_version < (3, 1):
  144. def backport_close(self, all_fds=False):
  145. """backport IOLoop.close to 3.0 from 3.1 (supports fd.close() method)"""
  146. from zmq.eventloop.minitornado.ioloop import PollIOLoop as mini_loop
  147. return mini_loop.close.__get__(self)(all_fds)
  148. ZMQIOLoop.close = backport_close
  149. # public API name
  150. IOLoop = ZMQIOLoop
  151. def install():
  152. """set the tornado IOLoop instance with the pyzmq IOLoop.
  153. After calling this function, tornado's IOLoop.instance() and pyzmq's
  154. IOLoop.instance() will return the same object.
  155. An assertion error will be raised if tornado's IOLoop has been initialized
  156. prior to calling this function.
  157. """
  158. from tornado import ioloop
  159. # check if tornado's IOLoop is already initialized to something other
  160. # than the pyzmq IOLoop instance:
  161. assert (not ioloop.IOLoop.initialized()) or \
  162. ioloop.IOLoop.instance() is IOLoop.instance(), "tornado IOLoop already initialized"
  163. if tornado_version >= (3,):
  164. # tornado 3 has an official API for registering new defaults, yay!
  165. ioloop.IOLoop.configure(ZMQIOLoop)
  166. else:
  167. # we have to set the global instance explicitly
  168. ioloop.IOLoop._instance = IOLoop.instance()