asyncio.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299
"""Bridges between the `asyncio` module and Tornado IOLoop.

.. versionadded:: 3.2

This module integrates Tornado with the ``asyncio`` module introduced
in Python 3.4. This makes it possible to combine the two libraries on
the same event loop.

.. deprecated:: 5.0

   While the code in this module is still used, it is now enabled
   automatically when `asyncio` is available, so applications should
   no longer need to refer to this module directly.

.. note::

   Tornado requires the `~asyncio.AbstractEventLoop.add_reader` family of
   methods, so it is not compatible with the `~asyncio.ProactorEventLoop` on
   Windows. Use the `~asyncio.SelectorEventLoop` instead.
"""
  15. from __future__ import absolute_import, division, print_function
  16. import functools
  17. from tornado.gen import convert_yielded
  18. from tornado.ioloop import IOLoop
  19. from tornado import stack_context
  20. import asyncio
  21. class BaseAsyncIOLoop(IOLoop):
  22. def initialize(self, asyncio_loop, **kwargs):
  23. self.asyncio_loop = asyncio_loop
  24. # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
  25. self.handlers = {}
  26. # Set of fds listening for reads/writes
  27. self.readers = set()
  28. self.writers = set()
  29. self.closing = False
  30. # If an asyncio loop was closed through an asyncio interface
  31. # instead of IOLoop.close(), we'd never hear about it and may
  32. # have left a dangling reference in our map. In case an
  33. # application (or, more likely, a test suite) creates and
  34. # destroys a lot of event loops in this way, check here to
  35. # ensure that we don't have a lot of dead loops building up in
  36. # the map.
  37. #
  38. # TODO(bdarnell): consider making self.asyncio_loop a weakref
  39. # for AsyncIOMainLoop and make _ioloop_for_asyncio a
  40. # WeakKeyDictionary.
  41. for loop in list(IOLoop._ioloop_for_asyncio):
  42. if loop.is_closed():
  43. del IOLoop._ioloop_for_asyncio[loop]
  44. IOLoop._ioloop_for_asyncio[asyncio_loop] = self
  45. super(BaseAsyncIOLoop, self).initialize(**kwargs)
  46. def close(self, all_fds=False):
  47. self.closing = True
  48. for fd in list(self.handlers):
  49. fileobj, handler_func = self.handlers[fd]
  50. self.remove_handler(fd)
  51. if all_fds:
  52. self.close_fd(fileobj)
  53. # Remove the mapping before closing the asyncio loop. If this
  54. # happened in the other order, we could race against another
  55. # initialize() call which would see the closed asyncio loop,
  56. # assume it was closed from the asyncio side, and do this
  57. # cleanup for us, leading to a KeyError.
  58. del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
  59. self.asyncio_loop.close()
  60. def add_handler(self, fd, handler, events):
  61. fd, fileobj = self.split_fd(fd)
  62. if fd in self.handlers:
  63. raise ValueError("fd %s added twice" % fd)
  64. self.handlers[fd] = (fileobj, stack_context.wrap(handler))
  65. if events & IOLoop.READ:
  66. self.asyncio_loop.add_reader(
  67. fd, self._handle_events, fd, IOLoop.READ)
  68. self.readers.add(fd)
  69. if events & IOLoop.WRITE:
  70. self.asyncio_loop.add_writer(
  71. fd, self._handle_events, fd, IOLoop.WRITE)
  72. self.writers.add(fd)
  73. def update_handler(self, fd, events):
  74. fd, fileobj = self.split_fd(fd)
  75. if events & IOLoop.READ:
  76. if fd not in self.readers:
  77. self.asyncio_loop.add_reader(
  78. fd, self._handle_events, fd, IOLoop.READ)
  79. self.readers.add(fd)
  80. else:
  81. if fd in self.readers:
  82. self.asyncio_loop.remove_reader(fd)
  83. self.readers.remove(fd)
  84. if events & IOLoop.WRITE:
  85. if fd not in self.writers:
  86. self.asyncio_loop.add_writer(
  87. fd, self._handle_events, fd, IOLoop.WRITE)
  88. self.writers.add(fd)
  89. else:
  90. if fd in self.writers:
  91. self.asyncio_loop.remove_writer(fd)
  92. self.writers.remove(fd)
  93. def remove_handler(self, fd):
  94. fd, fileobj = self.split_fd(fd)
  95. if fd not in self.handlers:
  96. return
  97. if fd in self.readers:
  98. self.asyncio_loop.remove_reader(fd)
  99. self.readers.remove(fd)
  100. if fd in self.writers:
  101. self.asyncio_loop.remove_writer(fd)
  102. self.writers.remove(fd)
  103. del self.handlers[fd]
  104. def _handle_events(self, fd, events):
  105. fileobj, handler_func = self.handlers[fd]
  106. handler_func(fileobj, events)
  107. def start(self):
  108. try:
  109. old_loop = asyncio.get_event_loop()
  110. except (RuntimeError, AssertionError):
  111. old_loop = None
  112. try:
  113. self._setup_logging()
  114. asyncio.set_event_loop(self.asyncio_loop)
  115. self.asyncio_loop.run_forever()
  116. finally:
  117. asyncio.set_event_loop(old_loop)
  118. def stop(self):
  119. self.asyncio_loop.stop()
  120. def call_at(self, when, callback, *args, **kwargs):
  121. # asyncio.call_at supports *args but not **kwargs, so bind them here.
  122. # We do not synchronize self.time and asyncio_loop.time, so
  123. # convert from absolute to relative.
  124. return self.asyncio_loop.call_later(
  125. max(0, when - self.time()), self._run_callback,
  126. functools.partial(stack_context.wrap(callback), *args, **kwargs))
  127. def remove_timeout(self, timeout):
  128. timeout.cancel()
  129. def add_callback(self, callback, *args, **kwargs):
  130. try:
  131. self.asyncio_loop.call_soon_threadsafe(
  132. self._run_callback,
  133. functools.partial(stack_context.wrap(callback), *args, **kwargs))
  134. except RuntimeError:
  135. # "Event loop is closed". Swallow the exception for
  136. # consistency with PollIOLoop (and logical consistency
  137. # with the fact that we can't guarantee that an
  138. # add_callback that completes without error will
  139. # eventually execute).
  140. pass
  141. add_callback_from_signal = add_callback
  142. def run_in_executor(self, executor, func, *args):
  143. return self.asyncio_loop.run_in_executor(executor, func, *args)
  144. def set_default_executor(self, executor):
  145. return self.asyncio_loop.set_default_executor(executor)
  146. class AsyncIOMainLoop(BaseAsyncIOLoop):
  147. """``AsyncIOMainLoop`` creates an `.IOLoop` that corresponds to the
  148. current ``asyncio`` event loop (i.e. the one returned by
  149. ``asyncio.get_event_loop()``).
  150. .. deprecated:: 5.0
  151. Now used automatically when appropriate; it is no longer necessary
  152. to refer to this class directly.
  153. .. versionchanged:: 5.0
  154. Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop.
  155. """
  156. def initialize(self, **kwargs):
  157. super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)
  158. def make_current(self):
  159. # AsyncIOMainLoop already refers to the current asyncio loop so
  160. # nothing to do here.
  161. pass
  162. class AsyncIOLoop(BaseAsyncIOLoop):
  163. """``AsyncIOLoop`` is an `.IOLoop` that runs on an ``asyncio`` event loop.
  164. This class follows the usual Tornado semantics for creating new
  165. ``IOLoops``; these loops are not necessarily related to the
  166. ``asyncio`` default event loop.
  167. Each ``AsyncIOLoop`` creates a new ``asyncio.EventLoop``; this object
  168. can be accessed with the ``asyncio_loop`` attribute.
  169. .. versionchanged:: 5.0
  170. When an ``AsyncIOLoop`` becomes the current `.IOLoop`, it also sets
  171. the current `asyncio` event loop.
  172. .. deprecated:: 5.0
  173. Now used automatically when appropriate; it is no longer necessary
  174. to refer to this class directly.
  175. """
  176. def initialize(self, **kwargs):
  177. self.is_current = False
  178. loop = asyncio.new_event_loop()
  179. try:
  180. super(AsyncIOLoop, self).initialize(loop, **kwargs)
  181. except Exception:
  182. # If initialize() does not succeed (taking ownership of the loop),
  183. # we have to close it.
  184. loop.close()
  185. raise
  186. def close(self, all_fds=False):
  187. if self.is_current:
  188. self.clear_current()
  189. super(AsyncIOLoop, self).close(all_fds=all_fds)
  190. def make_current(self):
  191. if not self.is_current:
  192. try:
  193. self.old_asyncio = asyncio.get_event_loop()
  194. except (RuntimeError, AssertionError):
  195. self.old_asyncio = None
  196. self.is_current = True
  197. asyncio.set_event_loop(self.asyncio_loop)
  198. def _clear_current_hook(self):
  199. if self.is_current:
  200. asyncio.set_event_loop(self.old_asyncio)
  201. self.is_current = False
  202. def to_tornado_future(asyncio_future):
  203. """Convert an `asyncio.Future` to a `tornado.concurrent.Future`.
  204. .. versionadded:: 4.1
  205. .. deprecated:: 5.0
  206. Tornado ``Futures`` have been merged with `asyncio.Future`,
  207. so this method is now a no-op.
  208. """
  209. return asyncio_future
  210. def to_asyncio_future(tornado_future):
  211. """Convert a Tornado yieldable object to an `asyncio.Future`.
  212. .. versionadded:: 4.1
  213. .. versionchanged:: 4.3
  214. Now accepts any yieldable object, not just
  215. `tornado.concurrent.Future`.
  216. .. deprecated:: 5.0
  217. Tornado ``Futures`` have been merged with `asyncio.Future`,
  218. so this method is now equivalent to `tornado.gen.convert_yielded`.
  219. """
  220. return convert_yielded(tornado_future)
  221. class AnyThreadEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
  222. """Event loop policy that allows loop creation on any thread.
  223. The default `asyncio` event loop policy only automatically creates
  224. event loops in the main threads. Other threads must create event
  225. loops explicitly or `asyncio.get_event_loop` (and therefore
  226. `.IOLoop.current`) will fail. Installing this policy allows event
  227. loops to be created automatically on any thread, matching the
  228. behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).
  229. Usage::
  230. asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
  231. .. versionadded:: 5.0
  232. """
  233. def get_event_loop(self):
  234. try:
  235. return super().get_event_loop()
  236. except (RuntimeError, AssertionError):
  237. # This was an AssertionError in python 3.4.2 (which ships with debian jessie)
  238. # and changed to a RuntimeError in 3.4.3.
  239. # "There is no current event loop in thread %r"
  240. loop = self.new_event_loop()
  241. self.set_event_loop(loop)
  242. return loop