asyncio_connection.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278
  1. """Use pika with the Asyncio EventLoop"""
  2. import asyncio
  3. import logging
  4. from pika.adapters import base_connection
  5. from pika.adapters.utils import nbio_interface, io_services_utils
  6. LOGGER = logging.getLogger(__name__)
  7. class AsyncioConnection(base_connection.BaseConnection):
  8. """ The AsyncioConnection runs on the Asyncio EventLoop.
  9. """
  10. def __init__(self,
  11. parameters=None,
  12. on_open_callback=None,
  13. on_open_error_callback=None,
  14. on_close_callback=None,
  15. custom_ioloop=None,
  16. internal_connection_workflow=True):
  17. """ Create a new instance of the AsyncioConnection class, connecting
  18. to RabbitMQ automatically
  19. :param pika.connection.Parameters parameters: Connection parameters
  20. :param callable on_open_callback: The method to call when the connection
  21. is open
  22. :param None | method on_open_error_callback: Called if the connection
  23. can't be established or connection establishment is interrupted by
  24. `Connection.close()`: on_open_error_callback(Connection, exception).
  25. :param None | method on_close_callback: Called when a previously fully
  26. open connection is closed:
  27. `on_close_callback(Connection, exception)`, where `exception` is
  28. either an instance of `exceptions.ConnectionClosed` if closed by
  29. user or broker or exception of another type that describes the cause
  30. of connection failure.
  31. :param None | asyncio.AbstractEventLoop |
  32. nbio_interface.AbstractIOServices custom_ioloop:
  33. Defaults to asyncio.get_event_loop().
  34. :param bool internal_connection_workflow: True for autonomous connection
  35. establishment which is default; False for externally-managed
  36. connection workflow via the `create_connection()` factory.
  37. """
  38. if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
  39. nbio = custom_ioloop
  40. else:
  41. nbio = _AsyncioIOServicesAdapter(custom_ioloop)
  42. super().__init__(
  43. parameters,
  44. on_open_callback,
  45. on_open_error_callback,
  46. on_close_callback,
  47. nbio,
  48. internal_connection_workflow=internal_connection_workflow)
  49. @classmethod
  50. def create_connection(cls,
  51. connection_configs,
  52. on_done,
  53. custom_ioloop=None,
  54. workflow=None):
  55. """Implement
  56. :py:classmethod:`pika.adapters.BaseConnection.create_connection()`.
  57. """
  58. nbio = _AsyncioIOServicesAdapter(custom_ioloop)
  59. def connection_factory(params):
  60. """Connection factory."""
  61. if params is None:
  62. raise ValueError('Expected pika.connection.Parameters '
  63. 'instance, but got None in params arg.')
  64. return cls(
  65. parameters=params,
  66. custom_ioloop=nbio,
  67. internal_connection_workflow=False)
  68. return cls._start_connection_workflow(
  69. connection_configs=connection_configs,
  70. connection_factory=connection_factory,
  71. nbio=nbio,
  72. workflow=workflow,
  73. on_done=on_done)
  74. class _AsyncioIOServicesAdapter(io_services_utils.SocketConnectionMixin,
  75. io_services_utils.StreamingConnectionMixin,
  76. nbio_interface.AbstractIOServices,
  77. nbio_interface.AbstractFileDescriptorServices):
  78. """Implements
  79. :py:class:`.utils.nbio_interface.AbstractIOServices` interface
  80. on top of `asyncio`.
  81. NOTE:
  82. :py:class:`.utils.nbio_interface.AbstractFileDescriptorServices`
  83. interface is only required by the mixins.
  84. """
  85. def __init__(self, loop=None):
  86. """
  87. :param asyncio.AbstractEventLoop | None loop: If None, gets default
  88. event loop from asyncio.
  89. """
  90. self._loop = loop or asyncio.get_event_loop()
  91. def get_native_ioloop(self):
  92. """Implement
  93. :py:meth:`.utils.nbio_interface.AbstractIOServices.get_native_ioloop()`.
  94. """
  95. return self._loop
  96. def close(self):
  97. """Implement
  98. :py:meth:`.utils.nbio_interface.AbstractIOServices.close()`.
  99. """
  100. self._loop.close()
  101. def run(self):
  102. """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.run()`.
  103. """
  104. self._loop.run_forever()
  105. def stop(self):
  106. """Implement :py:meth:`.utils.nbio_interface.AbstractIOServices.stop()`.
  107. """
  108. self._loop.stop()
  109. def add_callback_threadsafe(self, callback):
  110. """Implement
  111. :py:meth:`.utils.nbio_interface.AbstractIOServices.add_callback_threadsafe()`.
  112. """
  113. self._loop.call_soon_threadsafe(callback)
  114. def call_later(self, delay, callback):
  115. """Implement
  116. :py:meth:`.utils.nbio_interface.AbstractIOServices.call_later()`.
  117. """
  118. return _TimerHandle(self._loop.call_later(delay, callback))
  119. def getaddrinfo(self,
  120. host,
  121. port,
  122. on_done,
  123. family=0,
  124. socktype=0,
  125. proto=0,
  126. flags=0):
  127. """Implement
  128. :py:meth:`.utils.nbio_interface.AbstractIOServices.getaddrinfo()`.
  129. """
  130. return self._schedule_and_wrap_in_io_ref(
  131. self._loop.getaddrinfo(
  132. host,
  133. port,
  134. family=family,
  135. type=socktype,
  136. proto=proto,
  137. flags=flags), on_done)
  138. def set_reader(self, fd, on_readable):
  139. """Implement
  140. :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_reader()`.
  141. """
  142. self._loop.add_reader(fd, on_readable)
  143. LOGGER.debug('set_reader(%s, _)', fd)
  144. def remove_reader(self, fd):
  145. """Implement
  146. :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_reader()`.
  147. """
  148. LOGGER.debug('remove_reader(%s)', fd)
  149. return self._loop.remove_reader(fd)
  150. def set_writer(self, fd, on_writable):
  151. """Implement
  152. :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.set_writer()`.
  153. """
  154. self._loop.add_writer(fd, on_writable)
  155. LOGGER.debug('set_writer(%s, _)', fd)
  156. def remove_writer(self, fd):
  157. """Implement
  158. :py:meth:`.utils.nbio_interface.AbstractFileDescriptorServices.remove_writer()`.
  159. """
  160. LOGGER.debug('remove_writer(%s)', fd)
  161. return self._loop.remove_writer(fd)
  162. def _schedule_and_wrap_in_io_ref(self, coro, on_done):
  163. """Schedule the coroutine to run and return _AsyncioIOReference
  164. :param coroutine-obj coro:
  165. :param callable on_done: user callback that takes the completion result
  166. or exception as its only arg. It will not be called if the operation
  167. was cancelled.
  168. :rtype: _AsyncioIOReference which is derived from
  169. nbio_interface.AbstractIOReference
  170. """
  171. if not callable(on_done):
  172. raise TypeError(
  173. 'on_done arg must be callable, but got {!r}'.format(on_done))
  174. return _AsyncioIOReference(
  175. asyncio.ensure_future(coro, loop=self._loop), on_done)
  176. class _TimerHandle(nbio_interface.AbstractTimerReference):
  177. """This module's adaptation of `nbio_interface.AbstractTimerReference`.
  178. """
  179. def __init__(self, handle):
  180. """
  181. :param asyncio.Handle handle:
  182. """
  183. self._handle = handle
  184. def cancel(self):
  185. if self._handle is not None:
  186. self._handle.cancel()
  187. self._handle = None
  188. class _AsyncioIOReference(nbio_interface.AbstractIOReference):
  189. """This module's adaptation of `nbio_interface.AbstractIOReference`.
  190. """
  191. def __init__(self, future, on_done):
  192. """
  193. :param asyncio.Future future:
  194. :param callable on_done: user callback that takes the completion result
  195. or exception as its only arg. It will not be called if the operation
  196. was cancelled.
  197. """
  198. if not callable(on_done):
  199. raise TypeError(
  200. 'on_done arg must be callable, but got {!r}'.format(on_done))
  201. self._future = future
  202. def on_done_adapter(future):
  203. """Handle completion callback from the future instance"""
  204. # NOTE: Asyncio schedules callback for cancelled futures, but pika
  205. # doesn't want that
  206. if not future.cancelled():
  207. on_done(future.exception() or future.result())
  208. future.add_done_callback(on_done_adapter)
  209. def cancel(self):
  210. """Cancel pending operation
  211. :returns: False if was already done or cancelled; True otherwise
  212. :rtype: bool
  213. """
  214. return self._future.cancel()