# base_connection.py
"""Base class extended by connection adapters. This extends the
connection.Connection class to encapsulate connection behavior but still
isolate socket and low level communication.

"""
  5. import abc
  6. import functools
  7. import logging
  8. import pika.compat
  9. import pika.exceptions
  10. import pika.tcp_socket_opts
  11. from pika.adapters.utils import connection_workflow, nbio_interface
  12. from pika import connection
  13. LOGGER = logging.getLogger(__name__)
  14. class BaseConnection(connection.Connection):
  15. """BaseConnection class that should be extended by connection adapters.
  16. This class abstracts I/O loop and transport services from pika core.
  17. """
  18. def __init__(self, parameters, on_open_callback, on_open_error_callback,
  19. on_close_callback, nbio, internal_connection_workflow):
  20. """Create a new instance of the Connection object.
  21. :param None|pika.connection.Parameters parameters: Connection parameters
  22. :param None|method on_open_callback: Method to call on connection open
  23. :param None | method on_open_error_callback: Called if the connection
  24. can't be established or connection establishment is interrupted by
  25. `Connection.close()`: on_open_error_callback(Connection, exception).
  26. :param None | method on_close_callback: Called when a previously fully
  27. open connection is closed:
  28. `on_close_callback(Connection, exception)`, where `exception` is
  29. either an instance of `exceptions.ConnectionClosed` if closed by
  30. user or broker or exception of another type that describes the cause
  31. of connection failure.
  32. :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
  33. asynchronous services
  34. :param bool internal_connection_workflow: True for autonomous connection
  35. establishment which is default; False for externally-managed
  36. connection workflow via the `create_connection()` factory.
  37. :raises: RuntimeError
  38. :raises: ValueError
  39. """
  40. if parameters and not isinstance(parameters, connection.Parameters):
  41. raise ValueError(
  42. 'Expected instance of Parameters, not %r' % (parameters,))
  43. self._nbio = nbio
  44. self._connection_workflow = None # type: connection_workflow.AMQPConnectionWorkflow
  45. self._transport = None # type: pika.adapters.utils.nbio_interface.AbstractStreamTransport
  46. self._got_eof = False # transport indicated EOF (connection reset)
  47. super(BaseConnection, self).__init__(
  48. parameters,
  49. on_open_callback,
  50. on_open_error_callback,
  51. on_close_callback,
  52. internal_connection_workflow=internal_connection_workflow)
  53. def _init_connection_state(self):
  54. """Initialize or reset all of our internal state variables for a given
  55. connection. If we disconnect and reconnect, all of our state needs to
  56. be wiped.
  57. """
  58. super(BaseConnection, self)._init_connection_state()
  59. self._connection_workflow = None
  60. self._transport = None
  61. self._got_eof = False
  62. def __repr__(self):
  63. # def get_socket_repr(sock):
  64. # """Return socket info suitable for use in repr"""
  65. # if sock is None:
  66. # return None
  67. #
  68. # sockname = None
  69. # peername = None
  70. # try:
  71. # sockname = sock.getsockname()
  72. # except pika.compat.SOCKET_ERROR:
  73. # # closed?
  74. # pass
  75. # else:
  76. # try:
  77. # peername = sock.getpeername()
  78. # except pika.compat.SOCKET_ERROR:
  79. # # not connected?
  80. # pass
  81. #
  82. # return '%s->%s' % (sockname, peername)
  83. # TODO need helpful __repr__ in transports
  84. return ('<%s %s transport=%s params=%s>' % (
  85. self.__class__.__name__, self._STATE_NAMES[self.connection_state],
  86. self._transport, self.params))
  87. @classmethod
  88. @abc.abstractmethod
  89. def create_connection(cls,
  90. connection_configs,
  91. on_done,
  92. custom_ioloop=None,
  93. workflow=None):
  94. """Asynchronously create a connection to an AMQP broker using the given
  95. configurations. Will attempt to connect using each config in the given
  96. order, including all compatible resolved IP addresses of the hostname
  97. supplied in each config, until one is established or all attempts fail.
  98. See also `_start_connection_workflow()`.
  99. :param sequence connection_configs: A sequence of one or more
  100. `pika.connection.Parameters`-based objects.
  101. :param callable on_done: as defined in
  102. `connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
  103. :param object | None custom_ioloop: Provide a custom I/O loop that is
  104. native to the specific adapter implementation; if None, the adapter
  105. will use a default loop instance, which is typically a singleton.
  106. :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
  107. Pass an instance of an implementation of the
  108. `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
  109. defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
  110. with default values for optional args.
  111. :returns: Connection workflow instance in use. The user should limit
  112. their interaction with this object only to it's `abort()` method.
  113. :rtype: connection_workflow.AbstractAMQPConnectionWorkflow
  114. """
  115. raise NotImplementedError
  116. @classmethod
  117. def _start_connection_workflow(cls, connection_configs, connection_factory,
  118. nbio, workflow, on_done):
  119. """Helper function for custom implementations of `create_connection()`.
  120. :param sequence connection_configs: A sequence of one or more
  121. `pika.connection.Parameters`-based objects.
  122. :param callable connection_factory: A function that takes
  123. `pika.connection.Parameters` as its only arg and returns a brand new
  124. `pika.connection.Connection`-based adapter instance each time it is
  125. called. The factory must instantiate the connection with
  126. `internal_connection_workflow=False`.
  127. :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
  128. :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
  129. Pass an instance of an implementation of the
  130. `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
  131. defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
  132. with default values for optional args.
  133. :param callable on_done: as defined in
  134. :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
  135. :returns: Connection workflow instance in use. The user should limit
  136. their interaction with this object only to it's `abort()` method.
  137. :rtype: connection_workflow.AbstractAMQPConnectionWorkflow
  138. """
  139. if workflow is None:
  140. workflow = connection_workflow.AMQPConnectionWorkflow()
  141. LOGGER.debug('Created default connection workflow %r', workflow)
  142. if isinstance(workflow, connection_workflow.AMQPConnectionWorkflow):
  143. workflow.set_io_services(nbio)
  144. def create_connector():
  145. """`AMQPConnector` factory."""
  146. return connection_workflow.AMQPConnector(
  147. lambda params: _StreamingProtocolShim(
  148. connection_factory(params)),
  149. nbio)
  150. workflow.start(
  151. connection_configs=connection_configs,
  152. connector_factory=create_connector,
  153. native_loop=nbio.get_native_ioloop(),
  154. on_done=functools.partial(cls._unshim_connection_workflow_callback,
  155. on_done))
  156. return workflow
  157. @property
  158. def ioloop(self):
  159. """
  160. :returns: the native I/O loop instance underlying async services selected
  161. by user or the default selected by the specialized connection
  162. adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
  163. `select_connection.IOLoop`, etc.)
  164. :rtype: object
  165. """
  166. return self._nbio.get_native_ioloop()
  167. def _adapter_call_later(self, delay, callback):
  168. """Implement
  169. :py:meth:`pika.connection.Connection._adapter_call_later()`.
  170. """
  171. return self._nbio.call_later(delay, callback)
  172. def _adapter_remove_timeout(self, timeout_id):
  173. """Implement
  174. :py:meth:`pika.connection.Connection._adapter_remove_timeout()`.
  175. """
  176. timeout_id.cancel()
  177. def _adapter_add_callback_threadsafe(self, callback):
  178. """Implement
  179. :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.
  180. """
  181. if not callable(callback):
  182. raise TypeError(
  183. 'callback must be a callable, but got %r' % (callback,))
  184. self._nbio.add_callback_threadsafe(callback)
  185. def _adapter_connect_stream(self):
  186. """Initiate full-stack connection establishment asynchronously for
  187. internally-initiated connection bring-up.
  188. Upon failed completion, we will invoke
  189. `Connection._on_stream_terminated()`. NOTE: On success,
  190. the stack will be up already, so there is no corresponding callback.
  191. """
  192. self._connection_workflow = connection_workflow.AMQPConnectionWorkflow(
  193. _until_first_amqp_attempt=True)
  194. self._connection_workflow.set_io_services(self._nbio)
  195. def create_connector():
  196. """`AMQPConnector` factory"""
  197. return connection_workflow.AMQPConnector(
  198. lambda _params: _StreamingProtocolShim(self), self._nbio)
  199. self._connection_workflow.start(
  200. [self.params],
  201. connector_factory=create_connector,
  202. native_loop=self._nbio.get_native_ioloop(),
  203. on_done=functools.partial(self._unshim_connection_workflow_callback,
  204. self._on_connection_workflow_done))
  205. @staticmethod
  206. def _unshim_connection_workflow_callback(user_on_done, shim_or_exc):
  207. """
  208. :param callable user_on_done: user's `on_done` callback as defined in
  209. :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
  210. :param _StreamingProtocolShim | Exception shim_or_exc:
  211. """
  212. result = shim_or_exc
  213. if isinstance(result, _StreamingProtocolShim):
  214. result = result.conn
  215. user_on_done(result)
  216. def _abort_connection_workflow(self):
  217. """Asynchronously abort connection workflow. Upon
  218. completion, `Connection._on_stream_terminated()` will be called with None
  219. as the error argument.
  220. Assumption: may be called only while connection is opening.
  221. """
  222. assert not self._opened, (
  223. '_abort_connection_workflow() may be called only when '
  224. 'connection is opening.')
  225. if self._transport is None:
  226. # NOTE: this is possible only when user calls Connection.close() to
  227. # interrupt internally-initiated connection establishment.
  228. # self._connection_workflow.abort() would not call
  229. # Connection.close() before pairing of connection with transport.
  230. assert self._internal_connection_workflow, (
  231. 'Unexpected _abort_connection_workflow() call with '
  232. 'no transport in external connection workflow mode.')
  233. # This will result in call to _on_connection_workflow_done() upon
  234. # completion
  235. self._connection_workflow.abort()
  236. else:
  237. # NOTE: we can't use self._connection_workflow.abort() in this case,
  238. # because it would result in infinite recursion as we're called
  239. # from Connection.close() and _connection_workflow.abort() calls
  240. # Connection.close() to abort a connection that's already been
  241. # paired with a transport. During internally-initiated connection
  242. # establishment, AMQPConnectionWorkflow will discover that user
  243. # aborted the connection when it receives
  244. # pika.exceptions.ConnectionOpenAborted.
  245. # This completes asynchronously, culminating in call to our method
  246. # `connection_lost()`
  247. self._transport.abort()
  248. def _on_connection_workflow_done(self, conn_or_exc):
  249. """`AMQPConnectionWorkflow` completion callback.
  250. :param BaseConnection | Exception conn_or_exc: Our own connection
  251. instance on success; exception on failure. See
  252. `AbstractAMQPConnectionWorkflow.start()` for details.
  253. """
  254. LOGGER.debug('Full-stack connection workflow completed: %r',
  255. conn_or_exc)
  256. self._connection_workflow = None
  257. # Notify protocol of failure
  258. if isinstance(conn_or_exc, Exception):
  259. self._transport = None
  260. if isinstance(conn_or_exc,
  261. connection_workflow.AMQPConnectionWorkflowAborted):
  262. LOGGER.info('Full-stack connection workflow aborted: %r',
  263. conn_or_exc)
  264. # So that _handle_connection_workflow_failure() will know it's
  265. # not a failure
  266. conn_or_exc = None
  267. else:
  268. LOGGER.error('Full-stack connection workflow failed: %r',
  269. conn_or_exc)
  270. if (isinstance(conn_or_exc,
  271. connection_workflow.AMQPConnectionWorkflowFailed)
  272. and isinstance(
  273. conn_or_exc.exceptions[-1], connection_workflow.
  274. AMQPConnectorSocketConnectError)):
  275. conn_or_exc = pika.exceptions.AMQPConnectionError(
  276. conn_or_exc)
  277. self._handle_connection_workflow_failure(conn_or_exc)
  278. else:
  279. # NOTE: On success, the stack will be up already, so there is no
  280. # corresponding callback.
  281. assert conn_or_exc is self, \
  282. 'Expected self conn={!r} from workflow, but got {!r}.'.format(
  283. self, conn_or_exc)
  284. def _handle_connection_workflow_failure(self, error):
  285. """Handle failure of self-initiated stack bring-up and call
  286. `Connection._on_stream_terminated()` if connection is not in closed state
  287. yet. Called by adapter layer when the full-stack connection workflow
  288. fails.
  289. :param Exception | None error: exception instance describing the reason
  290. for failure or None if the connection workflow was aborted.
  291. """
  292. if error is None:
  293. LOGGER.info('Self-initiated stack bring-up aborted.')
  294. else:
  295. LOGGER.error('Self-initiated stack bring-up failed: %r', error)
  296. if not self.is_closed:
  297. self._on_stream_terminated(error)
  298. else:
  299. # This may happen when AMQP layer bring up was started but did not
  300. # complete
  301. LOGGER.debug('_handle_connection_workflow_failure(): '
  302. 'suppressing - connection already closed.')
  303. def _adapter_disconnect_stream(self):
  304. """Asynchronously bring down the streaming transport layer and invoke
  305. `Connection._on_stream_terminated()` asynchronously when complete.
  306. """
  307. if not self._opened:
  308. self._abort_connection_workflow()
  309. else:
  310. # This completes asynchronously, culminating in call to our method
  311. # `connection_lost()`
  312. self._transport.abort()
  313. def _adapter_emit_data(self, data):
  314. """Take ownership of data and send it to AMQP server as soon as
  315. possible.
  316. :param bytes data:
  317. """
  318. self._transport.write(data)
  319. def _proto_connection_made(self, transport):
  320. """Introduces transport to protocol after transport is connected.
  321. :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
  322. :param nbio_interface.AbstractStreamTransport transport:
  323. :raises Exception: Exception-based exception on error
  324. """
  325. self._transport = transport
  326. # Let connection know that stream is available
  327. self._on_stream_connected()
  328. def _proto_connection_lost(self, error):
  329. """Called upon loss or closing of TCP connection.
  330. :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
  331. NOTE: `connection_made()` and `connection_lost()` are each called just
  332. once and in that order. All other callbacks are called between them.
  333. :param BaseException | None error: An exception (check for
  334. `BaseException`) indicates connection failure. None indicates that
  335. connection was closed on this side, such as when it's aborted or
  336. when `AbstractStreamProtocol.eof_received()` returns a falsy result.
  337. :raises Exception: Exception-based exception on error
  338. """
  339. self._transport = None
  340. if error is None:
  341. # Either result of `eof_received()` or abort
  342. if self._got_eof:
  343. error = pika.exceptions.StreamLostError(
  344. 'Transport indicated EOF')
  345. else:
  346. error = pika.exceptions.StreamLostError(
  347. 'Stream connection lost: {!r}'.format(error))
  348. LOGGER.log(logging.DEBUG if error is None else logging.ERROR,
  349. 'connection_lost: %r', error)
  350. self._on_stream_terminated(error)
  351. def _proto_eof_received(self): # pylint: disable=R0201
  352. """Called after the remote peer shuts its write end of the connection.
  353. :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
  354. :returns: A falsy value (including None) will cause the transport to
  355. close itself, resulting in an eventual `connection_lost()` call
  356. from the transport. If a truthy value is returned, it will be the
  357. protocol's responsibility to close/abort the transport.
  358. :rtype: falsy|truthy
  359. :raises Exception: Exception-based exception on error
  360. """
  361. LOGGER.error('Transport indicated EOF.')
  362. self._got_eof = True
  363. # This is how a reset connection will typically present itself
  364. # when we have nothing to send to the server over plaintext stream.
  365. #
  366. # Have transport tear down the connection and invoke our
  367. # `connection_lost` method
  368. return False
  369. def _proto_data_received(self, data):
  370. """Called to deliver incoming data from the server to the protocol.
  371. :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.
  372. :param data: Non-empty data bytes.
  373. :raises Exception: Exception-based exception on error
  374. """
  375. self._on_data_available(data)
  376. class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
  377. """Shim for callbacks from transport so that we BaseConnection can
  378. delegate to private methods, thus avoiding contamination of API with
  379. methods that look public, but aren't.
  380. """
  381. # Override AbstractStreamProtocol abstract methods to enable instantiation
  382. connection_made = None
  383. connection_lost = None
  384. eof_received = None
  385. data_received = None
  386. def __init__(self, conn):
  387. """
  388. :param BaseConnection conn:
  389. """
  390. self.conn = conn
  391. # pylint: disable=W0212
  392. self.connection_made = conn._proto_connection_made
  393. self.connection_lost = conn._proto_connection_lost
  394. self.eof_received = conn._proto_eof_received
  395. self.data_received = conn._proto_data_received
  396. def __getattr__(self, attr):
  397. """Proxy inexistent attribute requests to our connection instance
  398. so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
  399. actual connection.
  400. """
  401. return getattr(self.conn, attr)
  402. def __repr__(self):
  403. return '{}: {!r}'.format(self.__class__.__name__, self.conn)