123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597 |
- """
- Implementation of `nbio_interface.AbstractIOServices` on top of a
- selector-based I/O loop, such as tornado's and our home-grown
- select_connection's I/O loops.
- """
- import abc
- import logging
- import socket
- import threading
- from pika.adapters.utils import nbio_interface, io_services_utils
- from pika.adapters.utils.io_services_utils import (check_callback_arg,
- check_fd_arg)
- LOGGER = logging.getLogger(__name__)
- class AbstractSelectorIOLoop(object):
- """Selector-based I/O loop interface expected by
- `selector_ioloop_adapter.SelectorIOServicesAdapter`
- NOTE: this interface follows the corresponding methods and attributes
- of `tornado.ioloop.IOLoop` in order to avoid additional adapter layering
- when wrapping tornado's IOLoop.
- """
- @property
- @abc.abstractmethod
- def READ(self): # pylint: disable=C0103
- """The value of the I/O loop's READ flag; READ/WRITE/ERROR may be used
- with bitwise operators as expected.
- Implementation note: the implementations can simply replace these
- READ/WRITE/ERROR properties with class-level attributes
- """
- @property
- @abc.abstractmethod
- def WRITE(self): # pylint: disable=C0103
- """The value of the I/O loop's WRITE flag; READ/WRITE/ERROR may be used
- with bitwise operators as expected
- """
- @property
- @abc.abstractmethod
- def ERROR(self): # pylint: disable=C0103
- """The value of the I/O loop's ERROR flag; READ/WRITE/ERROR may be used
- with bitwise operators as expected
- """
- @abc.abstractmethod
- def close(self):
- """Release IOLoop's resources.
- the `close()` method is intended to be called by the application or test
- code only after `start()` returns. After calling `close()`, no other
- interaction with the closed instance of `IOLoop` should be performed.
- """
- @abc.abstractmethod
- def start(self):
- """Run the I/O loop. It will loop until requested to exit. See `stop()`.
- """
- @abc.abstractmethod
- def stop(self):
- """Request exit from the ioloop. The loop is NOT guaranteed to
- stop before this method returns.
- To invoke `stop()` safely from a thread other than this IOLoop's thread,
- call it via `add_callback_threadsafe`; e.g.,
- `ioloop.add_callback(ioloop.stop)`
- """
- @abc.abstractmethod
- def call_later(self, delay, callback):
- """Add the callback to the IOLoop timer to be called after delay seconds
- from the time of call on best-effort basis. Returns a handle to the
- timeout.
- :param float delay: The number of seconds to wait to call callback
- :param callable callback: The callback method
- :returns: handle to the created timeout that may be passed to
- `remove_timeout()`
- :rtype: object
- """
- @abc.abstractmethod
- def remove_timeout(self, timeout_handle):
- """Remove a timeout
- :param timeout_handle: Handle of timeout to remove
- """
- @abc.abstractmethod
- def add_callback(self, callback):
- """Requests a call to the given function as soon as possible in the
- context of this IOLoop's thread.
- NOTE: This is the only thread-safe method in IOLoop. All other
- manipulations of IOLoop must be performed from the IOLoop's thread.
- For example, a thread may request a call to the `stop` method of an
- ioloop that is running in a different thread via
- `ioloop.add_callback_threadsafe(ioloop.stop)`
- :param callable callback: The callback method
- """
- @abc.abstractmethod
- def add_handler(self, fd, handler, events):
- """Start watching the given file descriptor for events
- :param int fd: The file descriptor
- :param callable handler: When requested event(s) occur,
- `handler(fd, events)` will be called.
- :param int events: The event mask using READ, WRITE, ERROR.
- """
- @abc.abstractmethod
- def update_handler(self, fd, events):
- """Changes the events we watch for
- :param int fd: The file descriptor
- :param int events: The event mask using READ, WRITE, ERROR
- """
- @abc.abstractmethod
- def remove_handler(self, fd):
- """Stop watching the given file descriptor for events
- :param int fd: The file descriptor
- """
- class SelectorIOServicesAdapter(io_services_utils.SocketConnectionMixin,
- io_services_utils.StreamingConnectionMixin,
- nbio_interface.AbstractIOServices,
- nbio_interface.AbstractFileDescriptorServices):
- """Implements the
- :py:class:`.nbio_interface.AbstractIOServices` interface
- on top of selector-style native loop having the
- :py:class:`AbstractSelectorIOLoop` interface, such as
- :py:class:`pika.selection_connection.IOLoop` and :py:class:`tornado.IOLoop`.
- NOTE:
- :py:class:`.nbio_interface.AbstractFileDescriptorServices`
- interface is only required by the mixins.
- """
- def __init__(self, native_loop):
- """
- :param AbstractSelectorIOLoop native_loop: An instance compatible with
- the `AbstractSelectorIOLoop` interface, but not necessarily derived
- from it.
- """
- self._loop = native_loop
- # Active watchers: maps file descriptors to `_FileDescriptorCallbacks`
- self._watchers = dict()
- # Native loop-specific event masks of interest
- self._readable_mask = self._loop.READ
- # NOTE: tying ERROR to WRITE is particularly handy for Windows, whose
- # `select.select()` differs from Posix by reporting
- # connection-establishment failure only through exceptfds (ERROR event),
- # while the typical application workflow is to wait for the socket to
- # become writable when waiting for socket connection to be established.
- self._writable_mask = self._loop.WRITE | self._loop.ERROR
- def get_native_ioloop(self):
- """Implement
- :py:meth:`.nbio_interface.AbstractIOServices.get_native_ioloop()`.
- """
- return self._loop
- def close(self):
- """Implement :py:meth:`.nbio_interface.AbstractIOServices.close()`.
- """
- self._loop.close()
- def run(self):
- """Implement :py:meth:`.nbio_interface.AbstractIOServices.run()`.
- """
- self._loop.start()
- def stop(self):
- """Implement :py:meth:`.nbio_interface.AbstractIOServices.stop()`.
- """
- self._loop.stop()
- def add_callback_threadsafe(self, callback):
- """Implement
- :py:meth:`.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
- """
- self._loop.add_callback(callback)
- def call_later(self, delay, callback):
- """Implement :py:meth:`.nbio_interface.AbstractIOServices.call_later()`.
- """
- return _TimerHandle(self._loop.call_later(delay, callback), self._loop)
- def getaddrinfo(self,
- host,
- port,
- on_done,
- family=0,
- socktype=0,
- proto=0,
- flags=0):
- """Implement :py:meth:`.nbio_interface.AbstractIOServices.getaddrinfo()`.
- """
- return _SelectorIOLoopIOHandle(
- _AddressResolver(
- native_loop=self._loop,
- host=host,
- port=port,
- family=family,
- socktype=socktype,
- proto=proto,
- flags=flags,
- on_done=on_done).start())
- def set_reader(self, fd, on_readable):
- """Implement
- :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
- """
- LOGGER.debug('SelectorIOServicesAdapter.set_reader(%s, %r)', fd,
- on_readable)
- check_fd_arg(fd)
- check_callback_arg(on_readable, 'on_readable')
- try:
- callbacks = self._watchers[fd]
- except KeyError:
- self._loop.add_handler(fd, self._on_reader_writer_fd_events,
- self._readable_mask)
- self._watchers[fd] = _FileDescriptorCallbacks(reader=on_readable)
- LOGGER.debug('set_reader(%s, _) added handler Rd', fd)
- else:
- if callbacks.reader is None:
- assert callbacks.writer is not None
- self._loop.update_handler(
- fd, self._readable_mask | self._writable_mask)
- LOGGER.debug('set_reader(%s, _) updated handler RdWr', fd)
- else:
- LOGGER.debug('set_reader(%s, _) replacing reader', fd)
- callbacks.reader = on_readable
- def remove_reader(self, fd):
- """Implement
- :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
- """
- LOGGER.debug('SelectorIOServicesAdapter.remove_reader(%s)', fd)
- check_fd_arg(fd)
- try:
- callbacks = self._watchers[fd]
- except KeyError:
- LOGGER.debug('remove_reader(%s) neither was set', fd)
- return False
- if callbacks.reader is None:
- assert callbacks.writer is not None
- LOGGER.debug('remove_reader(%s) reader wasn\'t set Wr', fd)
- return False
- callbacks.reader = None
- if callbacks.writer is None:
- del self._watchers[fd]
- self._loop.remove_handler(fd)
- LOGGER.debug('remove_reader(%s) removed handler', fd)
- else:
- self._loop.update_handler(fd, self._writable_mask)
- LOGGER.debug('remove_reader(%s) updated handler Wr', fd)
- return True
- def set_writer(self, fd, on_writable):
- """Implement
- :py:meth:`.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
- """
- LOGGER.debug('SelectorIOServicesAdapter.set_writer(%s, %r)', fd,
- on_writable)
- check_fd_arg(fd)
- check_callback_arg(on_writable, 'on_writable')
- try:
- callbacks = self._watchers[fd]
- except KeyError:
- self._loop.add_handler(fd, self._on_reader_writer_fd_events,
- self._writable_mask)
- self._watchers[fd] = _FileDescriptorCallbacks(writer=on_writable)
- LOGGER.debug('set_writer(%s, _) added handler Wr', fd)
- else:
- if callbacks.writer is None:
- assert callbacks.reader is not None
- self._loop.update_handler(
- fd, self._readable_mask | self._writable_mask)
- LOGGER.debug('set_writer(%s, _) updated handler RdWr', fd)
- else:
- LOGGER.debug('set_writer(%s, _) replacing writer', fd)
- callbacks.writer = on_writable
- def remove_writer(self, fd):
- """Implement
- :py:meth:`.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
- """
- LOGGER.debug('SelectorIOServicesAdapter.remove_writer(%s)', fd)
- check_fd_arg(fd)
- try:
- callbacks = self._watchers[fd]
- except KeyError:
- LOGGER.debug('remove_writer(%s) neither was set.', fd)
- return False
- if callbacks.writer is None:
- assert callbacks.reader is not None
- LOGGER.debug('remove_writer(%s) writer wasn\'t set Rd', fd)
- return False
- callbacks.writer = None
- if callbacks.reader is None:
- del self._watchers[fd]
- self._loop.remove_handler(fd)
- LOGGER.debug('remove_writer(%s) removed handler', fd)
- else:
- self._loop.update_handler(fd, self._readable_mask)
- LOGGER.debug('remove_writer(%s) updated handler Rd', fd)
- return True
- def _on_reader_writer_fd_events(self, fd, events):
- """Handle indicated file descriptor events requested via `set_reader()`
- and `set_writer()`.
- :param fd: file descriptor
- :param events: event mask using native loop's READ/WRITE/ERROR. NOTE:
- depending on the underlying poller mechanism, ERROR may be indicated
- upon certain file description state even though we don't request it.
- We ignore ERROR here since `set_reader()`/`set_writer()` don't
- request for it.
- """
- callbacks = self._watchers[fd]
- if events & self._readable_mask and callbacks.reader is None:
- # NOTE: we check for consistency here ahead of the writer callback
- # because the writer callback, if any, can change the events being
- # watched
- LOGGER.warning(
- 'READ indicated on fd=%s, but reader callback is None; '
- 'events=%s', fd, bin(events))
- if events & self._writable_mask:
- if callbacks.writer is not None:
- callbacks.writer()
- else:
- LOGGER.warning(
- 'WRITE indicated on fd=%s, but writer callback is None; '
- 'events=%s', fd, bin(events))
- if events & self._readable_mask:
- if callbacks.reader is not None:
- callbacks.reader()
- else:
- # Reader callback might have been removed in the scope of writer
- # callback.
- pass
- class _FileDescriptorCallbacks(object):
- """Holds reader and writer callbacks for a file descriptor"""
- __slots__ = ('reader', 'writer')
- def __init__(self, reader=None, writer=None):
- self.reader = reader
- self.writer = writer
- class _TimerHandle(nbio_interface.AbstractTimerReference):
- """This module's adaptation of `nbio_interface.AbstractTimerReference`.
- """
- def __init__(self, handle, loop):
- """
- :param opaque handle: timer handle from the underlying loop
- implementation that may be passed to its `remove_timeout()` method
- :param AbstractSelectorIOLoop loop: the I/O loop instance that created
- the timeout.
- """
- self._handle = handle
- self._loop = loop
- def cancel(self):
- if self._loop is not None:
- self._loop.remove_timeout(self._handle)
- self._handle = None
- self._loop = None
- class _SelectorIOLoopIOHandle(nbio_interface.AbstractIOReference):
- """This module's adaptation of `nbio_interface.AbstractIOReference`
- """
- def __init__(self, subject):
- """
- :param subject: subject of the reference containing a `cancel()` method
- """
- self._cancel = subject.cancel
- def cancel(self):
- """Cancel pending operation
- :returns: False if was already done or cancelled; True otherwise
- :rtype: bool
- """
- return self._cancel()
- class _AddressResolver(object):
- """Performs getaddrinfo asynchronously using a thread, then reports result
- via callback from the given I/O loop.
- NOTE: at this stage, we're using a thread per request, which may prove
- inefficient and even prohibitive if the app performs many of these
- operations concurrently.
- """
- NOT_STARTED = 0
- ACTIVE = 1
- CANCELED = 2
- COMPLETED = 3
- def __init__(self, native_loop, host, port, family, socktype, proto, flags,
- on_done):
- """
- :param AbstractSelectorIOLoop native_loop:
- :param host: `see socket.getaddrinfo()`
- :param port: `see socket.getaddrinfo()`
- :param family: `see socket.getaddrinfo()`
- :param socktype: `see socket.getaddrinfo()`
- :param proto: `see socket.getaddrinfo()`
- :param flags: `see socket.getaddrinfo()`
- :param on_done: on_done(records|BaseException) callback for reporting
- result from the given I/O loop. The single arg will be either an
- exception object (check for `BaseException`) in case of failure or
- the result returned by `socket.getaddrinfo()`.
- """
- check_callback_arg(on_done, 'on_done')
- self._state = self.NOT_STARTED
- self._result = None
- self._loop = native_loop
- self._host = host
- self._port = port
- self._family = family
- self._socktype = socktype
- self._proto = proto
- self._flags = flags
- self._on_done = on_done
- self._mutex = threading.Lock()
- self._threading_timer = None
- def _cleanup(self):
- """Release resources
- """
- self._loop = None
- self._threading_timer = None
- self._on_done = None
- def start(self):
- """Start asynchronous DNS lookup.
- :rtype: nbio_interface.AbstractIOReference
- """
- assert self._state == self.NOT_STARTED, self._state
- self._state = self.ACTIVE
- self._threading_timer = threading.Timer(0, self._resolve)
- self._threading_timer.start()
- return _SelectorIOLoopIOHandle(self)
- def cancel(self):
- """Cancel the pending resolver
- :returns: False if was already done or cancelled; True otherwise
- :rtype: bool
- """
- # Try to cancel, but no guarantees
- with self._mutex:
- if self._state == self.ACTIVE:
- LOGGER.debug('Canceling resolver for %s:%s', self._host,
- self._port)
- self._state = self.CANCELED
- # Attempt to cancel, but not guaranteed
- self._threading_timer.cancel()
- self._cleanup()
- return True
- else:
- LOGGER.debug(
- 'Ignoring _AddressResolver cancel request when not ACTIVE; '
- '(%s:%s); state=%s', self._host, self._port, self._state)
- return False
- def _resolve(self):
- """Call `socket.getaddrinfo()` and return result via user's callback
- function on the given I/O loop
- """
- try:
- # NOTE: on python 2.x, can't pass keyword args to getaddrinfo()
- result = socket.getaddrinfo(self._host, self._port, self._family,
- self._socktype, self._proto,
- self._flags)
- except Exception as exc: # pylint: disable=W0703
- LOGGER.error('Address resolution failed: %r', exc)
- result = exc
- self._result = result
- # Schedule result to be returned to user via user's event loop
- with self._mutex:
- if self._state == self.ACTIVE:
- self._loop.add_callback(self._dispatch_result)
- else:
- LOGGER.debug(
- 'Asynchronous getaddrinfo cancellation detected; '
- 'in thread; host=%r', self._host)
- def _dispatch_result(self):
- """This is called from the user's I/O loop to pass the result to the
- user via the user's on_done callback
- """
- if self._state == self.ACTIVE:
- self._state = self.COMPLETED
- try:
- LOGGER.debug(
- 'Invoking asynchronous getaddrinfo() completion callback; '
- 'host=%r', self._host)
- self._on_done(self._result)
- finally:
- self._cleanup()
- else:
- LOGGER.debug(
- 'Asynchronous getaddrinfo cancellation detected; '
- 'in I/O loop context; host=%r', self._host)
|