connection_workflow.py 33 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866
  1. """Implements `AMQPConnectionWorkflow` - the default workflow of performing
  2. multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
  3. succeeds or all attempts fail.
  4. Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
  5. implementing custom connection workflows.
  6. """
  7. import functools
  8. import logging
  9. import socket
  10. import pika.compat
  11. import pika.exceptions
  12. import pika.tcp_socket_opts
  13. from pika import __version__
  14. _LOG = logging.getLogger(__name__)
  15. class AMQPConnectorException(Exception):
  16. """Base exception for this module"""
  17. class AMQPConnectorStackTimeout(AMQPConnectorException):
  18. """Overall TCP/[SSL]/AMQP stack connection attempt timed out."""
  19. class AMQPConnectorAborted(AMQPConnectorException):
  20. """Asynchronous request was aborted"""
  21. class AMQPConnectorWrongState(AMQPConnectorException):
  22. """AMQPConnector operation requested in wrong state, such as aborting after
  23. completion was reported.
  24. """
  25. class AMQPConnectorPhaseErrorBase(AMQPConnectorException):
  26. """Wrapper for exception that occurred during a particular bring-up phase.
  27. """
  28. def __init__(self, exception, *args):
  29. """
  30. :param BaseException exception: error that occurred while waiting for a
  31. subclass-specific protocol bring-up phase to complete.
  32. :param args: args for parent class
  33. """
  34. super(AMQPConnectorPhaseErrorBase, self).__init__(*args)
  35. self.exception = exception
  36. def __repr__(self):
  37. return '{}: {!r}'.format(self.__class__.__name__, self.exception)
  38. class AMQPConnectorSocketConnectError(AMQPConnectorPhaseErrorBase):
  39. """Error connecting TCP socket to remote peer"""
  40. class AMQPConnectorTransportSetupError(AMQPConnectorPhaseErrorBase):
  41. """Error setting up transport after TCP connected but before AMQP handshake.
  42. """
  43. class AMQPConnectorAMQPHandshakeError(AMQPConnectorPhaseErrorBase):
  44. """Error during AMQP handshake"""
  45. class AMQPConnectionWorkflowAborted(AMQPConnectorException):
  46. """AMQP Connection workflow was aborted."""
  47. class AMQPConnectionWorkflowWrongState(AMQPConnectorException):
  48. """AMQP Connection Workflow operation requested in wrong state, such as
  49. aborting after completion was reported.
  50. """
  51. class AMQPConnectionWorkflowFailed(AMQPConnectorException):
  52. """Indicates that AMQP connection workflow failed.
  53. """
  54. def __init__(self, exceptions, *args):
  55. """
  56. :param sequence exceptions: Exceptions that occurred during the
  57. workflow.
  58. :param args: args to pass to base class
  59. """
  60. super(AMQPConnectionWorkflowFailed, self).__init__(*args)
  61. self.exceptions = tuple(exceptions)
  62. def __repr__(self):
  63. return ('{}: {} exceptions in all; last exception - {!r}; first '
  64. 'exception - {!r}').format(
  65. self.__class__.__name__, len(self.exceptions),
  66. self.exceptions[-1],
  67. self.exceptions[0] if len(self.exceptions) > 1 else None)
  68. class AMQPConnector(object):
  69. """Performs a single TCP/[SSL]/AMQP connection workflow.
  70. """
  71. _STATE_INIT = 0 # start() hasn't been called yet
  72. _STATE_TCP = 1 # TCP/IP connection establishment
  73. _STATE_TRANSPORT = 2 # [SSL] and transport linkup
  74. _STATE_AMQP = 3 # AMQP connection handshake
  75. _STATE_TIMEOUT = 4 # overall TCP/[SSL]/AMQP timeout
  76. _STATE_ABORTING = 5 # abort() called - aborting workflow
  77. _STATE_DONE = 6 # result reported to client
  78. def __init__(self, conn_factory, nbio):
  79. """
  80. :param callable conn_factory: A function that takes
  81. `pika.connection.Parameters` as its only arg and returns a brand new
  82. `pika.connection.Connection`-based adapter instance each time it is
  83. called. The factory must instantiate the connection with
  84. `internal_connection_workflow=False`.
  85. :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
  86. """
  87. self._conn_factory = conn_factory
  88. self._nbio = nbio
  89. self._addr_record = None # type: tuple
  90. self._conn_params = None # type: pika.connection.Parameters
  91. self._on_done = None # will be provided via start()
  92. # TCP connection timeout
  93. # pylint: disable=C0301
  94. self._tcp_timeout_ref = None # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
  95. # Overall TCP/[SSL]/AMQP timeout
  96. self._stack_timeout_ref = None # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
  97. # Current task
  98. self._task_ref = None # type: pika.adapters.utils.nbio_interface.AbstractIOReference
  99. self._sock = None # type: socket.socket
  100. self._amqp_conn = None # type: pika.connection.Connection
  101. self._state = self._STATE_INIT
  102. def start(self, addr_record, conn_params, on_done):
  103. """Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.
  104. :param tuple addr_record: a single resolved address record compatible
  105. with `socket.getaddrinfo()` format.
  106. :param pika.connection.Parameters conn_params:
  107. :param callable on_done: Function to call upon completion of the
  108. workflow: `on_done(pika.connection.Connection | BaseException)`. If
  109. exception, it's going to be one of the following:
  110. `AMQPConnectorSocketConnectError`
  111. `AMQPConnectorTransportSetupError`
  112. `AMQPConnectorAMQPHandshakeError`
  113. `AMQPConnectorAborted`
  114. """
  115. if self._state != self._STATE_INIT:
  116. raise AMQPConnectorWrongState(
  117. 'Already in progress or finished; state={}'.format(self._state))
  118. self._addr_record = addr_record
  119. self._conn_params = conn_params
  120. self._on_done = on_done
  121. # Create socket and initiate TCP/IP connection
  122. self._state = self._STATE_TCP
  123. self._sock = socket.socket(*self._addr_record[:3])
  124. self._sock.setsockopt(pika.compat.SOL_TCP, socket.TCP_NODELAY, 1)
  125. pika.tcp_socket_opts.set_sock_opts(self._conn_params.tcp_options,
  126. self._sock)
  127. self._sock.setblocking(False)
  128. addr = self._addr_record[4]
  129. _LOG.info('Pika version %s connecting to %r', __version__, addr)
  130. self._task_ref = self._nbio.connect_socket(
  131. self._sock, addr, on_done=self._on_tcp_connection_done)
  132. # Start socket connection timeout timer
  133. self._tcp_timeout_ref = None
  134. if self._conn_params.socket_timeout is not None:
  135. self._tcp_timeout_ref = self._nbio.call_later(
  136. self._conn_params.socket_timeout,
  137. self._on_tcp_connection_timeout)
  138. # Start overall TCP/[SSL]/AMQP stack connection timeout timer
  139. self._stack_timeout_ref = None
  140. if self._conn_params.stack_timeout is not None:
  141. self._stack_timeout_ref = self._nbio.call_later(
  142. self._conn_params.stack_timeout, self._on_overall_timeout)
  143. def abort(self):
  144. """Abort the workflow asynchronously. The completion callback will be
  145. called with an instance of AMQPConnectorAborted.
  146. NOTE: we can't cancel/close synchronously because aborting pika
  147. Connection and its transport requires an asynchronous operation.
  148. :raises AMQPConnectorWrongState: If called after completion has been
  149. reported or the workflow not started yet.
  150. """
  151. if self._state == self._STATE_INIT:
  152. raise AMQPConnectorWrongState('Cannot abort before starting.')
  153. elif self._state == self._STATE_DONE:
  154. raise AMQPConnectorWrongState(
  155. 'Cannot abort after completion was reported')
  156. self._state = self._STATE_ABORTING
  157. self._deactivate()
  158. _LOG.info(
  159. 'AMQPConnector: beginning client-initiated asynchronous '
  160. 'abort; %r/%s', self._conn_params.host, self._addr_record)
  161. if self._amqp_conn is None:
  162. _LOG.debug('AMQPConnector.abort(): no connection, so just '
  163. 'scheduling completion report via I/O loop.')
  164. self._nbio.add_callback_threadsafe(
  165. functools.partial(self._report_completion_and_cleanup,
  166. AMQPConnectorAborted()))
  167. else:
  168. if not self._amqp_conn.is_closing:
  169. # Initiate close of AMQP connection and wait for asynchronous
  170. # callback from the Connection instance before reporting
  171. # completion to client
  172. _LOG.debug('AMQPConnector.abort(): closing Connection.')
  173. self._amqp_conn.close(
  174. 320, 'Client-initiated abort of AMQP Connection Workflow.')
  175. else:
  176. # It's already closing, must be due to our timeout processing,
  177. # so we'll just piggy back on the callback it registered
  178. _LOG.debug('AMQPConnector.abort(): closing of Connection was '
  179. 'already initiated.')
  180. assert self._state == self._STATE_TIMEOUT, \
  181. ('Connection is closing, but not in TIMEOUT state; state={}'
  182. .format(self._state))
  183. def _close(self):
  184. """Cancel asynchronous tasks and clean up to assist garbage collection.
  185. Transition to STATE_DONE.
  186. """
  187. self._deactivate()
  188. if self._sock is not None:
  189. self._sock.close()
  190. self._sock = None
  191. self._conn_factory = None
  192. self._nbio = None
  193. self._addr_record = None
  194. self._on_done = None
  195. self._state = self._STATE_DONE
  196. def _deactivate(self):
  197. """Cancel asynchronous tasks.
  198. """
  199. # NOTE: self._amqp_conn requires special handling as it doesn't support
  200. # synchronous closing. We special-case it elsewhere in the code where
  201. # needed.
  202. assert self._amqp_conn is None, \
  203. '_deactivate called with self._amqp_conn not None; state={}'.format(
  204. self._state)
  205. if self._tcp_timeout_ref is not None:
  206. self._tcp_timeout_ref.cancel()
  207. self._tcp_timeout_ref = None
  208. if self._stack_timeout_ref is not None:
  209. self._stack_timeout_ref.cancel()
  210. self._stack_timeout_ref = None
  211. if self._task_ref is not None:
  212. self._task_ref.cancel()
  213. self._task_ref = None
  214. def _report_completion_and_cleanup(self, result):
  215. """Clean up and invoke client's `on_done` callback.
  216. :param pika.connection.Connection | BaseException result: value to pass
  217. to user's `on_done` callback.
  218. """
  219. if isinstance(result, BaseException):
  220. _LOG.error('AMQPConnector - reporting failure: %r', result)
  221. else:
  222. _LOG.info('AMQPConnector - reporting success: %r', result)
  223. on_done = self._on_done
  224. self._close()
  225. on_done(result)
  226. def _on_tcp_connection_timeout(self):
  227. """Handle TCP connection timeout
  228. Reports AMQPConnectorSocketConnectError with socket.timeout inside.
  229. """
  230. self._tcp_timeout_ref = None
  231. error = AMQPConnectorSocketConnectError(
  232. socket.timeout('TCP connection attempt timed out: {!r}/{}'.format(
  233. self._conn_params.host, self._addr_record)))
  234. self._report_completion_and_cleanup(error)
  235. def _on_overall_timeout(self):
  236. """Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
  237. `Timeout` error to the client.
  238. Reports AMQPConnectorSocketConnectError if timeout occurred during
  239. socket TCP connection attempt.
  240. Reports AMQPConnectorTransportSetupError if timeout occurred during
  241. tramsport [SSL] setup attempt.
  242. Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
  243. AMQP handshake.
  244. """
  245. self._stack_timeout_ref = None
  246. prev_state = self._state
  247. self._state = self._STATE_TIMEOUT
  248. if prev_state == self._STATE_AMQP:
  249. msg = ('Timeout while setting up AMQP to {!r}/{}; ssl={}'.format(
  250. self._conn_params.host, self._addr_record,
  251. bool(self._conn_params.ssl_options)))
  252. _LOG.error(msg)
  253. # Initiate close of AMQP connection and wait for asynchronous
  254. # callback from the Connection instance before reporting completion
  255. # to client
  256. assert not self._amqp_conn.is_open, \
  257. 'Unexpected open state of {!r}'.format(self._amqp_conn)
  258. if not self._amqp_conn.is_closing:
  259. self._amqp_conn.close(320, msg)
  260. return
  261. if prev_state == self._STATE_TCP:
  262. error = AMQPConnectorSocketConnectError(
  263. AMQPConnectorStackTimeout(
  264. 'Timeout while connecting socket to {!r}/{}'.format(
  265. self._conn_params.host, self._addr_record)))
  266. else:
  267. assert prev_state == self._STATE_TRANSPORT
  268. error = AMQPConnectorTransportSetupError(
  269. AMQPConnectorStackTimeout(
  270. 'Timeout while setting up transport to {!r}/{}; ssl={}'.
  271. format(self._conn_params.host, self._addr_record,
  272. bool(self._conn_params.ssl_options))))
  273. self._report_completion_and_cleanup(error)
  274. def _on_tcp_connection_done(self, exc):
  275. """Handle completion of asynchronous socket connection attempt.
  276. Reports AMQPConnectorSocketConnectError if TCP socket connection
  277. failed.
  278. :param None|BaseException exc: None on success; exception object on
  279. failure
  280. """
  281. self._task_ref = None
  282. if self._tcp_timeout_ref is not None:
  283. self._tcp_timeout_ref.cancel()
  284. self._tcp_timeout_ref = None
  285. if exc is not None:
  286. _LOG.error('TCP Connection attempt failed: %r; dest=%r', exc,
  287. self._addr_record)
  288. self._report_completion_and_cleanup(
  289. AMQPConnectorSocketConnectError(exc))
  290. return
  291. # We succeeded in making a TCP/IP connection to the server
  292. _LOG.debug('TCP connection to broker established: %r.', self._sock)
  293. # Now set up the transport
  294. self._state = self._STATE_TRANSPORT
  295. ssl_context = server_hostname = None
  296. if self._conn_params.ssl_options is not None:
  297. ssl_context = self._conn_params.ssl_options.context
  298. server_hostname = self._conn_params.ssl_options.server_hostname
  299. if server_hostname is None:
  300. server_hostname = self._conn_params.host
  301. self._task_ref = self._nbio.create_streaming_connection(
  302. protocol_factory=functools.partial(self._conn_factory,
  303. self._conn_params),
  304. sock=self._sock,
  305. ssl_context=ssl_context,
  306. server_hostname=server_hostname,
  307. on_done=self._on_transport_establishment_done)
  308. self._sock = None # create_streaming_connection() takes ownership
  309. def _on_transport_establishment_done(self, result):
  310. """Handle asynchronous completion of
  311. `AbstractIOServices.create_streaming_connection()`
  312. Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
  313. failed.
  314. :param sequence|BaseException result: On success, a two-tuple
  315. (transport, protocol); on failure, exception instance.
  316. """
  317. self._task_ref = None
  318. if isinstance(result, BaseException):
  319. _LOG.error(
  320. 'Attempt to create the streaming transport failed: %r; '
  321. '%r/%s; ssl=%s', result, self._conn_params.host,
  322. self._addr_record, bool(self._conn_params.ssl_options))
  323. self._report_completion_and_cleanup(
  324. AMQPConnectorTransportSetupError(result))
  325. return
  326. # We succeeded in setting up the streaming transport!
  327. # result is a two-tuple (transport, protocol)
  328. _LOG.info('Streaming transport linked up: %r.', result)
  329. _transport, self._amqp_conn = result
  330. # AMQP handshake is in progress - initiated during transport link-up
  331. self._state = self._STATE_AMQP
  332. # We explicitly remove default handler because it raises an exception.
  333. self._amqp_conn.add_on_open_error_callback(
  334. self._on_amqp_handshake_done, remove_default=True)
  335. self._amqp_conn.add_on_open_callback(self._on_amqp_handshake_done)
  336. def _on_amqp_handshake_done(self, connection, error=None):
  337. """Handle completion of AMQP connection handshake attempt.
  338. NOTE: we handle two types of callbacks - success with just connection
  339. arg as well as the open-error callback with connection and error
  340. Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.
  341. :param pika.connection.Connection connection:
  342. :param BaseException | None error: None on success, otherwise
  343. failure
  344. """
  345. _LOG.debug(
  346. 'AMQPConnector: AMQP handshake attempt completed; state=%s; '
  347. 'error=%r; %r/%s', self._state, error, self._conn_params.host,
  348. self._addr_record)
  349. # Don't need it any more; and _deactivate() checks that it's None
  350. self._amqp_conn = None
  351. if self._state == self._STATE_ABORTING:
  352. # Client-initiated abort takes precedence over timeout
  353. result = AMQPConnectorAborted()
  354. elif self._state == self._STATE_TIMEOUT:
  355. result = AMQPConnectorAMQPHandshakeError(
  356. AMQPConnectorStackTimeout(
  357. 'Timeout during AMQP handshake{!r}/{}; ssl={}'.format(
  358. self._conn_params.host, self._addr_record,
  359. bool(self._conn_params.ssl_options))))
  360. elif self._state == self._STATE_AMQP:
  361. if error is None:
  362. _LOG.debug(
  363. 'AMQPConnector: AMQP connection established for %r/%s: %r',
  364. self._conn_params.host, self._addr_record, connection)
  365. result = connection
  366. else:
  367. _LOG.debug(
  368. 'AMQPConnector: AMQP connection handshake failed for '
  369. '%r/%s: %r', self._conn_params.host, self._addr_record,
  370. error)
  371. result = AMQPConnectorAMQPHandshakeError(error)
  372. else:
  373. # We timed out or aborted and initiated closing of the connection,
  374. # but this callback snuck in
  375. _LOG.debug(
  376. 'AMQPConnector: Ignoring AMQP handshake completion '
  377. 'notification due to wrong state=%s; error=%r; conn=%r',
  378. self._state, error, connection)
  379. return
  380. self._report_completion_and_cleanup(result)
  381. class AbstractAMQPConnectionWorkflow(pika.compat.AbstractBase):
  382. """Interface for implementing a custom TCP/[SSL]/AMQP connection workflow.
  383. """
  384. def start(self, connection_configs, connector_factory, native_loop,
  385. on_done):
  386. """Asynchronously perform the workflow until success or all retries
  387. are exhausted. Called by the adapter.
  388. :param sequence connection_configs: A sequence of one or more
  389. `pika.connection.Parameters`-based objects. Will attempt to connect
  390. using each config in the given order.
  391. :param callable connector_factory: call it without args to obtain a new
  392. instance of `AMQPConnector` for each connection attempt.
  393. See `AMQPConnector` for details.
  394. :param native_loop: Native I/O loop passed by app to the adapter or
  395. obtained by the adapter by default.
  396. :param callable on_done: Function to call upon completion of the
  397. workflow:
  398. `on_done(pika.connection.Connection |
  399. AMQPConnectionWorkflowFailed |
  400. AMQPConnectionWorkflowAborted)`.
  401. `Connection`-based adapter on success,
  402. `AMQPConnectionWorkflowFailed` on failure,
  403. `AMQPConnectionWorkflowAborted` if workflow was aborted.
  404. :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
  405. as after starting the workflow.
  406. """
  407. raise NotImplementedError
  408. def abort(self):
  409. """Abort the workflow asynchronously. The completion callback will be
  410. called with an instance of AMQPConnectionWorkflowAborted.
  411. NOTE: we can't cancel/close synchronously because aborting pika
  412. Connection and its transport requires an asynchronous operation.
  413. :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
  414. as before starting or after completion has been reported.
  415. """
  416. raise NotImplementedError
  417. class AMQPConnectionWorkflow(AbstractAMQPConnectionWorkflow):
  418. """Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
  419. connection attempts with timeouts and retries until one succeeds or all
  420. attempts fail.
  421. The workflow:
  422. while not success and retries remain:
  423. 1. For each given config (pika.connection.Parameters object):
  424. A. Perform DNS resolution of the config's host.
  425. B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
  426. until one succeeds, in which case we're done.
  427. 2. If all configs failed but retries remain, resume from beginning
  428. after the given retry pause. NOTE: failure of DNS resolution
  429. is equivalent to one cycle and will be retried after the pause
  430. if retries remain.
  431. """
  432. _SOCK_TYPE = socket.SOCK_STREAM
  433. _IPPROTO = socket.IPPROTO_TCP
  434. _STATE_INIT = 0
  435. _STATE_ACTIVE = 1
  436. _STATE_ABORTING = 2
  437. _STATE_DONE = 3
  438. def __init__(self, _until_first_amqp_attempt=False):
  439. """
  440. :param int | float retry_pause: Non-negative number of seconds to wait
  441. before retrying the config sequence. Meaningful only if retries is
  442. greater than 0. Defaults to 2 seconds.
  443. :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
  444. after first AMQP handshake attempt, regardless of outcome (success
  445. or failure). The automatic connection logic in
  446. `pika.connection.Connection` enables this because it's not
  447. designed/tested to reset all state properly to handle more than one
  448. AMQP handshake attempt.
  449. TODO: Do we need getaddrinfo timeout?
  450. TODO: Would it be useful to implement exponential back-off?
  451. """
  452. self._attempts_remaining = None # supplied by start()
  453. self._retry_pause = None # supplied by start()
  454. self._until_first_amqp_attempt = _until_first_amqp_attempt
  455. # Provided by set_io_services()
  456. # pylint: disable=C0301
  457. self._nbio = None # type: pika.adapters.utils.nbio_interface.AbstractIOServices
  458. # Current index within `_connection_configs`; initialized when
  459. # starting a new connection sequence.
  460. self._current_config_index = None
  461. self._connection_configs = None # supplied by start()
  462. self._connector_factory = None # supplied by start()
  463. self._on_done = None # supplied by start()
  464. self._connector = None # type: AMQPConnector
  465. self._task_ref = None # current cancelable asynchronous task or timer
  466. self._addrinfo_iter = None
  467. # Exceptions from all failed connection attempts in this workflow
  468. self._connection_errors = []
  469. self._state = self._STATE_INIT
  470. def set_io_services(self, nbio):
  471. """Called by the conneciton adapter only on pika's
  472. `AMQPConnectionWorkflow` instance to provide it the adapter-specific
  473. `AbstractIOServices` object before calling the `start()` method.
  474. NOTE: Custom workflow implementations should use the native I/O loop
  475. directly because `AbstractIOServices` is private to Pika
  476. implementation and its interface may change without notice.
  477. :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
  478. """
  479. self._nbio = nbio
  480. def start(
  481. self,
  482. connection_configs,
  483. connector_factory,
  484. native_loop, # pylint: disable=W0613
  485. on_done):
  486. """Override `AbstractAMQPConnectionWorkflow.start()`.
  487. NOTE: This implementation uses `connection_attempts` and `retry_delay`
  488. values from the last element of the given `connection_configs` sequence
  489. as the overall number of connection attempts of the entire
  490. `connection_configs` sequence and pause between each sequence.
  491. """
  492. if self._state != self._STATE_INIT:
  493. raise AMQPConnectorWrongState(
  494. 'Already in progress or finished; state={}'.format(self._state))
  495. try:
  496. iter(connection_configs)
  497. except Exception as error:
  498. raise TypeError(
  499. 'connection_configs does not support iteration: {!r}'.format(
  500. error))
  501. if not connection_configs:
  502. raise ValueError(
  503. 'connection_configs is empty: {!r}.'.format(connection_configs))
  504. self._connection_configs = connection_configs
  505. self._connector_factory = connector_factory
  506. self._on_done = on_done
  507. self._attempts_remaining = connection_configs[-1].connection_attempts
  508. self._retry_pause = connection_configs[-1].retry_delay
  509. self._state = self._STATE_ACTIVE
  510. _LOG.debug('Starting AMQP Connection workflow asynchronously.')
  511. # Begin from our own I/O loop context to avoid calling back into client
  512. # from client's call here
  513. self._task_ref = self._nbio.call_later(
  514. 0, functools.partial(self._start_new_cycle_async, first=True))
  515. def abort(self):
  516. """Override `AbstractAMQPConnectionWorkflow.abort()`.
  517. """
  518. if self._state == self._STATE_INIT:
  519. raise AMQPConnectorWrongState('Cannot abort before starting.')
  520. elif self._state == self._STATE_DONE:
  521. raise AMQPConnectorWrongState(
  522. 'Cannot abort after completion was reported')
  523. self._state = self._STATE_ABORTING
  524. self._deactivate()
  525. _LOG.info('AMQPConnectionWorkflow: beginning client-initiated '
  526. 'asynchronous abort.')
  527. if self._connector is None:
  528. _LOG.debug('AMQPConnectionWorkflow.abort(): no connector, so just '
  529. 'scheduling completion report via I/O loop.')
  530. self._nbio.add_callback_threadsafe(
  531. functools.partial(self._report_completion_and_cleanup,
  532. AMQPConnectionWorkflowAborted()))
  533. else:
  534. _LOG.debug('AMQPConnectionWorkflow.abort(): requesting '
  535. 'connector.abort().')
  536. self._connector.abort()
  537. def _close(self):
  538. """Cancel asynchronous tasks and clean up to assist garbage collection.
  539. Transition to _STATE_DONE.
  540. """
  541. self._deactivate()
  542. self._connection_configs = None
  543. self._nbio = None
  544. self._connector_factory = None
  545. self._on_done = None
  546. self._connector = None
  547. self._addrinfo_iter = None
  548. self._connection_errors = None
  549. self._state = self._STATE_DONE
  550. def _deactivate(self):
  551. """Cancel asynchronous tasks.
  552. """
  553. if self._task_ref is not None:
  554. self._task_ref.cancel()
  555. self._task_ref = None
  556. def _report_completion_and_cleanup(self, result):
  557. """Clean up and invoke client's `on_done` callback.
  558. :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
  559. value to pass to user's `on_done` callback.
  560. """
  561. if isinstance(result, BaseException):
  562. _LOG.error('AMQPConnectionWorkflow - reporting failure: %r', result)
  563. else:
  564. _LOG.info('AMQPConnectionWorkflow - reporting success: %r', result)
  565. on_done = self._on_done
  566. self._close()
  567. on_done(result)
  568. def _start_new_cycle_async(self, first):
  569. """Start a new workflow cycle (if any more attempts are left) beginning
  570. with the first Parameters object in self._connection_configs. If out of
  571. attempts, report `AMQPConnectionWorkflowFailed`.
  572. :param bool first: if True, don't delay; otherwise delay next attempt by
  573. `self._retry_pause` seconds.
  574. """
  575. self._task_ref = None
  576. assert self._attempts_remaining >= 0, self._attempts_remaining
  577. if self._attempts_remaining <= 0:
  578. error = AMQPConnectionWorkflowFailed(self._connection_errors)
  579. _LOG.error('AMQP connection workflow failed: %r.', error)
  580. self._report_completion_and_cleanup(error)
  581. return
  582. self._attempts_remaining -= 1
  583. _LOG.debug(
  584. 'Beginning a new AMQP connection workflow cycle; attempts '
  585. 'remaining after this: %s', self._attempts_remaining)
  586. self._current_config_index = None
  587. self._task_ref = self._nbio.call_later(
  588. 0 if first else self._retry_pause, self._try_next_config_async)
  589. def _try_next_config_async(self):
  590. """Attempt to connect using the next Parameters config. If there are no
  591. more configs, start a new cycle.
  592. """
  593. self._task_ref = None
  594. if self._current_config_index is None:
  595. self._current_config_index = 0
  596. else:
  597. self._current_config_index += 1
  598. if self._current_config_index >= len(self._connection_configs):
  599. _LOG.debug('_try_next_config_async: starting a new cycle.')
  600. self._start_new_cycle_async(first=False)
  601. return
  602. params = self._connection_configs[self._current_config_index]
  603. _LOG.debug('_try_next_config_async: %r:%s', params.host, params.port)
  604. # Begin with host address resolution
  605. assert self._task_ref is None
  606. self._task_ref = self._nbio.getaddrinfo(
  607. host=params.host,
  608. port=params.port,
  609. socktype=self._SOCK_TYPE,
  610. proto=self._IPPROTO,
  611. on_done=self._on_getaddrinfo_async_done)
  612. def _on_getaddrinfo_async_done(self, addrinfos_or_exc):
  613. """Handles completion callback from asynchronous `getaddrinfo()`.
  614. :param list | BaseException addrinfos_or_exc: resolved address records
  615. returned by `getaddrinfo()` or an exception object from failure.
  616. """
  617. self._task_ref = None
  618. if isinstance(addrinfos_or_exc, BaseException):
  619. _LOG.error('getaddrinfo failed: %r.', addrinfos_or_exc)
  620. self._connection_errors.append(addrinfos_or_exc)
  621. self._start_new_cycle_async(first=False)
  622. return
  623. _LOG.debug('getaddrinfo returned %s records', len(addrinfos_or_exc))
  624. self._addrinfo_iter = iter(addrinfos_or_exc)
  625. self._try_next_resolved_address()
  626. def _try_next_resolved_address(self):
  627. """Try connecting using next resolved address. If there aren't any left,
  628. continue with next Parameters config.
  629. """
  630. try:
  631. addr_record = next(self._addrinfo_iter)
  632. except StopIteration:
  633. _LOG.debug(
  634. '_try_next_resolved_address: continuing with next config.')
  635. self._try_next_config_async()
  636. return
  637. _LOG.debug('Attempting to connect using address record %r', addr_record)
  638. self._connector = self._connector_factory() # type: AMQPConnector
  639. self._connector.start(
  640. addr_record=addr_record,
  641. conn_params=self._connection_configs[self._current_config_index],
  642. on_done=self._on_connector_done)
  643. def _on_connector_done(self, conn_or_exc):
  644. """Handle completion of connection attempt by `AMQPConnector`.
  645. :param pika.connection.Connection | BaseException conn_or_exc: See
  646. `AMQPConnector.start()` for exception details.
  647. """
  648. self._connector = None
  649. _LOG.debug('Connection attempt completed with %r', conn_or_exc)
  650. if isinstance(conn_or_exc, BaseException):
  651. self._connection_errors.append(conn_or_exc)
  652. if isinstance(conn_or_exc, AMQPConnectorAborted):
  653. assert self._state == self._STATE_ABORTING, \
  654. 'Expected _STATE_ABORTING, but got {!r}'.format(self._state)
  655. self._report_completion_and_cleanup(
  656. AMQPConnectionWorkflowAborted())
  657. elif (self._until_first_amqp_attempt and
  658. isinstance(conn_or_exc, AMQPConnectorAMQPHandshakeError)):
  659. _LOG.debug('Ending AMQP connection workflow after first failed '
  660. 'AMQP handshake due to _until_first_amqp_attempt.')
  661. if isinstance(conn_or_exc.exception,
  662. pika.exceptions.ConnectionOpenAborted):
  663. error = AMQPConnectionWorkflowAborted
  664. else:
  665. error = AMQPConnectionWorkflowFailed(
  666. self._connection_errors)
  667. self._report_completion_and_cleanup(error)
  668. else:
  669. self._try_next_resolved_address()
  670. else:
  671. # Success!
  672. self._report_completion_and_cleanup(conn_or_exc)