| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278 |
- """Use pika with the Asyncio EventLoop"""
- import asyncio
- import logging
- from pika.adapters import base_connection
- from pika.adapters.utils import nbio_interface, io_services_utils
- LOGGER = logging.getLogger(__name__)
class AsyncioConnection(base_connection.BaseConnection):
    """ The AsyncioConnection runs on the Asyncio EventLoop.

    Both the constructor and :py:meth:`create_connection` accept either a
    native asyncio event loop (or None) or an already-built
    `nbio_interface.AbstractIOServices` object as `custom_ioloop`.
    """

    def __init__(self,
                 parameters=None,
                 on_open_callback=None,
                 on_open_error_callback=None,
                 on_close_callback=None,
                 custom_ioloop=None,
                 internal_connection_workflow=True):
        """ Create a new instance of the AsyncioConnection class, connecting
        to RabbitMQ automatically

        :param pika.connection.Parameters parameters: Connection parameters
        :param callable on_open_callback: The method to call when the connection
            is open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param None | asyncio.AbstractEventLoop |
            nbio_interface.AbstractIOServices custom_ioloop:
                Defaults to asyncio.get_event_loop().
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.
        """
        # An AbstractIOServices instance is used as-is; anything else (a
        # native loop or None) is wrapped in this module's adapter.
        if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
            nbio = custom_ioloop
        else:
            nbio = _AsyncioIOServicesAdapter(custom_ioloop)

        super().__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            nbio,
            internal_connection_workflow=internal_connection_workflow)

    @classmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Implement
        :py:classmethod:`pika.adapters.BaseConnection.create_connection()`.

        :param connection_configs: Forwarded unchanged to
            `_start_connection_workflow()`.
        :param on_done: Forwarded unchanged to `_start_connection_workflow()`.
        :param None | asyncio.AbstractEventLoop |
            nbio_interface.AbstractIOServices custom_ioloop: Event loop (or
            ready-made I/O services object) shared by the workflow and by
            every connection the factory creates.
        :param workflow: Forwarded unchanged to `_start_connection_workflow()`.
        :returns: Whatever `_start_connection_workflow()` returns.
        """
        # Mirror the constructor: do not re-wrap an object that already
        # implements AbstractIOServices, otherwise the adapter would treat
        # the services object itself as a native asyncio loop.
        if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
            nbio = custom_ioloop
        else:
            nbio = _AsyncioIOServicesAdapter(custom_ioloop)

        def connection_factory(params):
            """Connection factory.

            :param pika.connection.Parameters params: Must not be None.
            :raises ValueError: if `params` is None.
            """
            if params is None:
                raise ValueError('Expected pika.connection.Parameters '
                                 'instance, but got None in params arg.')
            # The workflow is managed externally here, hence
            # internal_connection_workflow=False.
            return cls(
                parameters=params,
                custom_ioloop=nbio,
                internal_connection_workflow=False)

        return cls._start_connection_workflow(
            connection_configs=connection_configs,
            connection_factory=connection_factory,
            nbio=nbio,
            workflow=workflow,
            on_done=on_done)
class _AsyncioIOServicesAdapter(io_services_utils.SocketConnectionMixin,
                                io_services_utils.StreamingConnectionMixin,
                                nbio_interface.AbstractIOServices,
                                nbio_interface.AbstractFileDescriptorServices):
    """Implements
    :py:class:`.utils.nbio_interface.AbstractIOServices` interface
    on top of `asyncio`.

    NOTE:
    :py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
    interface is only required by the mixins.

    """

    def __init__(self, loop=None):
        """
        :param asyncio.AbstractEventLoop | None loop: If None, gets default
            event loop from asyncio.

        """
        self._loop = loop or asyncio.get_event_loop()

    def get_native_ioloop(self):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.

        :returns: The asyncio event loop this adapter wraps.
        """
        return self._loop

    def close(self):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.

        Closes the wrapped event loop.
        """
        self._loop.close()

    def run(self):
        """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.

        Runs the wrapped event loop via `run_forever()`.
        """
        self._loop.run_forever()

    def stop(self):
        """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.

        Asks the wrapped event loop to stop.
        """
        self._loop.stop()

    def add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.

        :param callable callback: Passed to the loop's
            `call_soon_threadsafe()`.
        """
        self._loop.call_soon_threadsafe(callback)

    def call_later(self, delay, callback):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.

        :param delay: Passed to the loop's `call_later()`.
        :param callable callback: Passed to the loop's `call_later()`.
        :returns: `_TimerHandle` wrapping the handle returned by the loop.
        """
        return _TimerHandle(self._loop.call_later(delay, callback))

    def getaddrinfo(self,
                    host,
                    port,
                    on_done,
                    family=0,
                    socktype=0,
                    proto=0,
                    flags=0):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.

        The lookup is delegated to the loop's `getaddrinfo()`; note that
        `socktype` is passed to it under the keyword name `type`.

        :param callable on_done: Receives the completion result or exception.
        :raises TypeError: if `on_done` is not callable.
        :returns: `_AsyncioIOReference` for the scheduled lookup.
        """
        return self._schedule_and_wrap_in_io_ref(
            self._loop.getaddrinfo(
                host,
                port,
                family=family,
                type=socktype,
                proto=proto,
                flags=flags), on_done)

    def set_reader(self, fd, on_readable):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.

        Registers `on_readable` for `fd` via the loop's `add_reader()`.
        """
        self._loop.add_reader(fd, on_readable)
        LOGGER.debug('set_reader(%s, _)', fd)

    def remove_reader(self, fd):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.

        :returns: The result of the loop's `remove_reader()`.
        """
        LOGGER.debug('remove_reader(%s)', fd)
        return self._loop.remove_reader(fd)

    def set_writer(self, fd, on_writable):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.

        Registers `on_writable` for `fd` via the loop's `add_writer()`.
        """
        self._loop.add_writer(fd, on_writable)
        LOGGER.debug('set_writer(%s, _)', fd)

    def remove_writer(self, fd):
        """Implement
        :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.

        :returns: The result of the loop's `remove_writer()`.
        """
        LOGGER.debug('remove_writer(%s)', fd)
        return self._loop.remove_writer(fd)

    def _schedule_and_wrap_in_io_ref(self, coro, on_done):
        """Schedule the coroutine to run and return _AsyncioIOReference

        :param coroutine-obj coro:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.
        :raises TypeError: if `on_done` is not callable; in that case `coro`
            is closed instead of being scheduled.
        :rtype: _AsyncioIOReference which is derived from
            nbio_interface.AbstractIOReference

        """
        if not callable(on_done):
            # The coroutine object already exists but will never be
            # scheduled. Close it so Python does not emit a
            # "coroutine ... was never awaited" RuntimeWarning.
            if asyncio.iscoroutine(coro):
                coro.close()
            raise TypeError(
                'on_done arg must be callable, but got {!r}'.format(on_done))

        return _AsyncioIOReference(
            asyncio.ensure_future(coro, loop=self._loop), on_done)
class _TimerHandle(nbio_interface.AbstractTimerReference):
    """Timer reference returned by this module's `call_later()`; wraps the
    handle produced by the asyncio loop.

    """

    def __init__(self, handle):
        """
        :param asyncio.Handle handle: Handle to cancel on request.

        """
        self._handle = handle

    def cancel(self):
        """Cancel the wrapped handle, if any, and drop the reference to it.

        Calling this again after the handle has been dropped does nothing.
        """
        if self._handle is None:
            return

        self._handle.cancel()
        self._handle = None
class _AsyncioIOReference(nbio_interface.AbstractIOReference):
    """This module's adaptation of `nbio_interface.AbstractIOReference`.

    Wraps an asyncio future and relays its outcome to a user callback.

    """

    def __init__(self, future, on_done):
        """
        :param asyncio.Future future:
        :param callable on_done: user callback that takes the completion result
            or exception as its only arg. It will not be called if the operation
            was cancelled.
        :raises TypeError: if `on_done` is not callable.

        """
        if not callable(on_done):
            raise TypeError(
                'on_done arg must be callable, but got {!r}'.format(on_done))

        self._future = future

        def on_done_adapter(future):
            """Handle completion callback from the future instance"""

            # NOTE: Asyncio schedules callback for cancelled futures, but pika
            # doesn't want that
            if future.cancelled():
                return

            # Compare against None rather than testing truthiness: an
            # exception object may be falsy (e.g. it defines __bool__ or
            # __len__), and falling through to future.result() would then
            # re-raise here instead of reporting the error to on_done.
            error = future.exception()
            on_done(error if error is not None else future.result())

        future.add_done_callback(on_done_adapter)

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        return self._future.cancel()
|