12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371 |
- """Utilities for implementing `nbio_interface.AbstractIOServices` for
- pika connection adapters.
- """
- import collections
- import errno
- import functools
- import logging
- import numbers
- import os
- import socket
- import ssl
- import sys
- import traceback
- from pika.adapters.utils.nbio_interface import (AbstractIOReference,
- AbstractStreamTransport)
- import pika.compat
- import pika.diagnostic_utils
# "Try again" error codes for non-blocking socket I/O - send()/recv().
# NOTE: POSIX.1 allows either error to be returned for this case and doesn't
# require them to have the same value.
_TRY_IO_AGAIN_SOCK_ERROR_CODES = (
    errno.EAGAIN,
    errno.EWOULDBLOCK,
)

# "Connection establishment pending" error codes for non-blocking socket
# connect() call.
# NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows
_CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = (
    errno.EINPROGRESS,
    errno.EWOULDBLOCK,
)

# Module-level logger shared by every helper and class in this module
_LOGGER = logging.getLogger(__name__)

# Decorator that logs exceptions escaping from the decorated function
_log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER)  # pylint: disable=C0103
def check_callback_arg(callback, name):
    """Verify that the given callback argument can be called.

    :param callback: object that is expected to be callable
    :param name: argument name to include in the exception text
    :raises TypeError: if `callback` is not callable

    """
    if callable(callback):
        return

    message = '{} must be callable, but got {!r}'.format(name, callback)
    raise TypeError(message)
def check_fd_arg(fd):
    """Raise TypeError if file descriptor is not an integer

    :param fd: file descriptor
    :raises TypeError: if `fd` is not an instance of `numbers.Integral`

    """
    if not isinstance(fd, numbers.Integral):
        # NOTE: the message previously misspelled "Parameter"
        raise TypeError(
            'Parameter must be a file descriptor, but got {!r}'.format(fd))
- def _retry_on_sigint(func):
- """Function decorator for retrying on SIGINT.
- """
- @functools.wraps(func)
- def retry_sigint_wrap(*args, **kwargs):
- """Wrapper for decorated function"""
- while True:
- try:
- return func(*args, **kwargs)
- except pika.compat.SOCKET_ERROR as error:
- if error.errno == errno.EINTR:
- continue
- else:
- raise
- return retry_sigint_wrap
class SocketConnectionMixin(object):
    """Mixin that provides
    `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
    by delegating to `_AsyncSocketConnector`; the connector is given this
    object as its `nbio` services provider, so the class mixing this in is
    expected to supply
    `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
    basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.

    """

    def connect_socket(self, sock, resolved_addr, on_done):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.

        :returns: whatever `_AsyncSocketConnector.start()` returns (a
            cancellable reference to the pending operation)

        """
        connector = _AsyncSocketConnector(
            nbio=self, sock=sock, resolved_addr=resolved_addr, on_done=on_done)
        return connector.start()
class StreamingConnectionMixin(object):
    """Mixin that provides
    `.nbio_interface.AbstractIOServices.create_streaming_connection()` by
    delegating to `_AsyncStreamConnector`; the connector is given this object
    as its `nbio` services provider, so the class mixing this in is expected
    to supply `.nbio_interface.AbstractFileDescriptorServices` and basic
    `nbio_interface.AbstractIOServices` services.

    """

    def create_streaming_connection(self,
                                    protocol_factory,
                                    sock,
                                    on_done,
                                    ssl_context=None,
                                    server_hostname=None):
        """Implement
        :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.

        If constructing or starting the connector fails, the socket is closed
        (this function takes ownership of it) and the original exception is
        re-raised.

        """
        try:
            connector = _AsyncStreamConnector(
                nbio=self,
                protocol_factory=protocol_factory,
                sock=sock,
                ssl_context=ssl_context,
                server_hostname=server_hostname,
                on_done=on_done)
            return connector.start()
        except Exception as error:
            _LOGGER.error('create_streaming_connection(%s) failed: %r', sock,
                          error)
            # Close the socket since this function takes ownership
            try:
                sock.close()
            except Exception as close_error:  # pylint: disable=W0703
                # A failure of sock.close() is only logged, never raised, so
                # that the exception which brought us here is the one the
                # caller sees
                _LOGGER.error('%s.close() failed: %r', sock, close_error)

            raise
class _AsyncServiceAsyncHandle(AbstractIOReference):
    """This module's adaptation of `.nbio_interface.AbstractIOReference`:
    a thin handle that forwards `cancel()` to the operation it refers to.

    """

    def __init__(self, subject):
        """
        :param subject: subject of the reference containing a `cancel()` method

        """
        # Only the bound method is retained, not the subject itself
        cancel_method = subject.cancel
        self._cancel = cancel_method

    def cancel(self):
        """Cancel pending operation

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        was_pending = self._cancel()
        return was_pending
class _AsyncSocketConnector(object):
    """Connects the given non-blocking socket asynchronously using
    `.nbio_interface.AbstractFileDescriptorServices` and basic
    `.nbio_interface.AbstractIOServices`. Used for implementing
    `.nbio_interface.AbstractIOServices.connect_socket()`.

    Lifecycle: NOT_STARTED -> ACTIVE (via `start()`) -> CANCELED (via
    `cancel()`) or COMPLETED (via `_report_completion()`).

    """

    _STATE_NOT_STARTED = 0  # start() not called yet
    _STATE_ACTIVE = 1  # workflow started
    _STATE_CANCELED = 2  # workflow aborted by user's cancel() call
    _STATE_COMPLETED = 3  # workflow completed: succeeded or failed

    def __init__(self, nbio, sock, resolved_addr, on_done):
        """
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param socket.socket sock: non-blocking socket that needs to be
            connected via `socket.socket.connect()`
        :param tuple resolved_addr: resolved destination address/port two-tuple
            which is compatible with the given's socket's address family
        :param callable on_done: user callback that takes None upon successful
            completion or exception upon error (check for `BaseException`) as
            its only arg. It will not be called if the operation was cancelled.
        :raises TypeError: if `on_done` is not callable
        :raises ValueError: if host portion of `resolved_addr` is not an IP
            address or is inconsistent with the socket's address family as
            validated via `socket.inet_pton()`

        """
        check_callback_arg(on_done, 'on_done')

        try:
            socket.inet_pton(sock.family, resolved_addr[0])
        except Exception as error:  # pylint: disable=W0703
            if not hasattr(socket, 'inet_pton'):
                # The failure was the missing function itself, so the address
                # simply cannot be validated here
                _LOGGER.debug(
                    'Unable to check resolved address: no socket.inet_pton().')
            else:
                msg = ('Invalid or unresolved IP address '
                       '{!r} for socket {}: {!r}').format(
                           resolved_addr, sock, error)
                _LOGGER.error(msg)
                raise ValueError(msg)

        self._nbio = nbio
        self._sock = sock
        self._addr = resolved_addr
        self._on_done = on_done
        self._state = self._STATE_NOT_STARTED
        # True only while our writer callback is registered with `nbio`
        self._watching_socket_events = False

    @_log_exceptions
    def _cleanup(self):
        """Remove socket watcher, if any

        """
        if self._watching_socket_events:
            self._watching_socket_events = False
            self._nbio.remove_writer(self._sock.fileno())

    def start(self):
        """Start asynchronous connection establishment.

        :rtype: AbstractIOReference

        """
        assert self._state == self._STATE_NOT_STARTED, (
            '_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED',
            self._state)

        self._state = self._STATE_ACTIVE

        # Continue the rest of the operation on the I/O loop to avoid calling
        # user's completion callback from the scope of user's call
        self._nbio.add_callback_threadsafe(self._start_async)

        return _AsyncServiceAsyncHandle(self)

    def cancel(self):
        """Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        if self._state == self._STATE_ACTIVE:
            self._state = self._STATE_CANCELED
            _LOGGER.debug('User canceled connection request for %s to %s',
                          self._sock, self._addr)
            self._cleanup()
            return True

        _LOGGER.debug(
            '_AsyncSocketConnector cancel requested when not ACTIVE: '
            'state=%s; %s', self._state, self._sock)
        return False

    @_log_exceptions
    def _report_completion(self, result):
        """Advance to COMPLETED state, remove socket watcher, and invoke user's
        completion callback.

        :param BaseException | None result: value to pass in user's callback

        """
        _LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s',
                      result, self._sock)

        assert isinstance(result, (BaseException, type(None))), (
            '_AsyncSocketConnector._report_completion() expected exception or '
            'None as result.', result)
        # NOTE: the message now names the state that is actually checked
        assert self._state == self._STATE_ACTIVE, (
            '_AsyncSocketConnector._report_completion() expected '
            '_STATE_ACTIVE', self._state)

        self._state = self._STATE_COMPLETED
        self._cleanup()

        self._on_done(result)

    @_log_exceptions
    def _start_async(self):
        """Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here, if needed

        """
        if self._state != self._STATE_ACTIVE:
            # Must have been canceled by user before we were called
            _LOGGER.debug(
                'Abandoning sock=%s connection establishment to %s '
                'due to inactive state=%s', self._sock, self._addr, self._state)
            return

        try:
            self._sock.connect(self._addr)
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES):
                # Connection establishment is pending
                pass
            else:
                _LOGGER.error('%s.connect(%s) failed: %r', self._sock,
                              self._addr, error)
                self._report_completion(error)
                return

        # Get notified when the socket becomes writable
        try:
            self._nbio.set_writer(self._sock.fileno(), self._on_writable)
        except Exception as error:  # pylint: disable=W0703
            _LOGGER.exception('async.set_writer(%s) failed: %r', self._sock,
                              error)
            self._report_completion(error)
            return
        else:
            self._watching_socket_events = True
            _LOGGER.debug('Connection-establishment is in progress for %s.',
                          self._sock)

    @_log_exceptions
    def _on_writable(self):
        """Called when socket connects or fails to. Check for predicament and
        invoke user's completion callback.

        """
        if self._state != self._STATE_ACTIVE:
            # This should never happen since we remove the watcher upon
            # `cancel()`
            _LOGGER.error(
                'Socket connection-establishment event watcher '
                'called in inactive state (ignoring): %s; state=%s', self._sock,
                self._state)
            return

        # The moment of truth: SO_ERROR holds the outcome of the pending
        # connect(); zero means success
        error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
        if not error_code:
            _LOGGER.info('Socket connected: %s', self._sock)
            result = None
        else:
            error_msg = os.strerror(error_code)
            _LOGGER.error('Socket failed to connect: %s; error=%s (%s)',
                          self._sock, error_code, error_msg)
            result = pika.compat.SOCKET_ERROR(error_code, error_msg)

        self._report_completion(result)
class _AsyncStreamConnector(object):
    """Performs asynchronous SSL session establishment, if requested, on the
    already-connected socket and links the streaming transport to protocol.
    Used for implementing
    `.nbio_interface.AbstractIOServices.create_streaming_connection()`.

    Lifecycle: NOT_STARTED -> ACTIVE (via `start()`) -> CANCELED (via
    `cancel()`) or COMPLETED (via `_report_completion()`).

    """

    _STATE_NOT_STARTED = 0  # start() not called yet
    _STATE_ACTIVE = 1  # start() called and kicked off the workflow
    _STATE_CANCELED = 2  # workflow terminated by cancel() request
    _STATE_COMPLETED = 3  # workflow terminated by success or failure

    def __init__(self, nbio, protocol_factory, sock, ssl_context,
                 server_hostname, on_done):
        """
        NOTE: We take ownership of the given socket upon successful completion
        of the constructor.

        See `AbstractIOServices.create_streaming_connection()` for detailed
        documentation of the corresponding args.

        :param AbstractIOServices | AbstractFileDescriptorServices nbio:
        :param callable protocol_factory:
        :param socket.socket sock:
        :param ssl.SSLContext | None ssl_context:
        :param str | None server_hostname:
        :param callable on_done:
        :raises TypeError: if `protocol_factory` or `on_done` is not callable
        :raises ValueError: if `ssl_context` has an unexpected type, if
            `server_hostname` is given without `ssl_context`, or if
            `sock.getpeername()` fails

        """
        check_callback_arg(protocol_factory, 'protocol_factory')
        check_callback_arg(on_done, 'on_done')

        if not isinstance(ssl_context, (type(None), ssl.SSLContext)):
            raise ValueError('Expected ssl_context=None | ssl.SSLContext, but '
                             'got {!r}'.format(ssl_context))

        if server_hostname is not None and ssl_context is None:
            raise ValueError('Non-None server_hostname must not be passed '
                             'without ssl context')

        # Check that the socket connection establishment had completed in order
        # to avoid stalling while waiting for the socket to become readable
        # and/or writable.
        try:
            sock.getpeername()
        except Exception as error:
            raise ValueError(
                'Expected connected socket, but getpeername() failed: '
                'error={!r}; {}; '.format(error, sock))

        self._nbio = nbio
        self._protocol_factory = protocol_factory
        self._sock = sock
        self._ssl_context = ssl_context
        self._server_hostname = server_hostname
        self._on_done = on_done

        self._state = self._STATE_NOT_STARTED
        # True only while `_do_ssl_handshake` is registered with `nbio` as a
        # reader or writer callback for the socket
        self._watching_socket = False

    @_log_exceptions
    def _cleanup(self, close):
        """Cancel pending async operations, if any, and drop all references
        held by this connector (socket, nbio, callbacks, SSL settings).

        :param bool close: close the socket if true

        """
        _LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close)

        if self._watching_socket:
            _LOGGER.debug(
                '_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close,
                self._sock)
            self._watching_socket = False
            self._nbio.remove_reader(self._sock.fileno())
            self._nbio.remove_writer(self._sock.fileno())

        try:
            if close:
                _LOGGER.debug(
                    '_AsyncStreamConnector._cleanup(%r): closing socket; %s',
                    close, self._sock)
                try:
                    self._sock.close()
                except Exception as error:  # pylint: disable=W0703
                    _LOGGER.exception('_sock.close() failed: error=%r; %s',
                                      error, self._sock)
                    raise
        finally:
            # References are released even if closing the socket raised
            self._sock = None
            self._nbio = None
            self._protocol_factory = None
            self._ssl_context = None
            self._server_hostname = None
            self._on_done = None

    def start(self):
        """Kick off the workflow

        :rtype: AbstractIOReference

        """
        _LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock)

        assert self._state == self._STATE_NOT_STARTED, (
            '_AsyncStreamConnector.start() expected '
            '_STATE_NOT_STARTED', self._state)

        self._state = self._STATE_ACTIVE

        # Request callback from I/O loop to start processing so that we don't
        # end up making callbacks from the caller's scope
        self._nbio.add_callback_threadsafe(self._start_async)

        return _AsyncServiceAsyncHandle(self)

    def cancel(self):
        """Cancel pending connection request without calling user's completion
        callback.

        :returns: False if was already done or cancelled; True otherwise
        :rtype: bool

        """
        if self._state == self._STATE_ACTIVE:
            self._state = self._STATE_CANCELED
            _LOGGER.debug('User canceled streaming linkup for %s', self._sock)
            # Close the socket, since we took ownership
            self._cleanup(close=True)
            return True

        _LOGGER.debug(
            '_AsyncStreamConnector cancel requested when not ACTIVE: '
            'state=%s; %s', self._state, self._sock)
        return False

    @_log_exceptions
    def _report_completion(self, result):
        """Advance to COMPLETED state, cancel async operation(s), and invoke
        user's completion callback.

        :param BaseException | tuple result: value to pass in user's callback.
            `tuple(transport, protocol)` on success, exception on error

        """
        _LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s',
                      result, self._sock)

        assert isinstance(result, (BaseException, tuple)), (
            '_AsyncStreamConnector._report_completion() expected exception or '
            'tuple as result.', result, self._state)
        assert self._state == self._STATE_ACTIVE, (
            '_AsyncStreamConnector._report_completion() expected '
            '_STATE_ACTIVE', self._state)

        self._state = self._STATE_COMPLETED

        # Notify user
        try:
            self._on_done(result)
        except Exception:
            _LOGGER.exception('%r: _on_done(%r) failed.',
                              self._report_completion, result)
            raise
        finally:
            # NOTE: Close the socket on error, since we took ownership of it
            self._cleanup(close=isinstance(result, BaseException))

    @_log_exceptions
    def _start_async(self):
        """Called as callback from I/O loop to kick-start the workflow, so it's
        safe to call user's completion callback from here if needed

        """
        _LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock)

        if self._state != self._STATE_ACTIVE:
            # Must have been canceled by user before we were called
            _LOGGER.debug(
                'Abandoning streaming linkup due to inactive state '
                'transition; state=%s; %s; .', self._state, self._sock)
            return

        # Link up protocol and transport if this is a plaintext linkup;
        # otherwise kick-off SSL workflow first
        if self._ssl_context is None:
            self._linkup()
        else:
            _LOGGER.debug('Starting SSL handshake on %s', self._sock)

            # Wrap our plain socket in ssl socket
            try:
                self._sock = self._ssl_context.wrap_socket(
                    self._sock,
                    server_side=False,
                    do_handshake_on_connect=False,
                    suppress_ragged_eofs=False,  # False = error on incoming EOF
                    server_hostname=self._server_hostname)
            except Exception as error:  # pylint: disable=W0703
                _LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock,
                                  error)
                self._report_completion(error)
                return

            # The handshake was deferred above (do_handshake_on_connect=False)
            # so that it can be driven from here
            self._do_ssl_handshake()

    @_log_exceptions
    def _linkup(self):
        """Connection is ready: instantiate and link up transport and protocol,
        and invoke user's completion callback.

        Any exception raised while creating the protocol or transport, or from
        `protocol.connection_made()`, is logged and passed to the completion
        callback as the result instead of being raised from here.

        """
        _LOGGER.debug('_AsyncStreamConnector._linkup()')

        transport = None

        try:
            # Create the protocol
            try:
                protocol = self._protocol_factory()
            except Exception as error:
                _LOGGER.exception('protocol_factory() failed: error=%r; %s',
                                  error, self._sock)
                raise

            if self._ssl_context is None:
                # Create plaintext streaming transport
                try:
                    transport = _AsyncPlaintextTransport(
                        self._sock, protocol, self._nbio)
                except Exception as error:
                    _LOGGER.exception('PlainTransport() failed: error=%r; %s',
                                      error, self._sock)
                    raise
            else:
                # Create SSL streaming transport
                try:
                    transport = _AsyncSSLTransport(self._sock, protocol,
                                                   self._nbio)
                except Exception as error:
                    _LOGGER.exception('SSLTransport() failed: error=%r; %s',
                                      error, self._sock)
                    raise

            _LOGGER.debug('_linkup(): created transport %r', transport)

            # Acquaint protocol with its transport
            try:
                protocol.connection_made(transport)
            except Exception as error:
                _LOGGER.exception(
                    'protocol.connection_made(%r) failed: error=%r; %s',
                    transport, error, self._sock)
                raise

            _LOGGER.debug('_linkup(): introduced transport to protocol %r; %r',
                          transport, protocol)
        except Exception as error:  # pylint: disable=W0703
            # Failure is reported through the completion callback
            result = error
        else:
            result = (transport, protocol)

        self._report_completion(result)

    @_log_exceptions
    def _do_ssl_handshake(self):
        """Perform asynchronous SSL handshake on the already wrapped socket

        Called once directly from `_start_async()` and then again as the
        reader/writer callback each time the handshake reported that it needs
        to read or write before it can proceed.

        """
        _LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()')

        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                '_do_ssl_handshake: Abandoning streaming linkup due '
                'to inactive state transition; state=%s; %s; .', self._state,
                self._sock)
            return

        done = False

        try:
            try:
                self._sock.do_handshake()
            except ssl.SSLError as error:
                if error.errno == ssl.SSL_ERROR_WANT_READ:
                    _LOGGER.debug('SSL handshake wants read; %s.', self._sock)
                    self._watching_socket = True
                    # Watch for readability only; drop any writer watcher
                    self._nbio.set_reader(self._sock.fileno(),
                                          self._do_ssl_handshake)
                    self._nbio.remove_writer(self._sock.fileno())
                elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
                    _LOGGER.debug('SSL handshake wants write. %s', self._sock)
                    self._watching_socket = True
                    # Watch for writability only; drop any reader watcher
                    self._nbio.set_writer(self._sock.fileno(),
                                          self._do_ssl_handshake)
                    self._nbio.remove_reader(self._sock.fileno())
                else:
                    # Outer catch will report it
                    raise
            else:
                done = True
                _LOGGER.info('SSL handshake completed successfully: %s',
                             self._sock)
        except Exception as error:  # pylint: disable=W0703
            _LOGGER.exception('SSL do_handshake failed: error=%r; %s', error,
                              self._sock)
            self._report_completion(error)
            return

        if done:
            # Suspend I/O and link up transport with protocol
            _LOGGER.debug(
                '_do_ssl_handshake: removing watchers ahead of linkup: %s',
                self._sock)
            self._nbio.remove_reader(self._sock.fileno())
            self._nbio.remove_writer(self._sock.fileno())

            # So that our `_cleanup()` won't interfere with the transport's
            # socket watcher configuration.
            self._watching_socket = False
            _LOGGER.debug(
                '_do_ssl_handshake: pre-linkup removal of watchers is done; %s',
                self._sock)

            self._linkup()
- class _AsyncTransportBase( # pylint: disable=W0223
- AbstractStreamTransport):
- """Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.
- """
- _STATE_ACTIVE = 1
- _STATE_FAILED = 2 # connection failed
- _STATE_ABORTED_BY_USER = 3 # cancel() called
- _STATE_COMPLETED = 4 # done with connection
- _MAX_RECV_BYTES = 4096 # per socket.recv() documentation recommendation
- # Max per consume call to prevent event starvation
- _MAX_CONSUME_BYTES = 1024 * 100
- class RxEndOfFile(OSError):
- """We raise this internally when EOF (empty read) is detected on input.
- """
- def __init__(self):
- super(_AsyncTransportBase.RxEndOfFile, self).__init__(
- -1, 'End of input stream (EOF)')
- def __init__(self, sock, protocol, nbio):
- """
- :param socket.socket | ssl.SSLSocket sock: connected socket
- :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
- corresponding protocol in this transport/protocol pairing; the
- protocol already had its `connection_made()` method called.
- :param AbstractIOServices | AbstractFileDescriptorServices nbio:
- """
- _LOGGER.debug('_AsyncTransportBase.__init__: %s', sock)
- self._sock = sock
- self._protocol = protocol
- self._nbio = nbio
- self._state = self._STATE_ACTIVE
- self._tx_buffers = collections.deque()
- self._tx_buffered_byte_count = 0
- def abort(self):
- """Close connection abruptly without waiting for pending I/O to
- complete. Will invoke the corresponding protocol's `connection_lost()`
- method asynchronously (not in context of the abort() call).
- :raises Exception: Exception-based exception on error
- """
- _LOGGER.info('Aborting transport connection: state=%s; %s', self._state,
- self._sock)
- self._initiate_abort(None)
- def get_protocol(self):
- """Return the protocol linked to this transport.
- :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
- """
- return self._protocol
- def get_write_buffer_size(self):
- """
- :returns: Current size of output data buffered by the transport
- :rtype: int
- """
- return self._tx_buffered_byte_count
- def _buffer_tx_data(self, data):
- """Buffer the given data until it can be sent asynchronously.
- :param bytes data:
- :raises ValueError: if called with empty data
- """
- if not data:
- _LOGGER.error('write() called with empty data: state=%s; %s',
- self._state, self._sock)
- raise ValueError('write() called with empty data {!r}'.format(data))
- if self._state != self._STATE_ACTIVE:
- _LOGGER.debug(
- 'Ignoring write() called during inactive state: '
- 'state=%s; %s', self._state, self._sock)
- return
- self._tx_buffers.append(data)
- self._tx_buffered_byte_count += len(data)
- def _consume(self):
- """Utility method for use by subclasses to ingest data from socket and
- dispatch it to protocol's `data_received()` method socket-specific
- "try again" exception, per-event data consumption limit is reached,
- transport becomes inactive, or a fatal failure.
- Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
- until state becomes inactive (e.g., `protocol.data_received()` callback
- aborts the transport)
- :raises: Whatever the corresponding `sock.recv()` raises except the
- socket error with errno.EINTR
- :raises: Whatever the `protocol.data_received()` callback raises
- :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream
- """
- bytes_consumed = 0
- while (self._state == self._STATE_ACTIVE and
- bytes_consumed < self._MAX_CONSUME_BYTES):
- data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
- bytes_consumed += len(data)
- # Empty data, should disconnect
- if not data:
- _LOGGER.error('Socket EOF; %s', self._sock)
- raise self.RxEndOfFile()
- # Pass the data to the protocol
- try:
- self._protocol.data_received(data)
- except Exception as error:
- _LOGGER.exception(
- 'protocol.data_received() failed: error=%r; %s', error,
- self._sock)
- raise
- def _produce(self):
- """Utility method for use by subclasses to emit data from tx_buffers.
- This method sends chunks from `tx_buffers` until all chunks are
- exhausted or sending is interrupted by an exception. Maintains integrity
- of `self.tx_buffers`.
- :raises: whatever the corresponding `sock.send()` raises except the
- socket error with errno.EINTR
- """
- while self._tx_buffers:
- num_bytes_sent = self._sigint_safe_send(self._sock,
- self._tx_buffers[0])
- chunk = self._tx_buffers.popleft()
- if num_bytes_sent < len(chunk):
- _LOGGER.debug('Partial send, requeing remaining data; %s of %s',
- num_bytes_sent, len(chunk))
- self._tx_buffers.appendleft(chunk[num_bytes_sent:])
- self._tx_buffered_byte_count -= num_bytes_sent
- assert self._tx_buffered_byte_count >= 0, (
- '_AsyncTransportBase._produce() tx buffer size underflow',
- self._tx_buffered_byte_count, self._state)
- @staticmethod
- @_retry_on_sigint
- def _sigint_safe_recv(sock, max_bytes):
- """Receive data from socket, retrying on SIGINT.
- :param sock: stream or SSL socket
- :param max_bytes: maximum number of bytes to receive
- :returns: received data or empty bytes uppon end of file
- :rtype: bytes
- :raises: whatever the corresponding `sock.recv()` raises except socket
- error with errno.EINTR
- """
- return sock.recv(max_bytes)
- @staticmethod
- @_retry_on_sigint
- def _sigint_safe_send(sock, data):
- """Send data to socket, retrying on SIGINT.
- :param sock: stream or SSL socket
- :param data: data bytes to send
- :returns: number of bytes actually sent
- :rtype: int
- :raises: whatever the corresponding `sock.send()` raises except socket
- error with errno.EINTR
- """
- return sock.send(data)
- @_log_exceptions
- def _deactivate(self):
- """Unregister the transport from I/O events
- """
- if self._state == self._STATE_ACTIVE:
- _LOGGER.info('Deactivating transport: state=%s; %s', self._state,
- self._sock)
- self._nbio.remove_reader(self._sock.fileno())
- self._nbio.remove_writer(self._sock.fileno())
- self._tx_buffers.clear()
- @_log_exceptions
- def _close_and_finalize(self):
- """Close the transport's socket and unlink the transport it from
- references to other assets (protocol, etc.)
- """
- if self._state != self._STATE_COMPLETED:
- _LOGGER.info('Closing transport socket and unlinking: state=%s; %s',
- self._state, self._sock)
- try:
- self._sock.shutdown(socket.SHUT_RDWR)
- except pika.compat.SOCKET_ERROR:
- pass
- self._sock.close()
- self._sock = None
- self._protocol = None
- self._nbio = None
- self._state = self._STATE_COMPLETED
@_log_exceptions
def _initiate_abort(self, error):
    """Begin an asynchronous abort of the transport, which concludes with
    a call to the protocol's `connection_lost()` method. Output buffers are
    not flushed.

    :param BaseException | None error: None if being canceled by user,
        including via falsie return value from protocol.eof_received;
        otherwise the exception corresponding to the failed connection.

    """
    _LOGGER.info(
        '_AsyncTransportBase._initate_abort(): Initiating abrupt '
        'asynchronous transport shutdown: state=%s; error=%r; %s',
        self._state, error, self._sock)

    assert self._state != self._STATE_COMPLETED, (
        '_AsyncTransportBase._initate_abort() expected '
        'non-_STATE_COMPLETED', self._state)

    if self._state == self._STATE_COMPLETED:
        # Only reachable when assertions are disabled
        return

    self._deactivate()

    aborted_by_user = error is None

    if aborted_by_user and self._state == self._STATE_ABORTED_BY_USER:
        # A user-initiated abort is already pending; nothing more to do
        _LOGGER.debug('_AsyncTransportBase._initiate_abort(): '
                      'ignoring - user-abort already pending.')
        return

    if not aborted_by_user and self._state != self._STATE_ACTIVE:
        # Connection failure reported while no longer active: the only
        # state expected here is a pending user-initiated abort, which
        # takes priority over the failure notification.
        assert self._state == self._STATE_ABORTED_BY_USER, (
            '_AsyncTransportBase._initate_abort() expected '
            '_STATE_ABORTED_BY_USER', self._state)
        return

    # Record the reason; a user-initiated abort overrides a previously
    # recorded failure.
    self._state = (self._STATE_ABORTED_BY_USER
                   if aborted_by_user else self._STATE_FAILED)

    # Notify from an I/O loop callback to avoid potential reentry into
    # user code
    notifier = functools.partial(self._connection_lost_notify_async, error)
    self._nbio.add_callback_threadsafe(notifier)
@_log_exceptions
def _connection_lost_notify_async(self, error):
    """Conclude an abort of the transport, whether caused by a socket error
    or by a user-initiated `abort()` call, by notifying the protocol and
    finalizing the transport. Must be called from an I/O loop callback
    owned by us in order to avoid reentry into user code from user's API
    call into the transport.

    :param BaseException | None error: None if being canceled by user;
        otherwise the exception corresponding to the failed connection.

    """
    _LOGGER.debug('Concluding transport shutdown: state=%s; error=%r',
                  self._state, error)

    if self._state == self._STATE_COMPLETED:
        return

    failure_superseded = (error is not None and
                          self._state != self._STATE_FAILED)
    if failure_superseded:
        # A failure notification is dropped in favor of the user-initiated
        # abort notification.
        assert self._state == self._STATE_ABORTED_BY_USER, (
            '_AsyncTransportBase._connection_lost_notify_async() '
            'expected _STATE_ABORTED_BY_USER', self._state)
        return

    try:
        # Tell the protocol that the connection is gone
        self._protocol.connection_lost(error)
    except Exception as exc:  # pylint: disable=W0703
        _LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s',
                          error, exc, self._sock)
        # connection_lost() is itself our failure notification mechanism,
        # so all that is left is to propagate the exception.
        raise
    finally:
        # Runs whether or not the protocol callback raised
        self._close_and_finalize()
class _AsyncPlaintextTransport(_AsyncTransportBase):
    """Implementation of `nbio_interface.AbstractStreamTransport` for a
    plaintext connection.

    """

    def __init__(self, sock, protocol, nbio):
        """
        :param socket.socket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        """
        super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio)

        # Request to be notified of incoming data; we'll watch for writability
        # only when our write buffer is non-empty
        self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)

    def write(self, data):
        """Buffer the given data until it can be sent asynchronously.

        The call is ignored (with a debug log) when the transport is not in
        the active state.

        :param bytes data:
        :raises ValueError: if called with empty data

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring write() called during inactive state: '
                'state=%s; %s', self._state, self._sock)
            return

        if not data:
            # Explicit check rather than `assert`: assertions are stripped
            # under `python -O`, which would let an empty write turn on the
            # writability watcher below before anything gets buffered. It
            # also honors the documented ValueError contract.
            raise ValueError(
                '_AsyncPlaintextTransport.write(): empty data from user: '
                'data=%r; state=%s' % (data, self._state))

        # The writability watcher is only registered while there is
        # something to send; turn it on when going from empty to non-empty.
        if not self.get_write_buffer_size():
            self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
            _LOGGER.debug('Turned on writability watcher: %s', self._sock)

        self._buffer_tx_data(data)

    @_log_exceptions
    def _on_socket_readable(self):
        """Ingest data from socket and dispatch it to protocol until exception
        occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
        limit is reached, transport becomes inactive, or failure.

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring readability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        try:
            self._consume()
        except self.RxEndOfFile:
            # End of incoming stream: let the protocol decide whether the
            # transport stays open.
            try:
                keep_open = self._protocol.eof_received()
            except Exception as error:  # pylint: disable=W0703
                _LOGGER.exception(
                    'protocol.eof_received() failed: error=%r; %s', error,
                    self._sock)
                self._initiate_abort(error)
            else:
                if keep_open:
                    _LOGGER.info(
                        'protocol.eof_received() elected to keep open: %s',
                        self._sock)
                    # No more input is expected, so stop watching for it
                    self._nbio.remove_reader(self._sock.fileno())
                else:
                    _LOGGER.info('protocol.eof_received() elected to close: %s',
                                 self._sock)
                    self._initiate_abort(None)
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
                # Not a failure; wait for the next readability event
                _LOGGER.debug('Recv would block on %s', self._sock)
            else:
                _LOGGER.exception(
                    '_AsyncBaseTransport._consume() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                self._initiate_abort(error)
        else:
            if self._state != self._STATE_ACTIVE:
                # Most likely our protocol's `data_received()` aborted the
                # transport
                _LOGGER.debug(
                    'Leaving Plaintext consumer due to inactive '
                    'state: state=%s; %s', self._state, self._sock)

    @_log_exceptions
    def _on_socket_writable(self):
        """Handle writable socket notification by transmitting buffered
        data; turns the writability watcher off once the buffers are empty.

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring writability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        # We shouldn't be getting called with empty tx buffers
        assert self._tx_buffers, (
            '_AsyncPlaintextTransport._on_socket_writable() called, '
            'but _tx_buffers is empty.', self._state)

        try:
            # Transmit buffered data to remote socket
            self._produce()
        except (Exception, pika.compat.SOCKET_ERROR) as error:  # pylint: disable=W0703
            if (isinstance(error, pika.compat.SOCKET_ERROR) and
                    error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
                # Not a failure; wait for the next writability event
                _LOGGER.debug('Send would block on %s', self._sock)
            else:
                _LOGGER.exception(
                    '_AsyncBaseTransport._produce() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                self._initiate_abort(error)
        else:
            if not self._tx_buffers:
                # Everything was sent; stop watching for writability
                self._nbio.remove_writer(self._sock.fileno())
                _LOGGER.debug('Turned off writability watcher: %s', self._sock)
class _AsyncSSLTransport(_AsyncTransportBase):
    """Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
    connection.

    The action to run on a socket readability/writability event is held in
    `_ssl_readable_action`/`_ssl_writable_action`, because an SSL read may
    need the socket to become writable and an SSL write may need it to
    become readable.

    """

    def __init__(self, sock, protocol, nbio):
        """
        :param ssl.SSLSocket sock: non-blocking connected socket
        :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
            corresponding protocol in this transport/protocol pairing; the
            protocol already had its `connection_made()` method called.
        :param AbstractIOServices | AbstractFileDescriptorServices nbio:

        """
        super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio)

        # Initially: consume on readable, nothing to do on writable
        self._ssl_readable_action = self._consume
        self._ssl_writable_action = None

        # Bootstrap consumer; the producer gets registered once data is
        # buffered
        self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)

        # Also attempt a read as soon as possible, in case read-ahead
        # already picked up some data
        self._nbio.add_callback_threadsafe(self._on_socket_readable)

    def write(self, data):
        """Buffer the given data until it can be sent asynchronously.

        The call is ignored (with a debug log) when the transport is not in
        the active state.

        :param bytes data:
        :raises ValueError: if called with empty data (NOTE(review): no
            check is made in this method itself; presumably raised by
            `_buffer_tx_data()` -- confirm)

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring write() called during inactive state: '
                'state=%s; %s', self._state, self._sock)
            return

        had_backlog = self.get_write_buffer_size() != 0

        self._buffer_tx_data(data)

        if had_backlog or self._ssl_writable_action is not None:
            # Either a producer was already set up for the earlier data, or
            # the writable event is currently claimed by another action.
            return

        self._ssl_writable_action = self._produce
        self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
        _LOGGER.debug('Turned on writability watcher: %s', self._sock)

    @_log_exceptions
    def _on_socket_readable(self):
        """Handle readable socket indication by running the action that is
        currently assigned to readability events, if any.

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring readability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        if not self._ssl_readable_action:
            _LOGGER.debug(
                'SSL readable action was suppressed: '
                'ssl_writable_action=%r; %s', self._ssl_writable_action,
                self._sock)
            return

        try:
            self._ssl_readable_action()
        except Exception as error:  # pylint: disable=W0703
            self._initiate_abort(error)

    @_log_exceptions
    def _on_socket_writable(self):
        """Handle writable socket notification by running the action that is
        currently assigned to writability events, if any.

        """
        if self._state != self._STATE_ACTIVE:
            _LOGGER.debug(
                'Ignoring writability notification due to inactive '
                'state: state=%s; %s', self._state, self._sock)
            return

        if not self._ssl_writable_action:
            _LOGGER.debug(
                'SSL writable action was suppressed: '
                'ssl_readable_action=%r; %s', self._ssl_readable_action,
                self._sock)
            return

        try:
            self._ssl_writable_action()
        except Exception as error:  # pylint: disable=W0703
            self._initiate_abort(error)

    @_log_exceptions
    def _consume(self):
        """[override] Ingest data from socket and dispatch it to protocol until
        exception occurs (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
        transport becomes inactive, or failure.

        Afterwards, update the consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        """
        # Set when SSL reports that reading needs the socket to be writable
        resume_on_writable = False

        try:
            super(_AsyncSSLTransport, self)._consume()
        except ssl.SSLError as error:
            if error.errno == ssl.SSL_ERROR_WANT_WRITE:
                # Looks like SSL re-negotiation
                _LOGGER.debug('SSL ingester wants write: %s', self._sock)
                resume_on_writable = True
            elif error.errno == ssl.SSL_ERROR_WANT_READ:
                _LOGGER.debug('SSL ingester wants read: %s', self._sock)
            else:
                _LOGGER.exception(
                    '_AsyncBaseTransport._consume() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                raise  # let outer catch block abort the transport
        else:
            if self._state != self._STATE_ACTIVE:
                # Most likely our protocol's `data_received()` aborted the
                # transport
                _LOGGER.debug(
                    'Leaving SSL consumer due to inactive '
                    'state: state=%s; %s', self._state, self._sock)
                return

            # The consumer returned without an exception, so SSL input
            # buffers may still hold data records that can be read without
            # waiting for the socket to become readable; schedule another
            # pass for them.
            self._nbio.add_callback_threadsafe(self._on_socket_readable)

        # Update consumer registration
        if resume_on_writable:
            # WANT_WRITE: consume on the next writable event instead
            if not self._ssl_writable_action:
                self._nbio.set_writer(self._sock.fileno(),
                                      self._on_socket_writable)
            self._ssl_writable_action = self._consume

            if self._ssl_readable_action:
                self._nbio.remove_reader(self._sock.fileno())
                self._ssl_readable_action = None
        else:
            if not self._ssl_readable_action:
                self._nbio.set_reader(self._sock.fileno(),
                                      self._on_socket_readable)
            self._ssl_readable_action = self._consume

            # NOTE: can't use identity check, it fails for instance methods
            if self._ssl_writable_action == self._consume:  # pylint: disable=W0143
                self._nbio.remove_writer(self._sock.fileno())
                self._ssl_writable_action = None

        # Update producer registration: pending output gets the writable
        # event whenever nothing else has claimed it
        if self._tx_buffers and not self._ssl_writable_action:
            self._ssl_writable_action = self._produce
            self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)

    @_log_exceptions
    def _produce(self):
        """[override] Emit data from tx_buffers until all chunks are exhausted
        or sending is interrupted by an exception (typically ssl.SSLError with
        SSL_ERROR_WANT_READ/WRITE).

        Afterwards, update the consumer/producer registration.

        :raises Exception: error that signals that connection needs to be
            aborted

        """
        # Tri-state: None - nothing left to produce; True - retry when the
        # socket is writable; False - retry when the socket is readable
        retry_on_writable = None

        try:
            super(_AsyncSSLTransport, self)._produce()
        except ssl.SSLError as error:
            if error.errno == ssl.SSL_ERROR_WANT_WRITE:
                _LOGGER.debug('SSL emitter wants write: %s', self._sock)
                retry_on_writable = True
            elif error.errno == ssl.SSL_ERROR_WANT_READ:
                # Looks like SSL re-negotiation
                _LOGGER.debug('SSL emitter wants read: %s', self._sock)
                retry_on_writable = False
            else:
                _LOGGER.exception(
                    '_AsyncBaseTransport._produce() failed, aborting '
                    'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
                    error, self._sock, ''.join(
                        traceback.format_exception(*sys.exc_info())))
                raise  # let outer catch block abort the transport
        else:
            # No exception, so everything must have been written to the socket
            assert not self._tx_buffers, (
                '_AsyncSSLTransport._produce(): no exception from parent '
                'class, but data remains in _tx_buffers.', len(
                    self._tx_buffers))

        # Update producer registration
        if self._tx_buffers:
            assert retry_on_writable is not None, (
                '_AsyncSSLTransport._produce(): next_produce_on_writable is '
                'still None', self._state)

            if retry_on_writable:
                if not self._ssl_writable_action:
                    self._nbio.set_writer(self._sock.fileno(),
                                          self._on_socket_writable)
                self._ssl_writable_action = self._produce

                # NOTE: can't use identity check, it fails for instance methods
                if self._ssl_readable_action == self._produce:  # pylint: disable=W0143
                    self._nbio.remove_reader(self._sock.fileno())
                    self._ssl_readable_action = None
            else:
                # WANT_READ: produce on the next readable event instead
                if not self._ssl_readable_action:
                    self._nbio.set_reader(self._sock.fileno(),
                                          self._on_socket_readable)
                self._ssl_readable_action = self._produce

                if self._ssl_writable_action:
                    self._nbio.remove_writer(self._sock.fileno())
                    self._ssl_writable_action = None
        else:
            # Nothing left to send: release whichever event the producer
            # was attached to.
            # NOTE: can't use identity check, it fails for instance methods
            if self._ssl_readable_action == self._produce:  # pylint: disable=W0143
                self._nbio.remove_reader(self._sock.fileno())
                self._ssl_readable_action = None
                assert self._ssl_writable_action != self._produce, (  # pylint: disable=W0143
                    '_AsyncSSLTransport._produce(): with empty tx_buffers, '
                    'writable_action cannot be _produce when readable is '
                    '_produce', self._state)
            else:
                # NOTE: can't use identity check, it fails for instance methods
                assert self._ssl_writable_action == self._produce, (  # pylint: disable=W0143
                    '_AsyncSSLTransport._produce(): with empty tx_buffers, '
                    'expected writable_action as _produce when readable_action '
                    'is not _produce', 'writable_action:',
                    self._ssl_writable_action, 'readable_action:',
                    self._ssl_readable_action, 'state:', self._state)
                self._ssl_writable_action = None
                self._nbio.remove_writer(self._sock.fileno())

        # Update consumer registration
        if not self._ssl_readable_action:
            # Readable event is unclaimed; hand it back to the consumer
            self._ssl_readable_action = self._consume
            self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
            # In case input SSL data records have been buffered
            self._nbio.add_callback_threadsafe(self._on_socket_readable)
        elif self._sock.pending():
            # Already-buffered SSL input is waiting; schedule a read for it
            self._nbio.add_callback_threadsafe(self._on_socket_readable)
|