tornado_connection.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. """Use pika with the Tornado IOLoop
  2. """
  3. import logging
  4. from tornado import ioloop
  5. from pika.adapters.utils import nbio_interface, selector_ioloop_adapter
  6. from pika.adapters import base_connection
  7. LOGGER = logging.getLogger(__name__)
  8. class TornadoConnection(base_connection.BaseConnection):
  9. """The TornadoConnection runs on the Tornado IOLoop.
  10. """
  11. def __init__(self,
  12. parameters=None,
  13. on_open_callback=None,
  14. on_open_error_callback=None,
  15. on_close_callback=None,
  16. custom_ioloop=None,
  17. internal_connection_workflow=True):
  18. """Create a new instance of the TornadoConnection class, connecting
  19. to RabbitMQ automatically.
  20. :param pika.connection.Parameters|None parameters: The connection
  21. parameters
  22. :param callable|None on_open_callback: The method to call when the
  23. connection is open
  24. :param callable|None on_open_error_callback: Called if the connection
  25. can't be established or connection establishment is interrupted by
  26. `Connection.close()`:
  27. on_open_error_callback(Connection, exception)
  28. :param callable|None on_close_callback: Called when a previously fully
  29. open connection is closed:
  30. `on_close_callback(Connection, exception)`, where `exception` is
  31. either an instance of `exceptions.ConnectionClosed` if closed by
  32. user or broker or exception of another type that describes the
  33. cause of connection failure
  34. :param ioloop.IOLoop|nbio_interface.AbstractIOServices|None custom_ioloop:
  35. Override using the global IOLoop in Tornado
  36. :param bool internal_connection_workflow: True for autonomous connection
  37. establishment which is default; False for externally-managed
  38. connection workflow via the `create_connection()` factory
  39. """
  40. if isinstance(custom_ioloop, nbio_interface.AbstractIOServices):
  41. nbio = custom_ioloop
  42. else:
  43. nbio = (selector_ioloop_adapter.SelectorIOServicesAdapter(
  44. custom_ioloop or ioloop.IOLoop.instance()))
  45. super(TornadoConnection, self).__init__(
  46. parameters,
  47. on_open_callback,
  48. on_open_error_callback,
  49. on_close_callback,
  50. nbio,
  51. internal_connection_workflow=internal_connection_workflow)
  52. @classmethod
  53. def create_connection(cls,
  54. connection_configs,
  55. on_done,
  56. custom_ioloop=None,
  57. workflow=None):
  58. """Implement
  59. :py:classmethod:`pika.adapters.BaseConnection.create_connection()`.
  60. """
  61. nbio = selector_ioloop_adapter.SelectorIOServicesAdapter(
  62. custom_ioloop or ioloop.IOLoop.instance())
  63. def connection_factory(params):
  64. """Connection factory."""
  65. if params is None:
  66. raise ValueError('Expected pika.connection.Parameters '
  67. 'instance, but got None in params arg.')
  68. return cls(
  69. parameters=params,
  70. custom_ioloop=nbio,
  71. internal_connection_workflow=False)
  72. return cls._start_connection_workflow(
  73. connection_configs=connection_configs,
  74. connection_factory=connection_factory,
  75. nbio=nbio,
  76. workflow=workflow,
  77. on_done=on_done)