123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866 |
- """Implements `AMQPConnectionWorkflow` - the default workflow of performing
- multiple TCP/[SSL]/AMQP connection attempts with timeouts and retries until one
- succeeds or all attempts fail.
- Defines the interface `AbstractAMQPConnectionWorkflow` that facilitates
- implementing custom connection workflows.
- """
- import functools
- import logging
- import socket
- import pika.compat
- import pika.exceptions
- import pika.tcp_socket_opts
- from pika import __version__
- _LOG = logging.getLogger(__name__)
- class AMQPConnectorException(Exception):
- """Base exception for this module"""
- class AMQPConnectorStackTimeout(AMQPConnectorException):
- """Overall TCP/[SSL]/AMQP stack connection attempt timed out."""
- class AMQPConnectorAborted(AMQPConnectorException):
- """Asynchronous request was aborted"""
- class AMQPConnectorWrongState(AMQPConnectorException):
- """AMQPConnector operation requested in wrong state, such as aborting after
- completion was reported.
- """
- class AMQPConnectorPhaseErrorBase(AMQPConnectorException):
- """Wrapper for exception that occurred during a particular bring-up phase.
- """
- def __init__(self, exception, *args):
- """
- :param BaseException exception: error that occurred while waiting for a
- subclass-specific protocol bring-up phase to complete.
- :param args: args for parent class
- """
- super(AMQPConnectorPhaseErrorBase, self).__init__(*args)
- self.exception = exception
- def __repr__(self):
- return '{}: {!r}'.format(self.__class__.__name__, self.exception)
- class AMQPConnectorSocketConnectError(AMQPConnectorPhaseErrorBase):
- """Error connecting TCP socket to remote peer"""
- class AMQPConnectorTransportSetupError(AMQPConnectorPhaseErrorBase):
- """Error setting up transport after TCP connected but before AMQP handshake.
- """
- class AMQPConnectorAMQPHandshakeError(AMQPConnectorPhaseErrorBase):
- """Error during AMQP handshake"""
- class AMQPConnectionWorkflowAborted(AMQPConnectorException):
- """AMQP Connection workflow was aborted."""
- class AMQPConnectionWorkflowWrongState(AMQPConnectorException):
- """AMQP Connection Workflow operation requested in wrong state, such as
- aborting after completion was reported.
- """
- class AMQPConnectionWorkflowFailed(AMQPConnectorException):
- """Indicates that AMQP connection workflow failed.
- """
- def __init__(self, exceptions, *args):
- """
- :param sequence exceptions: Exceptions that occurred during the
- workflow.
- :param args: args to pass to base class
- """
- super(AMQPConnectionWorkflowFailed, self).__init__(*args)
- self.exceptions = tuple(exceptions)
- def __repr__(self):
- return ('{}: {} exceptions in all; last exception - {!r}; first '
- 'exception - {!r}').format(
- self.__class__.__name__, len(self.exceptions),
- self.exceptions[-1],
- self.exceptions[0] if len(self.exceptions) > 1 else None)
- class AMQPConnector(object):
- """Performs a single TCP/[SSL]/AMQP connection workflow.
- """
- _STATE_INIT = 0 # start() hasn't been called yet
- _STATE_TCP = 1 # TCP/IP connection establishment
- _STATE_TRANSPORT = 2 # [SSL] and transport linkup
- _STATE_AMQP = 3 # AMQP connection handshake
- _STATE_TIMEOUT = 4 # overall TCP/[SSL]/AMQP timeout
- _STATE_ABORTING = 5 # abort() called - aborting workflow
- _STATE_DONE = 6 # result reported to client
- def __init__(self, conn_factory, nbio):
- """
- :param callable conn_factory: A function that takes
- `pika.connection.Parameters` as its only arg and returns a brand new
- `pika.connection.Connection`-based adapter instance each time it is
- called. The factory must instantiate the connection with
- `internal_connection_workflow=False`.
- :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
- """
- self._conn_factory = conn_factory
- self._nbio = nbio
- self._addr_record = None # type: tuple
- self._conn_params = None # type: pika.connection.Parameters
- self._on_done = None # will be provided via start()
- # TCP connection timeout
- # pylint: disable=C0301
- self._tcp_timeout_ref = None # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
- # Overall TCP/[SSL]/AMQP timeout
- self._stack_timeout_ref = None # type: pika.adapters.utils.nbio_interface.AbstractTimerReference
- # Current task
- self._task_ref = None # type: pika.adapters.utils.nbio_interface.AbstractIOReference
- self._sock = None # type: socket.socket
- self._amqp_conn = None # type: pika.connection.Connection
- self._state = self._STATE_INIT
- def start(self, addr_record, conn_params, on_done):
- """Asynchronously perform a single TCP/[SSL]/AMQP connection attempt.
- :param tuple addr_record: a single resolved address record compatible
- with `socket.getaddrinfo()` format.
- :param pika.connection.Parameters conn_params:
- :param callable on_done: Function to call upon completion of the
- workflow: `on_done(pika.connection.Connection | BaseException)`. If
- exception, it's going to be one of the following:
- `AMQPConnectorSocketConnectError`
- `AMQPConnectorTransportSetupError`
- `AMQPConnectorAMQPHandshakeError`
- `AMQPConnectorAborted`
- """
- if self._state != self._STATE_INIT:
- raise AMQPConnectorWrongState(
- 'Already in progress or finished; state={}'.format(self._state))
- self._addr_record = addr_record
- self._conn_params = conn_params
- self._on_done = on_done
- # Create socket and initiate TCP/IP connection
- self._state = self._STATE_TCP
- self._sock = socket.socket(*self._addr_record[:3])
- self._sock.setsockopt(pika.compat.SOL_TCP, socket.TCP_NODELAY, 1)
- pika.tcp_socket_opts.set_sock_opts(self._conn_params.tcp_options,
- self._sock)
- self._sock.setblocking(False)
- addr = self._addr_record[4]
- _LOG.info('Pika version %s connecting to %r', __version__, addr)
- self._task_ref = self._nbio.connect_socket(
- self._sock, addr, on_done=self._on_tcp_connection_done)
- # Start socket connection timeout timer
- self._tcp_timeout_ref = None
- if self._conn_params.socket_timeout is not None:
- self._tcp_timeout_ref = self._nbio.call_later(
- self._conn_params.socket_timeout,
- self._on_tcp_connection_timeout)
- # Start overall TCP/[SSL]/AMQP stack connection timeout timer
- self._stack_timeout_ref = None
- if self._conn_params.stack_timeout is not None:
- self._stack_timeout_ref = self._nbio.call_later(
- self._conn_params.stack_timeout, self._on_overall_timeout)
- def abort(self):
- """Abort the workflow asynchronously. The completion callback will be
- called with an instance of AMQPConnectorAborted.
- NOTE: we can't cancel/close synchronously because aborting pika
- Connection and its transport requires an asynchronous operation.
- :raises AMQPConnectorWrongState: If called after completion has been
- reported or the workflow not started yet.
- """
- if self._state == self._STATE_INIT:
- raise AMQPConnectorWrongState('Cannot abort before starting.')
- elif self._state == self._STATE_DONE:
- raise AMQPConnectorWrongState(
- 'Cannot abort after completion was reported')
- self._state = self._STATE_ABORTING
- self._deactivate()
- _LOG.info(
- 'AMQPConnector: beginning client-initiated asynchronous '
- 'abort; %r/%s', self._conn_params.host, self._addr_record)
- if self._amqp_conn is None:
- _LOG.debug('AMQPConnector.abort(): no connection, so just '
- 'scheduling completion report via I/O loop.')
- self._nbio.add_callback_threadsafe(
- functools.partial(self._report_completion_and_cleanup,
- AMQPConnectorAborted()))
- else:
- if not self._amqp_conn.is_closing:
- # Initiate close of AMQP connection and wait for asynchronous
- # callback from the Connection instance before reporting
- # completion to client
- _LOG.debug('AMQPConnector.abort(): closing Connection.')
- self._amqp_conn.close(
- 320, 'Client-initiated abort of AMQP Connection Workflow.')
- else:
- # It's already closing, must be due to our timeout processing,
- # so we'll just piggy back on the callback it registered
- _LOG.debug('AMQPConnector.abort(): closing of Connection was '
- 'already initiated.')
- assert self._state == self._STATE_TIMEOUT, \
- ('Connection is closing, but not in TIMEOUT state; state={}'
- .format(self._state))
- def _close(self):
- """Cancel asynchronous tasks and clean up to assist garbage collection.
- Transition to STATE_DONE.
- """
- self._deactivate()
- if self._sock is not None:
- self._sock.close()
- self._sock = None
- self._conn_factory = None
- self._nbio = None
- self._addr_record = None
- self._on_done = None
- self._state = self._STATE_DONE
- def _deactivate(self):
- """Cancel asynchronous tasks.
- """
- # NOTE: self._amqp_conn requires special handling as it doesn't support
- # synchronous closing. We special-case it elsewhere in the code where
- # needed.
- assert self._amqp_conn is None, \
- '_deactivate called with self._amqp_conn not None; state={}'.format(
- self._state)
- if self._tcp_timeout_ref is not None:
- self._tcp_timeout_ref.cancel()
- self._tcp_timeout_ref = None
- if self._stack_timeout_ref is not None:
- self._stack_timeout_ref.cancel()
- self._stack_timeout_ref = None
- if self._task_ref is not None:
- self._task_ref.cancel()
- self._task_ref = None
- def _report_completion_and_cleanup(self, result):
- """Clean up and invoke client's `on_done` callback.
- :param pika.connection.Connection | BaseException result: value to pass
- to user's `on_done` callback.
- """
- if isinstance(result, BaseException):
- _LOG.error('AMQPConnector - reporting failure: %r', result)
- else:
- _LOG.info('AMQPConnector - reporting success: %r', result)
- on_done = self._on_done
- self._close()
- on_done(result)
- def _on_tcp_connection_timeout(self):
- """Handle TCP connection timeout
- Reports AMQPConnectorSocketConnectError with socket.timeout inside.
- """
- self._tcp_timeout_ref = None
- error = AMQPConnectorSocketConnectError(
- socket.timeout('TCP connection attempt timed out: {!r}/{}'.format(
- self._conn_params.host, self._addr_record)))
- self._report_completion_and_cleanup(error)
- def _on_overall_timeout(self):
- """Handle overall TCP/[SSL]/AMQP connection attempt timeout by reporting
- `Timeout` error to the client.
- Reports AMQPConnectorSocketConnectError if timeout occurred during
- socket TCP connection attempt.
- Reports AMQPConnectorTransportSetupError if timeout occurred during
- tramsport [SSL] setup attempt.
- Reports AMQPConnectorAMQPHandshakeError if timeout occurred during
- AMQP handshake.
- """
- self._stack_timeout_ref = None
- prev_state = self._state
- self._state = self._STATE_TIMEOUT
- if prev_state == self._STATE_AMQP:
- msg = ('Timeout while setting up AMQP to {!r}/{}; ssl={}'.format(
- self._conn_params.host, self._addr_record,
- bool(self._conn_params.ssl_options)))
- _LOG.error(msg)
- # Initiate close of AMQP connection and wait for asynchronous
- # callback from the Connection instance before reporting completion
- # to client
- assert not self._amqp_conn.is_open, \
- 'Unexpected open state of {!r}'.format(self._amqp_conn)
- if not self._amqp_conn.is_closing:
- self._amqp_conn.close(320, msg)
- return
- if prev_state == self._STATE_TCP:
- error = AMQPConnectorSocketConnectError(
- AMQPConnectorStackTimeout(
- 'Timeout while connecting socket to {!r}/{}'.format(
- self._conn_params.host, self._addr_record)))
- else:
- assert prev_state == self._STATE_TRANSPORT
- error = AMQPConnectorTransportSetupError(
- AMQPConnectorStackTimeout(
- 'Timeout while setting up transport to {!r}/{}; ssl={}'.
- format(self._conn_params.host, self._addr_record,
- bool(self._conn_params.ssl_options))))
- self._report_completion_and_cleanup(error)
- def _on_tcp_connection_done(self, exc):
- """Handle completion of asynchronous socket connection attempt.
- Reports AMQPConnectorSocketConnectError if TCP socket connection
- failed.
- :param None|BaseException exc: None on success; exception object on
- failure
- """
- self._task_ref = None
- if self._tcp_timeout_ref is not None:
- self._tcp_timeout_ref.cancel()
- self._tcp_timeout_ref = None
- if exc is not None:
- _LOG.error('TCP Connection attempt failed: %r; dest=%r', exc,
- self._addr_record)
- self._report_completion_and_cleanup(
- AMQPConnectorSocketConnectError(exc))
- return
- # We succeeded in making a TCP/IP connection to the server
- _LOG.debug('TCP connection to broker established: %r.', self._sock)
- # Now set up the transport
- self._state = self._STATE_TRANSPORT
- ssl_context = server_hostname = None
- if self._conn_params.ssl_options is not None:
- ssl_context = self._conn_params.ssl_options.context
- server_hostname = self._conn_params.ssl_options.server_hostname
- if server_hostname is None:
- server_hostname = self._conn_params.host
- self._task_ref = self._nbio.create_streaming_connection(
- protocol_factory=functools.partial(self._conn_factory,
- self._conn_params),
- sock=self._sock,
- ssl_context=ssl_context,
- server_hostname=server_hostname,
- on_done=self._on_transport_establishment_done)
- self._sock = None # create_streaming_connection() takes ownership
- def _on_transport_establishment_done(self, result):
- """Handle asynchronous completion of
- `AbstractIOServices.create_streaming_connection()`
- Reports AMQPConnectorTransportSetupError if transport ([SSL]) setup
- failed.
- :param sequence|BaseException result: On success, a two-tuple
- (transport, protocol); on failure, exception instance.
- """
- self._task_ref = None
- if isinstance(result, BaseException):
- _LOG.error(
- 'Attempt to create the streaming transport failed: %r; '
- '%r/%s; ssl=%s', result, self._conn_params.host,
- self._addr_record, bool(self._conn_params.ssl_options))
- self._report_completion_and_cleanup(
- AMQPConnectorTransportSetupError(result))
- return
- # We succeeded in setting up the streaming transport!
- # result is a two-tuple (transport, protocol)
- _LOG.info('Streaming transport linked up: %r.', result)
- _transport, self._amqp_conn = result
- # AMQP handshake is in progress - initiated during transport link-up
- self._state = self._STATE_AMQP
- # We explicitly remove default handler because it raises an exception.
- self._amqp_conn.add_on_open_error_callback(
- self._on_amqp_handshake_done, remove_default=True)
- self._amqp_conn.add_on_open_callback(self._on_amqp_handshake_done)
- def _on_amqp_handshake_done(self, connection, error=None):
- """Handle completion of AMQP connection handshake attempt.
- NOTE: we handle two types of callbacks - success with just connection
- arg as well as the open-error callback with connection and error
- Reports AMQPConnectorAMQPHandshakeError if AMQP handshake failed.
- :param pika.connection.Connection connection:
- :param BaseException | None error: None on success, otherwise
- failure
- """
- _LOG.debug(
- 'AMQPConnector: AMQP handshake attempt completed; state=%s; '
- 'error=%r; %r/%s', self._state, error, self._conn_params.host,
- self._addr_record)
- # Don't need it any more; and _deactivate() checks that it's None
- self._amqp_conn = None
- if self._state == self._STATE_ABORTING:
- # Client-initiated abort takes precedence over timeout
- result = AMQPConnectorAborted()
- elif self._state == self._STATE_TIMEOUT:
- result = AMQPConnectorAMQPHandshakeError(
- AMQPConnectorStackTimeout(
- 'Timeout during AMQP handshake{!r}/{}; ssl={}'.format(
- self._conn_params.host, self._addr_record,
- bool(self._conn_params.ssl_options))))
- elif self._state == self._STATE_AMQP:
- if error is None:
- _LOG.debug(
- 'AMQPConnector: AMQP connection established for %r/%s: %r',
- self._conn_params.host, self._addr_record, connection)
- result = connection
- else:
- _LOG.debug(
- 'AMQPConnector: AMQP connection handshake failed for '
- '%r/%s: %r', self._conn_params.host, self._addr_record,
- error)
- result = AMQPConnectorAMQPHandshakeError(error)
- else:
- # We timed out or aborted and initiated closing of the connection,
- # but this callback snuck in
- _LOG.debug(
- 'AMQPConnector: Ignoring AMQP handshake completion '
- 'notification due to wrong state=%s; error=%r; conn=%r',
- self._state, error, connection)
- return
- self._report_completion_and_cleanup(result)
- class AbstractAMQPConnectionWorkflow(pika.compat.AbstractBase):
- """Interface for implementing a custom TCP/[SSL]/AMQP connection workflow.
- """
- def start(self, connection_configs, connector_factory, native_loop,
- on_done):
- """Asynchronously perform the workflow until success or all retries
- are exhausted. Called by the adapter.
- :param sequence connection_configs: A sequence of one or more
- `pika.connection.Parameters`-based objects. Will attempt to connect
- using each config in the given order.
- :param callable connector_factory: call it without args to obtain a new
- instance of `AMQPConnector` for each connection attempt.
- See `AMQPConnector` for details.
- :param native_loop: Native I/O loop passed by app to the adapter or
- obtained by the adapter by default.
- :param callable on_done: Function to call upon completion of the
- workflow:
- `on_done(pika.connection.Connection |
- AMQPConnectionWorkflowFailed |
- AMQPConnectionWorkflowAborted)`.
- `Connection`-based adapter on success,
- `AMQPConnectionWorkflowFailed` on failure,
- `AMQPConnectionWorkflowAborted` if workflow was aborted.
- :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
- as after starting the workflow.
- """
- raise NotImplementedError
- def abort(self):
- """Abort the workflow asynchronously. The completion callback will be
- called with an instance of AMQPConnectionWorkflowAborted.
- NOTE: we can't cancel/close synchronously because aborting pika
- Connection and its transport requires an asynchronous operation.
- :raises AMQPConnectionWorkflowWrongState: If called in wrong state, such
- as before starting or after completion has been reported.
- """
- raise NotImplementedError
- class AMQPConnectionWorkflow(AbstractAMQPConnectionWorkflow):
- """Implements Pika's default workflow for performing multiple TCP/[SSL]/AMQP
- connection attempts with timeouts and retries until one succeeds or all
- attempts fail.
- The workflow:
- while not success and retries remain:
- 1. For each given config (pika.connection.Parameters object):
- A. Perform DNS resolution of the config's host.
- B. Attempt to establish TCP/[SSL]/AMQP for each resolved address
- until one succeeds, in which case we're done.
- 2. If all configs failed but retries remain, resume from beginning
- after the given retry pause. NOTE: failure of DNS resolution
- is equivalent to one cycle and will be retried after the pause
- if retries remain.
- """
- _SOCK_TYPE = socket.SOCK_STREAM
- _IPPROTO = socket.IPPROTO_TCP
- _STATE_INIT = 0
- _STATE_ACTIVE = 1
- _STATE_ABORTING = 2
- _STATE_DONE = 3
- def __init__(self, _until_first_amqp_attempt=False):
- """
- :param int | float retry_pause: Non-negative number of seconds to wait
- before retrying the config sequence. Meaningful only if retries is
- greater than 0. Defaults to 2 seconds.
- :param bool _until_first_amqp_attempt: INTERNAL USE ONLY; ends workflow
- after first AMQP handshake attempt, regardless of outcome (success
- or failure). The automatic connection logic in
- `pika.connection.Connection` enables this because it's not
- designed/tested to reset all state properly to handle more than one
- AMQP handshake attempt.
- TODO: Do we need getaddrinfo timeout?
- TODO: Would it be useful to implement exponential back-off?
- """
- self._attempts_remaining = None # supplied by start()
- self._retry_pause = None # supplied by start()
- self._until_first_amqp_attempt = _until_first_amqp_attempt
- # Provided by set_io_services()
- # pylint: disable=C0301
- self._nbio = None # type: pika.adapters.utils.nbio_interface.AbstractIOServices
- # Current index within `_connection_configs`; initialized when
- # starting a new connection sequence.
- self._current_config_index = None
- self._connection_configs = None # supplied by start()
- self._connector_factory = None # supplied by start()
- self._on_done = None # supplied by start()
- self._connector = None # type: AMQPConnector
- self._task_ref = None # current cancelable asynchronous task or timer
- self._addrinfo_iter = None
- # Exceptions from all failed connection attempts in this workflow
- self._connection_errors = []
- self._state = self._STATE_INIT
- def set_io_services(self, nbio):
- """Called by the conneciton adapter only on pika's
- `AMQPConnectionWorkflow` instance to provide it the adapter-specific
- `AbstractIOServices` object before calling the `start()` method.
- NOTE: Custom workflow implementations should use the native I/O loop
- directly because `AbstractIOServices` is private to Pika
- implementation and its interface may change without notice.
- :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
- """
- self._nbio = nbio
- def start(
- self,
- connection_configs,
- connector_factory,
- native_loop, # pylint: disable=W0613
- on_done):
- """Override `AbstractAMQPConnectionWorkflow.start()`.
- NOTE: This implementation uses `connection_attempts` and `retry_delay`
- values from the last element of the given `connection_configs` sequence
- as the overall number of connection attempts of the entire
- `connection_configs` sequence and pause between each sequence.
- """
- if self._state != self._STATE_INIT:
- raise AMQPConnectorWrongState(
- 'Already in progress or finished; state={}'.format(self._state))
- try:
- iter(connection_configs)
- except Exception as error:
- raise TypeError(
- 'connection_configs does not support iteration: {!r}'.format(
- error))
- if not connection_configs:
- raise ValueError(
- 'connection_configs is empty: {!r}.'.format(connection_configs))
- self._connection_configs = connection_configs
- self._connector_factory = connector_factory
- self._on_done = on_done
- self._attempts_remaining = connection_configs[-1].connection_attempts
- self._retry_pause = connection_configs[-1].retry_delay
- self._state = self._STATE_ACTIVE
- _LOG.debug('Starting AMQP Connection workflow asynchronously.')
- # Begin from our own I/O loop context to avoid calling back into client
- # from client's call here
- self._task_ref = self._nbio.call_later(
- 0, functools.partial(self._start_new_cycle_async, first=True))
- def abort(self):
- """Override `AbstractAMQPConnectionWorkflow.abort()`.
- """
- if self._state == self._STATE_INIT:
- raise AMQPConnectorWrongState('Cannot abort before starting.')
- elif self._state == self._STATE_DONE:
- raise AMQPConnectorWrongState(
- 'Cannot abort after completion was reported')
- self._state = self._STATE_ABORTING
- self._deactivate()
- _LOG.info('AMQPConnectionWorkflow: beginning client-initiated '
- 'asynchronous abort.')
- if self._connector is None:
- _LOG.debug('AMQPConnectionWorkflow.abort(): no connector, so just '
- 'scheduling completion report via I/O loop.')
- self._nbio.add_callback_threadsafe(
- functools.partial(self._report_completion_and_cleanup,
- AMQPConnectionWorkflowAborted()))
- else:
- _LOG.debug('AMQPConnectionWorkflow.abort(): requesting '
- 'connector.abort().')
- self._connector.abort()
- def _close(self):
- """Cancel asynchronous tasks and clean up to assist garbage collection.
- Transition to _STATE_DONE.
- """
- self._deactivate()
- self._connection_configs = None
- self._nbio = None
- self._connector_factory = None
- self._on_done = None
- self._connector = None
- self._addrinfo_iter = None
- self._connection_errors = None
- self._state = self._STATE_DONE
- def _deactivate(self):
- """Cancel asynchronous tasks.
- """
- if self._task_ref is not None:
- self._task_ref.cancel()
- self._task_ref = None
- def _report_completion_and_cleanup(self, result):
- """Clean up and invoke client's `on_done` callback.
- :param pika.connection.Connection | AMQPConnectionWorkflowFailed result:
- value to pass to user's `on_done` callback.
- """
- if isinstance(result, BaseException):
- _LOG.error('AMQPConnectionWorkflow - reporting failure: %r', result)
- else:
- _LOG.info('AMQPConnectionWorkflow - reporting success: %r', result)
- on_done = self._on_done
- self._close()
- on_done(result)
- def _start_new_cycle_async(self, first):
- """Start a new workflow cycle (if any more attempts are left) beginning
- with the first Parameters object in self._connection_configs. If out of
- attempts, report `AMQPConnectionWorkflowFailed`.
- :param bool first: if True, don't delay; otherwise delay next attempt by
- `self._retry_pause` seconds.
- """
- self._task_ref = None
- assert self._attempts_remaining >= 0, self._attempts_remaining
- if self._attempts_remaining <= 0:
- error = AMQPConnectionWorkflowFailed(self._connection_errors)
- _LOG.error('AMQP connection workflow failed: %r.', error)
- self._report_completion_and_cleanup(error)
- return
- self._attempts_remaining -= 1
- _LOG.debug(
- 'Beginning a new AMQP connection workflow cycle; attempts '
- 'remaining after this: %s', self._attempts_remaining)
- self._current_config_index = None
- self._task_ref = self._nbio.call_later(
- 0 if first else self._retry_pause, self._try_next_config_async)
- def _try_next_config_async(self):
- """Attempt to connect using the next Parameters config. If there are no
- more configs, start a new cycle.
- """
- self._task_ref = None
- if self._current_config_index is None:
- self._current_config_index = 0
- else:
- self._current_config_index += 1
- if self._current_config_index >= len(self._connection_configs):
- _LOG.debug('_try_next_config_async: starting a new cycle.')
- self._start_new_cycle_async(first=False)
- return
- params = self._connection_configs[self._current_config_index]
- _LOG.debug('_try_next_config_async: %r:%s', params.host, params.port)
- # Begin with host address resolution
- assert self._task_ref is None
- self._task_ref = self._nbio.getaddrinfo(
- host=params.host,
- port=params.port,
- socktype=self._SOCK_TYPE,
- proto=self._IPPROTO,
- on_done=self._on_getaddrinfo_async_done)
- def _on_getaddrinfo_async_done(self, addrinfos_or_exc):
- """Handles completion callback from asynchronous `getaddrinfo()`.
- :param list | BaseException addrinfos_or_exc: resolved address records
- returned by `getaddrinfo()` or an exception object from failure.
- """
- self._task_ref = None
- if isinstance(addrinfos_or_exc, BaseException):
- _LOG.error('getaddrinfo failed: %r.', addrinfos_or_exc)
- self._connection_errors.append(addrinfos_or_exc)
- self._start_new_cycle_async(first=False)
- return
- _LOG.debug('getaddrinfo returned %s records', len(addrinfos_or_exc))
- self._addrinfo_iter = iter(addrinfos_or_exc)
- self._try_next_resolved_address()
- def _try_next_resolved_address(self):
- """Try connecting using next resolved address. If there aren't any left,
- continue with next Parameters config.
- """
- try:
- addr_record = next(self._addrinfo_iter)
- except StopIteration:
- _LOG.debug(
- '_try_next_resolved_address: continuing with next config.')
- self._try_next_config_async()
- return
- _LOG.debug('Attempting to connect using address record %r', addr_record)
- self._connector = self._connector_factory() # type: AMQPConnector
- self._connector.start(
- addr_record=addr_record,
- conn_params=self._connection_configs[self._current_config_index],
- on_done=self._on_connector_done)
- def _on_connector_done(self, conn_or_exc):
- """Handle completion of connection attempt by `AMQPConnector`.
- :param pika.connection.Connection | BaseException conn_or_exc: See
- `AMQPConnector.start()` for exception details.
- """
- self._connector = None
- _LOG.debug('Connection attempt completed with %r', conn_or_exc)
- if isinstance(conn_or_exc, BaseException):
- self._connection_errors.append(conn_or_exc)
- if isinstance(conn_or_exc, AMQPConnectorAborted):
- assert self._state == self._STATE_ABORTING, \
- 'Expected _STATE_ABORTING, but got {!r}'.format(self._state)
- self._report_completion_and_cleanup(
- AMQPConnectionWorkflowAborted())
- elif (self._until_first_amqp_attempt and
- isinstance(conn_or_exc, AMQPConnectorAMQPHandshakeError)):
- _LOG.debug('Ending AMQP connection workflow after first failed '
- 'AMQP handshake due to _until_first_amqp_attempt.')
- if isinstance(conn_or_exc.exception,
- pika.exceptions.ConnectionOpenAborted):
- error = AMQPConnectionWorkflowAborted
- else:
- error = AMQPConnectionWorkflowFailed(
- self._connection_errors)
- self._report_completion_and_cleanup(error)
- else:
- self._try_next_resolved_address()
- else:
- # Success!
- self._report_completion_and_cleanup(conn_or_exc)
|