||
- """Base class extended by connection adapters. This extends the
- connection.Connection class to encapsulate connection behavior but still
- isolate socket and low level communication.
- """
- import abc
- import functools
- import logging
- import pika.compat
- import pika.exceptions
- import pika.tcp_socket_opts
- from pika.adapters.utils import connection_workflow, nbio_interface
- from pika import connection
- LOGGER = logging.getLogger(__name__)
class BaseConnection(connection.Connection):
    """BaseConnection class that should be extended by connection adapters.

    This class abstracts I/O loop and transport services from pika core.
    All asynchronous services are obtained from the `nbio` object passed to
    the constructor; stream events are delivered to the `_proto_*` methods
    via a `_StreamingProtocolShim` wrapper.

    """

    def __init__(self, parameters, on_open_callback, on_open_error_callback,
                 on_close_callback, nbio, internal_connection_workflow):
        """Create a new instance of the Connection object.

        :param None|pika.connection.Parameters parameters: Connection parameters
        :param None|method on_open_callback: Method to call on connection open
        :param None | method on_open_error_callback: Called if the connection
            can't be established or connection establishment is interrupted by
            `Connection.close()`: on_open_error_callback(Connection, exception).
        :param None | method on_close_callback: Called when a previously fully
            open connection is closed:
            `on_close_callback(Connection, exception)`, where `exception` is
            either an instance of `exceptions.ConnectionClosed` if closed by
            user or broker or exception of another type that describes the cause
            of connection failure.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
            asynchronous services
        :param bool internal_connection_workflow: True for autonomous connection
            establishment which is default; False for externally-managed
            connection workflow via the `create_connection()` factory.
        :raises: RuntimeError
        :raises: ValueError if `parameters` is neither None nor an instance of
            `pika.connection.Parameters`.

        """
        # Compare against None explicitly: a truthiness test would let falsy
        # values of the wrong type (e.g. `{}` or `''`) bypass validation.
        if (parameters is not None and
                not isinstance(parameters, connection.Parameters)):
            raise ValueError(
                'Expected instance of Parameters, not %r' % (parameters,))

        self._nbio = nbio

        self._connection_workflow = None  # type: connection_workflow.AMQPConnectionWorkflow
        self._transport = None  # type: pika.adapters.utils.nbio_interface.AbstractStreamTransport

        self._got_eof = False  # transport indicated EOF (connection reset)

        super(BaseConnection, self).__init__(
            parameters,
            on_open_callback,
            on_open_error_callback,
            on_close_callback,
            internal_connection_workflow=internal_connection_workflow)

    def _init_connection_state(self):
        """Initialize or reset all of our internal state variables for a given
        connection. If we disconnect and reconnect, all of our state needs to
        be wiped.

        """
        super(BaseConnection, self)._init_connection_state()

        self._connection_workflow = None
        self._transport = None
        self._got_eof = False

    def __repr__(self):
        """Return a debug representation containing the class name, the
        connection state name, the current transport and the parameters.

        """
        # TODO need helpful __repr__ in transports
        return ('<%s %s transport=%s params=%s>' % (
            self.__class__.__name__, self._STATE_NAMES[self.connection_state],
            self._transport, self.params))

    @classmethod
    @abc.abstractmethod
    def create_connection(cls,
                          connection_configs,
                          on_done,
                          custom_ioloop=None,
                          workflow=None):
        """Asynchronously create a connection to an AMQP broker using the given
        configurations. Will attempt to connect using each config in the given
        order, including all compatible resolved IP addresses of the hostname
        supplied in each config, until one is established or all attempts fail.

        See also `_start_connection_workflow()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable on_done: as defined in
            `connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param object | None custom_ioloop: Provide a custom I/O loop that is
            native to the specific adapter implementation; if None, the adapter
            will use a default loop instance, which is typically a singleton.
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to its `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        """
        raise NotImplementedError

    @classmethod
    def _start_connection_workflow(cls, connection_configs, connection_factory,
                                   nbio, workflow, on_done):
        """Helper function for custom implementations of `create_connection()`.

        :param sequence connection_configs: A sequence of one or more
            `pika.connection.Parameters`-based objects.
        :param callable connection_factory: A function that takes
            `pika.connection.Parameters` as its only arg and returns a brand new
            `pika.connection.Connection`-based adapter instance each time it is
            called. The factory must instantiate the connection with
            `internal_connection_workflow=False`.
        :param pika.adapters.utils.nbio_interface.AbstractIOServices nbio:
        :param connection_workflow.AbstractAMQPConnectionWorkflow | None workflow:
            Pass an instance of an implementation of the
            `connection_workflow.AbstractAMQPConnectionWorkflow` interface;
            defaults to a `connection_workflow.AMQPConnectionWorkflow` instance
            with default values for optional args.
        :param callable on_done: as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :returns: Connection workflow instance in use. The user should limit
            their interaction with this object only to its `abort()` method.
        :rtype: connection_workflow.AbstractAMQPConnectionWorkflow

        """
        if workflow is None:
            workflow = connection_workflow.AMQPConnectionWorkflow()
            LOGGER.debug('Created default connection workflow %r', workflow)

        # Only the stock workflow implementation is handed the I/O services
        # here; other implementations are passed through untouched.
        if isinstance(workflow, connection_workflow.AMQPConnectionWorkflow):
            workflow.set_io_services(nbio)

        def create_connector():
            """`AMQPConnector` factory."""
            return connection_workflow.AMQPConnector(
                lambda params: _StreamingProtocolShim(
                    connection_factory(params)),
                nbio)

        # The workflow reports a shim (or an exception); unwrap the shim so
        # the user's `on_done` receives the actual connection object.
        workflow.start(
            connection_configs=connection_configs,
            connector_factory=create_connector,
            native_loop=nbio.get_native_ioloop(),
            on_done=functools.partial(cls._unshim_connection_workflow_callback,
                                      on_done))

        return workflow

    @property
    def ioloop(self):
        """
        :returns: the native I/O loop instance underlying async services selected
            by user or the default selected by the specialized connection
            adapter (e.g., Twisted reactor, `asyncio.SelectorEventLoop`,
            `select_connection.IOLoop`, etc.)
        :rtype: object
        """
        return self._nbio.get_native_ioloop()

    def _adapter_call_later(self, delay, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_call_later()`.

        Delegates to `call_later()` of the I/O services object and returns
        its result.

        """
        return self._nbio.call_later(delay, callback)

    def _adapter_remove_timeout(self, timeout_id):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_remove_timeout()`.

        :param timeout_id: object exposing a `cancel()` method; presumably
            the value returned by `_adapter_call_later()`.

        """
        timeout_id.cancel()

    def _adapter_add_callback_threadsafe(self, callback):
        """Implement
        :py:meth:`pika.connection.Connection._adapter_add_callback_threadsafe()`.

        :param callable callback:
        :raises TypeError: if `callback` is not callable.

        """
        if not callable(callback):
            raise TypeError(
                'callback must be a callable, but got %r' % (callback,))

        self._nbio.add_callback_threadsafe(callback)

    def _adapter_connect_stream(self):
        """Initiate full-stack connection establishment asynchronously for
        internally-initiated connection bring-up.

        Upon failed completion, we will invoke
        `Connection._on_stream_terminated()`. NOTE: On success,
        the stack will be up already, so there is no corresponding callback.

        """
        self._connection_workflow = connection_workflow.AMQPConnectionWorkflow(
            _until_first_amqp_attempt=True)

        self._connection_workflow.set_io_services(self._nbio)

        def create_connector():
            """`AMQPConnector` factory"""
            # The connection being brought up is always this instance, so
            # the params argument is ignored.
            return connection_workflow.AMQPConnector(
                lambda _params: _StreamingProtocolShim(self), self._nbio)

        self._connection_workflow.start(
            [self.params],
            connector_factory=create_connector,
            native_loop=self._nbio.get_native_ioloop(),
            on_done=functools.partial(self._unshim_connection_workflow_callback,
                                      self._on_connection_workflow_done))

    @staticmethod
    def _unshim_connection_workflow_callback(user_on_done, shim_or_exc):
        """Invoke `user_on_done` with the workflow result, replacing a
        `_StreamingProtocolShim` result with the connection it wraps.

        :param callable user_on_done: user's `on_done` callback as defined in
            :py:meth:`connection_workflow.AbstractAMQPConnectionWorkflow.start()`.
        :param _StreamingProtocolShim | Exception shim_or_exc:

        """
        result = shim_or_exc
        if isinstance(result, _StreamingProtocolShim):
            result = result.conn

        user_on_done(result)

    def _abort_connection_workflow(self):
        """Asynchronously abort connection workflow. Upon
        completion, `Connection._on_stream_terminated()` will be called with None
        as the error argument.

        Assumption: may be called only while connection is opening.

        """
        assert not self._opened, (
            '_abort_connection_workflow() may be called only when '
            'connection is opening.')

        if self._transport is None:
            # NOTE: this is possible only when user calls Connection.close() to
            # interrupt internally-initiated connection establishment.
            # self._connection_workflow.abort() would not call
            # Connection.close() before pairing of connection with transport.
            assert self._internal_connection_workflow, (
                'Unexpected _abort_connection_workflow() call with '
                'no transport in external connection workflow mode.')

            # This will result in call to _on_connection_workflow_done() upon
            # completion
            self._connection_workflow.abort()
        else:
            # NOTE: we can't use self._connection_workflow.abort() in this case,
            # because it would result in infinite recursion as we're called
            # from Connection.close() and _connection_workflow.abort() calls
            # Connection.close() to abort a connection that's already been
            # paired with a transport. During internally-initiated connection
            # establishment, AMQPConnectionWorkflow will discover that user
            # aborted the connection when it receives
            # pika.exceptions.ConnectionOpenAborted.

            # This completes asynchronously, culminating in call to our method
            # `connection_lost()`
            self._transport.abort()

    def _on_connection_workflow_done(self, conn_or_exc):
        """`AMQPConnectionWorkflow` completion callback.

        :param BaseConnection | Exception conn_or_exc: Our own connection
            instance on success; exception on failure. See
            `AbstractAMQPConnectionWorkflow.start()` for details.

        """
        LOGGER.debug('Full-stack connection workflow completed: %r',
                     conn_or_exc)

        self._connection_workflow = None

        # Notify protocol of failure
        if isinstance(conn_or_exc, Exception):
            self._transport = None
            if isinstance(conn_or_exc,
                          connection_workflow.AMQPConnectionWorkflowAborted):
                LOGGER.info('Full-stack connection workflow aborted: %r',
                            conn_or_exc)
                # So that _handle_connection_workflow_failure() will know it's
                # not a failure
                conn_or_exc = None
            else:
                LOGGER.error('Full-stack connection workflow failed: %r',
                             conn_or_exc)
                # When the last recorded failure was a socket connect error,
                # report the whole workflow failure wrapped in
                # `AMQPConnectionError`.
                if (isinstance(conn_or_exc,
                               connection_workflow.AMQPConnectionWorkflowFailed)
                        and isinstance(
                            conn_or_exc.exceptions[-1], connection_workflow.
                            AMQPConnectorSocketConnectError)):
                    conn_or_exc = pika.exceptions.AMQPConnectionError(
                        conn_or_exc)

            self._handle_connection_workflow_failure(conn_or_exc)
        else:
            # NOTE: On success, the stack will be up already, so there is no
            # corresponding callback.
            assert conn_or_exc is self, \
                'Expected self conn={!r} from workflow, but got {!r}.'.format(
                    self, conn_or_exc)

    def _handle_connection_workflow_failure(self, error):
        """Handle failure of self-initiated stack bring-up and call
        `Connection._on_stream_terminated()` if connection is not in closed state
        yet. Called by adapter layer when the full-stack connection workflow
        fails.

        :param Exception | None error: exception instance describing the reason
            for failure or None if the connection workflow was aborted.

        """
        if error is None:
            LOGGER.info('Self-initiated stack bring-up aborted.')
        else:
            LOGGER.error('Self-initiated stack bring-up failed: %r', error)

        if not self.is_closed:
            self._on_stream_terminated(error)
        else:
            # This may happen when AMQP layer bring up was started but did not
            # complete
            LOGGER.debug('_handle_connection_workflow_failure(): '
                         'suppressing - connection already closed.')

    def _adapter_disconnect_stream(self):
        """Asynchronously bring down the streaming transport layer and invoke
        `Connection._on_stream_terminated()` asynchronously when complete.

        """
        if not self._opened:
            self._abort_connection_workflow()
        else:
            # This completes asynchronously, culminating in call to our method
            # `connection_lost()`
            self._transport.abort()

    def _adapter_emit_data(self, data):
        """Take ownership of data and send it to AMQP server as soon as
        possible.

        :param bytes data:

        """
        self._transport.write(data)

    def _proto_connection_made(self, transport):
        """Introduces transport to protocol after transport is connected.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param nbio_interface.AbstractStreamTransport transport:
        :raises Exception: Exception-based exception on error

        """
        self._transport = transport

        # Let connection know that stream is available
        self._on_stream_connected()

    def _proto_connection_lost(self, error):
        """Called upon loss or closing of TCP connection.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        NOTE: `connection_made()` and `connection_lost()` are each called just
        once and in that order. All other callbacks are called between them.

        :param BaseException | None error: An exception (check for
            `BaseException`) indicates connection failure. None indicates that
            connection was closed on this side, such as when it's aborted or
            when `AbstractStreamProtocol.eof_received()` returns a falsy result.
        :raises Exception: Exception-based exception on error

        """
        self._transport = None

        if error is None:
            # Either result of `eof_received()` or abort
            if self._got_eof:
                error = pika.exceptions.StreamLostError(
                    'Transport indicated EOF')
        else:
            error = pika.exceptions.StreamLostError(
                'Stream connection lost: {!r}'.format(error))

        # `error` is still None only for a local abort without EOF; that case
        # is logged at DEBUG rather than ERROR.
        LOGGER.log(logging.DEBUG if error is None else logging.ERROR,
                   'connection_lost: %r', error)

        self._on_stream_terminated(error)

    def _proto_eof_received(self):  # pylint: disable=R0201
        """Called after the remote peer shuts its write end of the connection.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :returns: A falsy value (including None) will cause the transport to
            close itself, resulting in an eventual `connection_lost()` call
            from the transport. If a truthy value is returned, it will be the
            protocol's responsibility to close/abort the transport.
        :rtype: falsy|truthy
        :raises Exception: Exception-based exception on error

        """
        LOGGER.error('Transport indicated EOF.')

        # Remembered so that `_proto_connection_lost()` can report the EOF.
        self._got_eof = True

        # This is how a reset connection will typically present itself
        # when we have nothing to send to the server over plaintext stream.
        #
        # Have transport tear down the connection and invoke our
        # `connection_lost` method
        return False

    def _proto_data_received(self, data):
        """Called to deliver incoming data from the server to the protocol.

        :py:class:`.utils.nbio_interface.AbstractStreamProtocol` implementation.

        :param data: Non-empty data bytes.
        :raises Exception: Exception-based exception on error

        """
        self._on_data_available(data)
class _StreamingProtocolShim(nbio_interface.AbstractStreamProtocol):
    """Shim for callbacks from transport so that BaseConnection can
    delegate to private methods, thus avoiding contamination of API with
    methods that look public, but aren't.

    """

    # Override AbstractStreamProtocol abstract methods to enable instantiation
    connection_made = None
    connection_lost = None
    eof_received = None
    data_received = None

    def __init__(self, conn):
        """
        :param BaseConnection conn: connection whose `_proto_*` methods
            receive the transport callbacks.
        """
        self.conn = conn
        # Bind the protocol callbacks to the connection's private handlers.
        # pylint: disable=W0212
        self.connection_made = conn._proto_connection_made
        self.connection_lost = conn._proto_connection_lost
        self.eof_received = conn._proto_eof_received
        self.data_received = conn._proto_data_received

    def __getattr__(self, attr):
        """Proxy nonexistent attribute requests to our connection instance
        so that AMQPConnectionWorkflow/AMQPConnector may treat the shim as an
        actual connection.

        :param str attr: name of the attribute not found on the shim itself.
        :raises AttributeError: if `conn` has not been set on this instance,
            or if the connection does not have the requested attribute.

        """
        # `__getattr__` runs only after normal lookup fails, so reaching here
        # for `conn` means it was never assigned (e.g. the instance was
        # created without `__init__`, as copy/pickle do). Reading `self.conn`
        # below would then re-enter this method and recurse without bound.
        if attr == 'conn':
            raise AttributeError(attr)

        return getattr(self.conn, attr)

    def __repr__(self):
        """Return the shim class name followed by the wrapped connection's
        repr.

        """
        return '{}: {!r}'.format(self.__class__.__name__, self.conn)
|