123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299 |
- """Bridges between the `asyncio` module and Tornado IOLoop.
- .. versionadded:: 3.2
- This module integrates Tornado with the ``asyncio`` module introduced
- in Python 3.4. This makes it possible to combine the two libraries on
- the same event loop.
- .. deprecated:: 5.0
- While the code in this module is still used, it is now enabled
- automatically when `asyncio` is available, so applications should
- no longer need to refer to this module directly.
- .. note::
- Tornado requires the `~asyncio.AbstractEventLoop.add_reader` family of
- methods, so it is not compatible with the `~asyncio.ProactorEventLoop` on
- Windows. Use the `~asyncio.SelectorEventLoop` instead.
- """
- from __future__ import absolute_import, division, print_function
- import functools
- from tornado.gen import convert_yielded
- from tornado.ioloop import IOLoop
- from tornado import stack_context
- import asyncio
- class BaseAsyncIOLoop(IOLoop):
- def initialize(self, asyncio_loop, **kwargs):
- self.asyncio_loop = asyncio_loop
- # Maps fd to (fileobj, handler function) pair (as in IOLoop.add_handler)
- self.handlers = {}
- # Set of fds listening for reads/writes
- self.readers = set()
- self.writers = set()
- self.closing = False
- # If an asyncio loop was closed through an asyncio interface
- # instead of IOLoop.close(), we'd never hear about it and may
- # have left a dangling reference in our map. In case an
- # application (or, more likely, a test suite) creates and
- # destroys a lot of event loops in this way, check here to
- # ensure that we don't have a lot of dead loops building up in
- # the map.
- #
- # TODO(bdarnell): consider making self.asyncio_loop a weakref
- # for AsyncIOMainLoop and make _ioloop_for_asyncio a
- # WeakKeyDictionary.
- for loop in list(IOLoop._ioloop_for_asyncio):
- if loop.is_closed():
- del IOLoop._ioloop_for_asyncio[loop]
- IOLoop._ioloop_for_asyncio[asyncio_loop] = self
- super(BaseAsyncIOLoop, self).initialize(**kwargs)
- def close(self, all_fds=False):
- self.closing = True
- for fd in list(self.handlers):
- fileobj, handler_func = self.handlers[fd]
- self.remove_handler(fd)
- if all_fds:
- self.close_fd(fileobj)
- # Remove the mapping before closing the asyncio loop. If this
- # happened in the other order, we could race against another
- # initialize() call which would see the closed asyncio loop,
- # assume it was closed from the asyncio side, and do this
- # cleanup for us, leading to a KeyError.
- del IOLoop._ioloop_for_asyncio[self.asyncio_loop]
- self.asyncio_loop.close()
- def add_handler(self, fd, handler, events):
- fd, fileobj = self.split_fd(fd)
- if fd in self.handlers:
- raise ValueError("fd %s added twice" % fd)
- self.handlers[fd] = (fileobj, stack_context.wrap(handler))
- if events & IOLoop.READ:
- self.asyncio_loop.add_reader(
- fd, self._handle_events, fd, IOLoop.READ)
- self.readers.add(fd)
- if events & IOLoop.WRITE:
- self.asyncio_loop.add_writer(
- fd, self._handle_events, fd, IOLoop.WRITE)
- self.writers.add(fd)
- def update_handler(self, fd, events):
- fd, fileobj = self.split_fd(fd)
- if events & IOLoop.READ:
- if fd not in self.readers:
- self.asyncio_loop.add_reader(
- fd, self._handle_events, fd, IOLoop.READ)
- self.readers.add(fd)
- else:
- if fd in self.readers:
- self.asyncio_loop.remove_reader(fd)
- self.readers.remove(fd)
- if events & IOLoop.WRITE:
- if fd not in self.writers:
- self.asyncio_loop.add_writer(
- fd, self._handle_events, fd, IOLoop.WRITE)
- self.writers.add(fd)
- else:
- if fd in self.writers:
- self.asyncio_loop.remove_writer(fd)
- self.writers.remove(fd)
- def remove_handler(self, fd):
- fd, fileobj = self.split_fd(fd)
- if fd not in self.handlers:
- return
- if fd in self.readers:
- self.asyncio_loop.remove_reader(fd)
- self.readers.remove(fd)
- if fd in self.writers:
- self.asyncio_loop.remove_writer(fd)
- self.writers.remove(fd)
- del self.handlers[fd]
- def _handle_events(self, fd, events):
- fileobj, handler_func = self.handlers[fd]
- handler_func(fileobj, events)
- def start(self):
- try:
- old_loop = asyncio.get_event_loop()
- except (RuntimeError, AssertionError):
- old_loop = None
- try:
- self._setup_logging()
- asyncio.set_event_loop(self.asyncio_loop)
- self.asyncio_loop.run_forever()
- finally:
- asyncio.set_event_loop(old_loop)
- def stop(self):
- self.asyncio_loop.stop()
- def call_at(self, when, callback, *args, **kwargs):
- # asyncio.call_at supports *args but not **kwargs, so bind them here.
- # We do not synchronize self.time and asyncio_loop.time, so
- # convert from absolute to relative.
- return self.asyncio_loop.call_later(
- max(0, when - self.time()), self._run_callback,
- functools.partial(stack_context.wrap(callback), *args, **kwargs))
- def remove_timeout(self, timeout):
- timeout.cancel()
- def add_callback(self, callback, *args, **kwargs):
- try:
- self.asyncio_loop.call_soon_threadsafe(
- self._run_callback,
- functools.partial(stack_context.wrap(callback), *args, **kwargs))
- except RuntimeError:
- # "Event loop is closed". Swallow the exception for
- # consistency with PollIOLoop (and logical consistency
- # with the fact that we can't guarantee that an
- # add_callback that completes without error will
- # eventually execute).
- pass
- add_callback_from_signal = add_callback
- def run_in_executor(self, executor, func, *args):
- return self.asyncio_loop.run_in_executor(executor, func, *args)
- def set_default_executor(self, executor):
- return self.asyncio_loop.set_default_executor(executor)
- class AsyncIOMainLoop(BaseAsyncIOLoop):
- """``AsyncIOMainLoop`` creates an `.IOLoop` that corresponds to the
- current ``asyncio`` event loop (i.e. the one returned by
- ``asyncio.get_event_loop()``).
- .. deprecated:: 5.0
- Now used automatically when appropriate; it is no longer necessary
- to refer to this class directly.
- .. versionchanged:: 5.0
- Closing an `AsyncIOMainLoop` now closes the underlying asyncio loop.
- """
- def initialize(self, **kwargs):
- super(AsyncIOMainLoop, self).initialize(asyncio.get_event_loop(), **kwargs)
- def make_current(self):
- # AsyncIOMainLoop already refers to the current asyncio loop so
- # nothing to do here.
- pass
- class AsyncIOLoop(BaseAsyncIOLoop):
- """``AsyncIOLoop`` is an `.IOLoop` that runs on an ``asyncio`` event loop.
- This class follows the usual Tornado semantics for creating new
- ``IOLoops``; these loops are not necessarily related to the
- ``asyncio`` default event loop.
- Each ``AsyncIOLoop`` creates a new ``asyncio.EventLoop``; this object
- can be accessed with the ``asyncio_loop`` attribute.
- .. versionchanged:: 5.0
- When an ``AsyncIOLoop`` becomes the current `.IOLoop`, it also sets
- the current `asyncio` event loop.
- .. deprecated:: 5.0
- Now used automatically when appropriate; it is no longer necessary
- to refer to this class directly.
- """
- def initialize(self, **kwargs):
- self.is_current = False
- loop = asyncio.new_event_loop()
- try:
- super(AsyncIOLoop, self).initialize(loop, **kwargs)
- except Exception:
- # If initialize() does not succeed (taking ownership of the loop),
- # we have to close it.
- loop.close()
- raise
- def close(self, all_fds=False):
- if self.is_current:
- self.clear_current()
- super(AsyncIOLoop, self).close(all_fds=all_fds)
- def make_current(self):
- if not self.is_current:
- try:
- self.old_asyncio = asyncio.get_event_loop()
- except (RuntimeError, AssertionError):
- self.old_asyncio = None
- self.is_current = True
- asyncio.set_event_loop(self.asyncio_loop)
- def _clear_current_hook(self):
- if self.is_current:
- asyncio.set_event_loop(self.old_asyncio)
- self.is_current = False
- def to_tornado_future(asyncio_future):
- """Convert an `asyncio.Future` to a `tornado.concurrent.Future`.
- .. versionadded:: 4.1
- .. deprecated:: 5.0
- Tornado ``Futures`` have been merged with `asyncio.Future`,
- so this method is now a no-op.
- """
- return asyncio_future
- def to_asyncio_future(tornado_future):
- """Convert a Tornado yieldable object to an `asyncio.Future`.
- .. versionadded:: 4.1
- .. versionchanged:: 4.3
- Now accepts any yieldable object, not just
- `tornado.concurrent.Future`.
- .. deprecated:: 5.0
- Tornado ``Futures`` have been merged with `asyncio.Future`,
- so this method is now equivalent to `tornado.gen.convert_yielded`.
- """
- return convert_yielded(tornado_future)
- class AnyThreadEventLoopPolicy(asyncio.DefaultEventLoopPolicy):
- """Event loop policy that allows loop creation on any thread.
- The default `asyncio` event loop policy only automatically creates
- event loops in the main threads. Other threads must create event
- loops explicitly or `asyncio.get_event_loop` (and therefore
- `.IOLoop.current`) will fail. Installing this policy allows event
- loops to be created automatically on any thread, matching the
- behavior of Tornado versions prior to 5.0 (or 5.0 on Python 2).
- Usage::
- asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
- .. versionadded:: 5.0
- """
- def get_event_loop(self):
- try:
- return super().get_event_loop()
- except (RuntimeError, AssertionError):
- # This was an AssertionError in python 3.4.2 (which ships with debian jessie)
- # and changed to a RuntimeError in 3.4.3.
- # "There is no current event loop in thread %r"
- loop = self.new_event_loop()
- self.set_event_loop(loop)
- return loop
|