io_services_utils.py 52 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371
  1. """Utilities for implementing `nbio_interface.AbstractIOServices` for
  2. pika connection adapters.
  3. """
  4. import collections
  5. import errno
  6. import functools
  7. import logging
  8. import numbers
  9. import os
  10. import socket
  11. import ssl
  12. import sys
  13. import traceback
  14. from pika.adapters.utils.nbio_interface import (AbstractIOReference,
  15. AbstractStreamTransport)
  16. import pika.compat
  17. import pika.diagnostic_utils
  18. # "Try again" error codes for non-blocking socket I/O - send()/recv().
  19. # NOTE: POSIX.1 allows either error to be returned for this case and doesn't require
  20. # them to have the same value.
  21. _TRY_IO_AGAIN_SOCK_ERROR_CODES = (
  22. errno.EAGAIN,
  23. errno.EWOULDBLOCK,
  24. )
  25. # "Connection establishment pending" error codes for non-blocking socket
  26. # connect() call.
  27. # NOTE: EINPROGRESS for Posix and EWOULDBLOCK for Windows
  28. _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES = (
  29. errno.EINPROGRESS,
  30. errno.EWOULDBLOCK,
  31. )
  32. _LOGGER = logging.getLogger(__name__)
  33. # Decorator that logs exceptions escaping from the decorated function
  34. _log_exceptions = pika.diagnostic_utils.create_log_exception_decorator(_LOGGER) # pylint: disable=C0103
  35. def check_callback_arg(callback, name):
  36. """Raise TypeError if callback is not callable
  37. :param callback: callback to check
  38. :param name: Name to include in exception text
  39. :raises TypeError:
  40. """
  41. if not callable(callback):
  42. raise TypeError('{} must be callable, but got {!r}'.format(
  43. name, callback))
  44. def check_fd_arg(fd):
  45. """Raise TypeError if file descriptor is not an integer
  46. :param fd: file descriptor
  47. :raises TypeError:
  48. """
  49. if not isinstance(fd, numbers.Integral):
  50. raise TypeError(
  51. 'Paramter must be a file descriptor, but got {!r}'.format(fd))
  52. def _retry_on_sigint(func):
  53. """Function decorator for retrying on SIGINT.
  54. """
  55. @functools.wraps(func)
  56. def retry_sigint_wrap(*args, **kwargs):
  57. """Wrapper for decorated function"""
  58. while True:
  59. try:
  60. return func(*args, **kwargs)
  61. except pika.compat.SOCKET_ERROR as error:
  62. if error.errno == errno.EINTR:
  63. continue
  64. else:
  65. raise
  66. return retry_sigint_wrap
  67. class SocketConnectionMixin(object):
  68. """Implements
  69. `pika.adapters.utils.nbio_interface.AbstractIOServices.connect_socket()`
  70. on top of
  71. `pika.adapters.utils.nbio_interface.AbstractFileDescriptorServices` and
  72. basic `pika.adapters.utils.nbio_interface.AbstractIOServices`.
  73. """
  74. def connect_socket(self, sock, resolved_addr, on_done):
  75. """Implement
  76. :py:meth:`.nbio_interface.AbstractIOServices.connect_socket()`.
  77. """
  78. return _AsyncSocketConnector(
  79. nbio=self, sock=sock, resolved_addr=resolved_addr,
  80. on_done=on_done).start()
  81. class StreamingConnectionMixin(object):
  82. """Implements
  83. `.nbio_interface.AbstractIOServices.create_streaming_connection()` on
  84. top of `.nbio_interface.AbstractFileDescriptorServices` and basic
  85. `nbio_interface.AbstractIOServices` services.
  86. """
  87. def create_streaming_connection(self,
  88. protocol_factory,
  89. sock,
  90. on_done,
  91. ssl_context=None,
  92. server_hostname=None):
  93. """Implement
  94. :py:meth:`.nbio_interface.AbstractIOServices.create_streaming_connection()`.
  95. """
  96. try:
  97. return _AsyncStreamConnector(
  98. nbio=self,
  99. protocol_factory=protocol_factory,
  100. sock=sock,
  101. ssl_context=ssl_context,
  102. server_hostname=server_hostname,
  103. on_done=on_done).start()
  104. except Exception as error:
  105. _LOGGER.error('create_streaming_connection(%s) failed: %r', sock,
  106. error)
  107. # Close the socket since this function takes ownership
  108. try:
  109. sock.close()
  110. except Exception as error: # pylint: disable=W0703
  111. # We log and suppress the exception from sock.close() so that
  112. # the original error from _AsyncStreamConnector constructor will
  113. # percolate
  114. _LOGGER.error('%s.close() failed: %r', sock, error)
  115. raise
  116. class _AsyncServiceAsyncHandle(AbstractIOReference):
  117. """This module's adaptation of `.nbio_interface.AbstractIOReference`
  118. """
  119. def __init__(self, subject):
  120. """
  121. :param subject: subject of the reference containing a `cancel()` method
  122. """
  123. self._cancel = subject.cancel
  124. def cancel(self):
  125. """Cancel pending operation
  126. :returns: False if was already done or cancelled; True otherwise
  127. :rtype: bool
  128. """
  129. return self._cancel()
  130. class _AsyncSocketConnector(object):
  131. """Connects the given non-blocking socket asynchronously using
  132. `.nbio_interface.AbstractFileDescriptorServices` and basic
  133. `.nbio_interface.AbstractIOServices`. Used for implementing
  134. `.nbio_interface.AbstractIOServices.connect_socket()`.
  135. """
  136. _STATE_NOT_STARTED = 0 # start() not called yet
  137. _STATE_ACTIVE = 1 # workflow started
  138. _STATE_CANCELED = 2 # workflow aborted by user's cancel() call
  139. _STATE_COMPLETED = 3 # workflow completed: succeeded or failed
  140. def __init__(self, nbio, sock, resolved_addr, on_done):
  141. """
  142. :param AbstractIOServices | AbstractFileDescriptorServices nbio:
  143. :param socket.socket sock: non-blocking socket that needs to be
  144. connected via `socket.socket.connect()`
  145. :param tuple resolved_addr: resolved destination address/port two-tuple
  146. which is compatible with the given's socket's address family
  147. :param callable on_done: user callback that takes None upon successful
  148. completion or exception upon error (check for `BaseException`) as
  149. its only arg. It will not be called if the operation was cancelled.
  150. :raises ValueError: if host portion of `resolved_addr` is not an IP
  151. address or is inconsistent with the socket's address family as
  152. validated via `socket.inet_pton()`
  153. """
  154. check_callback_arg(on_done, 'on_done')
  155. try:
  156. socket.inet_pton(sock.family, resolved_addr[0])
  157. except Exception as error: # pylint: disable=W0703
  158. if not hasattr(socket, 'inet_pton'):
  159. _LOGGER.debug(
  160. 'Unable to check resolved address: no socket.inet_pton().')
  161. else:
  162. msg = ('Invalid or unresolved IP address '
  163. '{!r} for socket {}: {!r}').format(
  164. resolved_addr, sock, error)
  165. _LOGGER.error(msg)
  166. raise ValueError(msg)
  167. self._nbio = nbio
  168. self._sock = sock
  169. self._addr = resolved_addr
  170. self._on_done = on_done
  171. self._state = self._STATE_NOT_STARTED
  172. self._watching_socket_events = False
  173. @_log_exceptions
  174. def _cleanup(self):
  175. """Remove socket watcher, if any
  176. """
  177. if self._watching_socket_events:
  178. self._watching_socket_events = False
  179. self._nbio.remove_writer(self._sock.fileno())
  180. def start(self):
  181. """Start asynchronous connection establishment.
  182. :rtype: AbstractIOReference
  183. """
  184. assert self._state == self._STATE_NOT_STARTED, (
  185. '_AsyncSocketConnector.start(): expected _STATE_NOT_STARTED',
  186. self._state)
  187. self._state = self._STATE_ACTIVE
  188. # Continue the rest of the operation on the I/O loop to avoid calling
  189. # user's completion callback from the scope of user's call
  190. self._nbio.add_callback_threadsafe(self._start_async)
  191. return _AsyncServiceAsyncHandle(self)
  192. def cancel(self):
  193. """Cancel pending connection request without calling user's completion
  194. callback.
  195. :returns: False if was already done or cancelled; True otherwise
  196. :rtype: bool
  197. """
  198. if self._state == self._STATE_ACTIVE:
  199. self._state = self._STATE_CANCELED
  200. _LOGGER.debug('User canceled connection request for %s to %s',
  201. self._sock, self._addr)
  202. self._cleanup()
  203. return True
  204. _LOGGER.debug(
  205. '_AsyncSocketConnector cancel requested when not ACTIVE: '
  206. 'state=%s; %s', self._state, self._sock)
  207. return False
  208. @_log_exceptions
  209. def _report_completion(self, result):
  210. """Advance to COMPLETED state, remove socket watcher, and invoke user's
  211. completion callback.
  212. :param BaseException | None result: value to pass in user's callback
  213. """
  214. _LOGGER.debug('_AsyncSocketConnector._report_completion(%r); %s',
  215. result, self._sock)
  216. assert isinstance(result, (BaseException, type(None))), (
  217. '_AsyncSocketConnector._report_completion() expected exception or '
  218. 'None as result.', result)
  219. assert self._state == self._STATE_ACTIVE, (
  220. '_AsyncSocketConnector._report_completion() expected '
  221. '_STATE_NOT_STARTED', self._state)
  222. self._state = self._STATE_COMPLETED
  223. self._cleanup()
  224. self._on_done(result)
  225. @_log_exceptions
  226. def _start_async(self):
  227. """Called as callback from I/O loop to kick-start the workflow, so it's
  228. safe to call user's completion callback from here, if needed
  229. """
  230. if self._state != self._STATE_ACTIVE:
  231. # Must have been canceled by user before we were called
  232. _LOGGER.debug(
  233. 'Abandoning sock=%s connection establishment to %s '
  234. 'due to inactive state=%s', self._sock, self._addr, self._state)
  235. return
  236. try:
  237. self._sock.connect(self._addr)
  238. except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
  239. if (isinstance(error, pika.compat.SOCKET_ERROR) and
  240. error.errno in _CONNECTION_IN_PROGRESS_SOCK_ERROR_CODES):
  241. # Connection establishment is pending
  242. pass
  243. else:
  244. _LOGGER.error('%s.connect(%s) failed: %r', self._sock,
  245. self._addr, error)
  246. self._report_completion(error)
  247. return
  248. # Get notified when the socket becomes writable
  249. try:
  250. self._nbio.set_writer(self._sock.fileno(), self._on_writable)
  251. except Exception as error: # pylint: disable=W0703
  252. _LOGGER.exception('async.set_writer(%s) failed: %r', self._sock,
  253. error)
  254. self._report_completion(error)
  255. return
  256. else:
  257. self._watching_socket_events = True
  258. _LOGGER.debug('Connection-establishment is in progress for %s.',
  259. self._sock)
  260. @_log_exceptions
  261. def _on_writable(self):
  262. """Called when socket connects or fails to. Check for predicament and
  263. invoke user's completion callback.
  264. """
  265. if self._state != self._STATE_ACTIVE:
  266. # This should never happen since we remove the watcher upon
  267. # `cancel()`
  268. _LOGGER.error(
  269. 'Socket connection-establishment event watcher '
  270. 'called in inactive state (ignoring): %s; state=%s', self._sock,
  271. self._state)
  272. return
  273. # The moment of truth...
  274. error_code = self._sock.getsockopt(socket.SOL_SOCKET, socket.SO_ERROR)
  275. if not error_code:
  276. _LOGGER.info('Socket connected: %s', self._sock)
  277. result = None
  278. else:
  279. error_msg = os.strerror(error_code)
  280. _LOGGER.error('Socket failed to connect: %s; error=%s (%s)',
  281. self._sock, error_code, error_msg)
  282. result = pika.compat.SOCKET_ERROR(error_code, error_msg)
  283. self._report_completion(result)
  284. class _AsyncStreamConnector(object):
  285. """Performs asynchronous SSL session establishment, if requested, on the
  286. already-connected socket and links the streaming transport to protocol.
  287. Used for implementing
  288. `.nbio_interface.AbstractIOServices.create_streaming_connection()`.
  289. """
  290. _STATE_NOT_STARTED = 0 # start() not called yet
  291. _STATE_ACTIVE = 1 # start() called and kicked off the workflow
  292. _STATE_CANCELED = 2 # workflow terminated by cancel() request
  293. _STATE_COMPLETED = 3 # workflow terminated by success or failure
  294. def __init__(self, nbio, protocol_factory, sock, ssl_context,
  295. server_hostname, on_done):
  296. """
  297. NOTE: We take ownership of the given socket upon successful completion
  298. of the constructor.
  299. See `AbstractIOServices.create_streaming_connection()` for detailed
  300. documentation of the corresponding args.
  301. :param AbstractIOServices | AbstractFileDescriptorServices nbio:
  302. :param callable protocol_factory:
  303. :param socket.socket sock:
  304. :param ssl.SSLContext | None ssl_context:
  305. :param str | None server_hostname:
  306. :param callable on_done:
  307. """
  308. check_callback_arg(protocol_factory, 'protocol_factory')
  309. check_callback_arg(on_done, 'on_done')
  310. if not isinstance(ssl_context, (type(None), ssl.SSLContext)):
  311. raise ValueError('Expected ssl_context=None | ssl.SSLContext, but '
  312. 'got {!r}'.format(ssl_context))
  313. if server_hostname is not None and ssl_context is None:
  314. raise ValueError('Non-None server_hostname must not be passed '
  315. 'without ssl context')
  316. # Check that the socket connection establishment had completed in order
  317. # to avoid stalling while waiting for the socket to become readable
  318. # and/or writable.
  319. try:
  320. sock.getpeername()
  321. except Exception as error:
  322. raise ValueError(
  323. 'Expected connected socket, but getpeername() failed: '
  324. 'error={!r}; {}; '.format(error, sock))
  325. self._nbio = nbio
  326. self._protocol_factory = protocol_factory
  327. self._sock = sock
  328. self._ssl_context = ssl_context
  329. self._server_hostname = server_hostname
  330. self._on_done = on_done
  331. self._state = self._STATE_NOT_STARTED
  332. self._watching_socket = False
  333. @_log_exceptions
  334. def _cleanup(self, close):
  335. """Cancel pending async operations, if any
  336. :param bool close: close the socket if true
  337. """
  338. _LOGGER.debug('_AsyncStreamConnector._cleanup(%r)', close)
  339. if self._watching_socket:
  340. _LOGGER.debug(
  341. '_AsyncStreamConnector._cleanup(%r): removing RdWr; %s', close,
  342. self._sock)
  343. self._watching_socket = False
  344. self._nbio.remove_reader(self._sock.fileno())
  345. self._nbio.remove_writer(self._sock.fileno())
  346. try:
  347. if close:
  348. _LOGGER.debug(
  349. '_AsyncStreamConnector._cleanup(%r): closing socket; %s',
  350. close, self._sock)
  351. try:
  352. self._sock.close()
  353. except Exception as error: # pylint: disable=W0703
  354. _LOGGER.exception('_sock.close() failed: error=%r; %s',
  355. error, self._sock)
  356. raise
  357. finally:
  358. self._sock = None
  359. self._nbio = None
  360. self._protocol_factory = None
  361. self._ssl_context = None
  362. self._server_hostname = None
  363. self._on_done = None
  364. def start(self):
  365. """Kick off the workflow
  366. :rtype: AbstractIOReference
  367. """
  368. _LOGGER.debug('_AsyncStreamConnector.start(); %s', self._sock)
  369. assert self._state == self._STATE_NOT_STARTED, (
  370. '_AsyncStreamConnector.start() expected '
  371. '_STATE_NOT_STARTED', self._state)
  372. self._state = self._STATE_ACTIVE
  373. # Request callback from I/O loop to start processing so that we don't
  374. # end up making callbacks from the caller's scope
  375. self._nbio.add_callback_threadsafe(self._start_async)
  376. return _AsyncServiceAsyncHandle(self)
  377. def cancel(self):
  378. """Cancel pending connection request without calling user's completion
  379. callback.
  380. :returns: False if was already done or cancelled; True otherwise
  381. :rtype: bool
  382. """
  383. if self._state == self._STATE_ACTIVE:
  384. self._state = self._STATE_CANCELED
  385. _LOGGER.debug('User canceled streaming linkup for %s', self._sock)
  386. # Close the socket, since we took ownership
  387. self._cleanup(close=True)
  388. return True
  389. _LOGGER.debug(
  390. '_AsyncStreamConnector cancel requested when not ACTIVE: '
  391. 'state=%s; %s', self._state, self._sock)
  392. return False
  393. @_log_exceptions
  394. def _report_completion(self, result):
  395. """Advance to COMPLETED state, cancel async operation(s), and invoke
  396. user's completion callback.
  397. :param BaseException | tuple result: value to pass in user's callback.
  398. `tuple(transport, protocol)` on success, exception on error
  399. """
  400. _LOGGER.debug('_AsyncStreamConnector._report_completion(%r); %s',
  401. result, self._sock)
  402. assert isinstance(result, (BaseException, tuple)), (
  403. '_AsyncStreamConnector._report_completion() expected exception or '
  404. 'tuple as result.', result, self._state)
  405. assert self._state == self._STATE_ACTIVE, (
  406. '_AsyncStreamConnector._report_completion() expected '
  407. '_STATE_ACTIVE', self._state)
  408. self._state = self._STATE_COMPLETED
  409. # Notify user
  410. try:
  411. self._on_done(result)
  412. except Exception:
  413. _LOGGER.exception('%r: _on_done(%r) failed.',
  414. self._report_completion, result)
  415. raise
  416. finally:
  417. # NOTE: Close the socket on error, since we took ownership of it
  418. self._cleanup(close=isinstance(result, BaseException))
  419. @_log_exceptions
  420. def _start_async(self):
  421. """Called as callback from I/O loop to kick-start the workflow, so it's
  422. safe to call user's completion callback from here if needed
  423. """
  424. _LOGGER.debug('_AsyncStreamConnector._start_async(); %s', self._sock)
  425. if self._state != self._STATE_ACTIVE:
  426. # Must have been canceled by user before we were called
  427. _LOGGER.debug(
  428. 'Abandoning streaming linkup due to inactive state '
  429. 'transition; state=%s; %s; .', self._state, self._sock)
  430. return
  431. # Link up protocol and transport if this is a plaintext linkup;
  432. # otherwise kick-off SSL workflow first
  433. if self._ssl_context is None:
  434. self._linkup()
  435. else:
  436. _LOGGER.debug('Starting SSL handshake on %s', self._sock)
  437. # Wrap our plain socket in ssl socket
  438. try:
  439. self._sock = self._ssl_context.wrap_socket(
  440. self._sock,
  441. server_side=False,
  442. do_handshake_on_connect=False,
  443. suppress_ragged_eofs=False, # False = error on incoming EOF
  444. server_hostname=self._server_hostname)
  445. except Exception as error: # pylint: disable=W0703
  446. _LOGGER.exception('SSL wrap_socket(%s) failed: %r', self._sock,
  447. error)
  448. self._report_completion(error)
  449. return
  450. self._do_ssl_handshake()
  451. @_log_exceptions
  452. def _linkup(self):
  453. """Connection is ready: instantiate and link up transport and protocol,
  454. and invoke user's completion callback.
  455. """
  456. _LOGGER.debug('_AsyncStreamConnector._linkup()')
  457. transport = None
  458. try:
  459. # Create the protocol
  460. try:
  461. protocol = self._protocol_factory()
  462. except Exception as error:
  463. _LOGGER.exception('protocol_factory() failed: error=%r; %s',
  464. error, self._sock)
  465. raise
  466. if self._ssl_context is None:
  467. # Create plaintext streaming transport
  468. try:
  469. transport = _AsyncPlaintextTransport(
  470. self._sock, protocol, self._nbio)
  471. except Exception as error:
  472. _LOGGER.exception('PlainTransport() failed: error=%r; %s',
  473. error, self._sock)
  474. raise
  475. else:
  476. # Create SSL streaming transport
  477. try:
  478. transport = _AsyncSSLTransport(self._sock, protocol,
  479. self._nbio)
  480. except Exception as error:
  481. _LOGGER.exception('SSLTransport() failed: error=%r; %s',
  482. error, self._sock)
  483. raise
  484. _LOGGER.debug('_linkup(): created transport %r', transport)
  485. # Acquaint protocol with its transport
  486. try:
  487. protocol.connection_made(transport)
  488. except Exception as error:
  489. _LOGGER.exception(
  490. 'protocol.connection_made(%r) failed: error=%r; %s',
  491. transport, error, self._sock)
  492. raise
  493. _LOGGER.debug('_linkup(): introduced transport to protocol %r; %r',
  494. transport, protocol)
  495. except Exception as error: # pylint: disable=W0703
  496. result = error
  497. else:
  498. result = (transport, protocol)
  499. self._report_completion(result)
  500. @_log_exceptions
  501. def _do_ssl_handshake(self):
  502. """Perform asynchronous SSL handshake on the already wrapped socket
  503. """
  504. _LOGGER.debug('_AsyncStreamConnector._do_ssl_handshake()')
  505. if self._state != self._STATE_ACTIVE:
  506. _LOGGER.debug(
  507. '_do_ssl_handshake: Abandoning streaming linkup due '
  508. 'to inactive state transition; state=%s; %s; .', self._state,
  509. self._sock)
  510. return
  511. done = False
  512. try:
  513. try:
  514. self._sock.do_handshake()
  515. except ssl.SSLError as error:
  516. if error.errno == ssl.SSL_ERROR_WANT_READ:
  517. _LOGGER.debug('SSL handshake wants read; %s.', self._sock)
  518. self._watching_socket = True
  519. self._nbio.set_reader(self._sock.fileno(),
  520. self._do_ssl_handshake)
  521. self._nbio.remove_writer(self._sock.fileno())
  522. elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
  523. _LOGGER.debug('SSL handshake wants write. %s', self._sock)
  524. self._watching_socket = True
  525. self._nbio.set_writer(self._sock.fileno(),
  526. self._do_ssl_handshake)
  527. self._nbio.remove_reader(self._sock.fileno())
  528. else:
  529. # Outer catch will report it
  530. raise
  531. else:
  532. done = True
  533. _LOGGER.info('SSL handshake completed successfully: %s',
  534. self._sock)
  535. except Exception as error: # pylint: disable=W0703
  536. _LOGGER.exception('SSL do_handshake failed: error=%r; %s', error,
  537. self._sock)
  538. self._report_completion(error)
  539. return
  540. if done:
  541. # Suspend I/O and link up transport with protocol
  542. _LOGGER.debug(
  543. '_do_ssl_handshake: removing watchers ahead of linkup: %s',
  544. self._sock)
  545. self._nbio.remove_reader(self._sock.fileno())
  546. self._nbio.remove_writer(self._sock.fileno())
  547. # So that our `_cleanup()` won't interfere with the transport's
  548. # socket watcher configuration.
  549. self._watching_socket = False
  550. _LOGGER.debug(
  551. '_do_ssl_handshake: pre-linkup removal of watchers is done; %s',
  552. self._sock)
  553. self._linkup()
  554. class _AsyncTransportBase( # pylint: disable=W0223
  555. AbstractStreamTransport):
  556. """Base class for `_AsyncPlaintextTransport` and `_AsyncSSLTransport`.
  557. """
  558. _STATE_ACTIVE = 1
  559. _STATE_FAILED = 2 # connection failed
  560. _STATE_ABORTED_BY_USER = 3 # cancel() called
  561. _STATE_COMPLETED = 4 # done with connection
  562. _MAX_RECV_BYTES = 4096 # per socket.recv() documentation recommendation
  563. # Max per consume call to prevent event starvation
  564. _MAX_CONSUME_BYTES = 1024 * 100
  565. class RxEndOfFile(OSError):
  566. """We raise this internally when EOF (empty read) is detected on input.
  567. """
  568. def __init__(self):
  569. super(_AsyncTransportBase.RxEndOfFile, self).__init__(
  570. -1, 'End of input stream (EOF)')
  571. def __init__(self, sock, protocol, nbio):
  572. """
  573. :param socket.socket | ssl.SSLSocket sock: connected socket
  574. :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
  575. corresponding protocol in this transport/protocol pairing; the
  576. protocol already had its `connection_made()` method called.
  577. :param AbstractIOServices | AbstractFileDescriptorServices nbio:
  578. """
  579. _LOGGER.debug('_AsyncTransportBase.__init__: %s', sock)
  580. self._sock = sock
  581. self._protocol = protocol
  582. self._nbio = nbio
  583. self._state = self._STATE_ACTIVE
  584. self._tx_buffers = collections.deque()
  585. self._tx_buffered_byte_count = 0
  586. def abort(self):
  587. """Close connection abruptly without waiting for pending I/O to
  588. complete. Will invoke the corresponding protocol's `connection_lost()`
  589. method asynchronously (not in context of the abort() call).
  590. :raises Exception: Exception-based exception on error
  591. """
  592. _LOGGER.info('Aborting transport connection: state=%s; %s', self._state,
  593. self._sock)
  594. self._initiate_abort(None)
  595. def get_protocol(self):
  596. """Return the protocol linked to this transport.
  597. :rtype: pika.adapters.utils.nbio_interface.AbstractStreamProtocol
  598. """
  599. return self._protocol
  600. def get_write_buffer_size(self):
  601. """
  602. :returns: Current size of output data buffered by the transport
  603. :rtype: int
  604. """
  605. return self._tx_buffered_byte_count
  606. def _buffer_tx_data(self, data):
  607. """Buffer the given data until it can be sent asynchronously.
  608. :param bytes data:
  609. :raises ValueError: if called with empty data
  610. """
  611. if not data:
  612. _LOGGER.error('write() called with empty data: state=%s; %s',
  613. self._state, self._sock)
  614. raise ValueError('write() called with empty data {!r}'.format(data))
  615. if self._state != self._STATE_ACTIVE:
  616. _LOGGER.debug(
  617. 'Ignoring write() called during inactive state: '
  618. 'state=%s; %s', self._state, self._sock)
  619. return
  620. self._tx_buffers.append(data)
  621. self._tx_buffered_byte_count += len(data)
  622. def _consume(self):
  623. """Utility method for use by subclasses to ingest data from socket and
  624. dispatch it to protocol's `data_received()` method socket-specific
  625. "try again" exception, per-event data consumption limit is reached,
  626. transport becomes inactive, or a fatal failure.
  627. Consumes up to `self._MAX_CONSUME_BYTES` to prevent event starvation or
  628. until state becomes inactive (e.g., `protocol.data_received()` callback
  629. aborts the transport)
  630. :raises: Whatever the corresponding `sock.recv()` raises except the
  631. socket error with errno.EINTR
  632. :raises: Whatever the `protocol.data_received()` callback raises
  633. :raises _AsyncTransportBase.RxEndOfFile: upon shutdown of input stream
  634. """
  635. bytes_consumed = 0
  636. while (self._state == self._STATE_ACTIVE and
  637. bytes_consumed < self._MAX_CONSUME_BYTES):
  638. data = self._sigint_safe_recv(self._sock, self._MAX_RECV_BYTES)
  639. bytes_consumed += len(data)
  640. # Empty data, should disconnect
  641. if not data:
  642. _LOGGER.error('Socket EOF; %s', self._sock)
  643. raise self.RxEndOfFile()
  644. # Pass the data to the protocol
  645. try:
  646. self._protocol.data_received(data)
  647. except Exception as error:
  648. _LOGGER.exception(
  649. 'protocol.data_received() failed: error=%r; %s', error,
  650. self._sock)
  651. raise
  652. def _produce(self):
  653. """Utility method for use by subclasses to emit data from tx_buffers.
  654. This method sends chunks from `tx_buffers` until all chunks are
  655. exhausted or sending is interrupted by an exception. Maintains integrity
  656. of `self.tx_buffers`.
  657. :raises: whatever the corresponding `sock.send()` raises except the
  658. socket error with errno.EINTR
  659. """
  660. while self._tx_buffers:
  661. num_bytes_sent = self._sigint_safe_send(self._sock,
  662. self._tx_buffers[0])
  663. chunk = self._tx_buffers.popleft()
  664. if num_bytes_sent < len(chunk):
  665. _LOGGER.debug('Partial send, requeing remaining data; %s of %s',
  666. num_bytes_sent, len(chunk))
  667. self._tx_buffers.appendleft(chunk[num_bytes_sent:])
  668. self._tx_buffered_byte_count -= num_bytes_sent
  669. assert self._tx_buffered_byte_count >= 0, (
  670. '_AsyncTransportBase._produce() tx buffer size underflow',
  671. self._tx_buffered_byte_count, self._state)
  672. @staticmethod
  673. @_retry_on_sigint
  674. def _sigint_safe_recv(sock, max_bytes):
  675. """Receive data from socket, retrying on SIGINT.
  676. :param sock: stream or SSL socket
  677. :param max_bytes: maximum number of bytes to receive
  678. :returns: received data or empty bytes uppon end of file
  679. :rtype: bytes
  680. :raises: whatever the corresponding `sock.recv()` raises except socket
  681. error with errno.EINTR
  682. """
  683. return sock.recv(max_bytes)
  684. @staticmethod
  685. @_retry_on_sigint
  686. def _sigint_safe_send(sock, data):
  687. """Send data to socket, retrying on SIGINT.
  688. :param sock: stream or SSL socket
  689. :param data: data bytes to send
  690. :returns: number of bytes actually sent
  691. :rtype: int
  692. :raises: whatever the corresponding `sock.send()` raises except socket
  693. error with errno.EINTR
  694. """
  695. return sock.send(data)
  696. @_log_exceptions
  697. def _deactivate(self):
  698. """Unregister the transport from I/O events
  699. """
  700. if self._state == self._STATE_ACTIVE:
  701. _LOGGER.info('Deactivating transport: state=%s; %s', self._state,
  702. self._sock)
  703. self._nbio.remove_reader(self._sock.fileno())
  704. self._nbio.remove_writer(self._sock.fileno())
  705. self._tx_buffers.clear()
  706. @_log_exceptions
  707. def _close_and_finalize(self):
  708. """Close the transport's socket and unlink the transport it from
  709. references to other assets (protocol, etc.)
  710. """
  711. if self._state != self._STATE_COMPLETED:
  712. _LOGGER.info('Closing transport socket and unlinking: state=%s; %s',
  713. self._state, self._sock)
  714. try:
  715. self._sock.shutdown(socket.SHUT_RDWR)
  716. except pika.compat.SOCKET_ERROR:
  717. pass
  718. self._sock.close()
  719. self._sock = None
  720. self._protocol = None
  721. self._nbio = None
  722. self._state = self._STATE_COMPLETED
  723. @_log_exceptions
  724. def _initiate_abort(self, error):
  725. """Initiate asynchronous abort of the transport that concludes with a
  726. call to the protocol's `connection_lost()` method. No flushing of
  727. output buffers will take place.
  728. :param BaseException | None error: None if being canceled by user,
  729. including via falsie return value from protocol.eof_received;
  730. otherwise the exception corresponding to the the failed connection.
  731. """
  732. _LOGGER.info(
  733. '_AsyncTransportBase._initate_abort(): Initiating abrupt '
  734. 'asynchronous transport shutdown: state=%s; error=%r; %s',
  735. self._state, error, self._sock)
  736. assert self._state != self._STATE_COMPLETED, (
  737. '_AsyncTransportBase._initate_abort() expected '
  738. 'non-_STATE_COMPLETED', self._state)
  739. if self._state == self._STATE_COMPLETED:
  740. return
  741. self._deactivate()
  742. # Update state
  743. if error is None:
  744. # Being aborted by user
  745. if self._state == self._STATE_ABORTED_BY_USER:
  746. # Abort by user already pending
  747. _LOGGER.debug('_AsyncTransportBase._initiate_abort(): '
  748. 'ignoring - user-abort already pending.')
  749. return
  750. # Notification priority is given to user-initiated abort over
  751. # failed connection
  752. self._state = self._STATE_ABORTED_BY_USER
  753. else:
  754. # Connection failed
  755. if self._state != self._STATE_ACTIVE:
  756. assert self._state == self._STATE_ABORTED_BY_USER, (
  757. '_AsyncTransportBase._initate_abort() expected '
  758. '_STATE_ABORTED_BY_USER', self._state)
  759. return
  760. self._state = self._STATE_FAILED
  761. # Schedule callback from I/O loop to avoid potential reentry into user
  762. # code
  763. self._nbio.add_callback_threadsafe(
  764. functools.partial(self._connection_lost_notify_async, error))
  765. @_log_exceptions
  766. def _connection_lost_notify_async(self, error):
  767. """Handle aborting of transport either due to socket error or user-
  768. initiated `abort()` call. Must be called from an I/O loop callback owned
  769. by us in order to avoid reentry into user code from user's API call into
  770. the transport.
  771. :param BaseException | None error: None if being canceled by user;
  772. otherwise the exception corresponding to the the failed connection.
  773. """
  774. _LOGGER.debug('Concluding transport shutdown: state=%s; error=%r',
  775. self._state, error)
  776. if self._state == self._STATE_COMPLETED:
  777. return
  778. if error is not None and self._state != self._STATE_FAILED:
  779. # Priority is given to user-initiated abort notification
  780. assert self._state == self._STATE_ABORTED_BY_USER, (
  781. '_AsyncTransportBase._connection_lost_notify_async() '
  782. 'expected _STATE_ABORTED_BY_USER', self._state)
  783. return
  784. # Inform protocol
  785. try:
  786. self._protocol.connection_lost(error)
  787. except Exception as exc: # pylint: disable=W0703
  788. _LOGGER.exception('protocol.connection_lost(%r) failed: exc=%r; %s',
  789. error, exc, self._sock)
  790. # Re-raise, since we've exhausted our normal failure notification
  791. # mechanism (i.e., connection_lost())
  792. raise
  793. finally:
  794. self._close_and_finalize()
  795. class _AsyncPlaintextTransport(_AsyncTransportBase):
  796. """Implementation of `nbio_interface.AbstractStreamTransport` for a
  797. plaintext connection.
  798. """
  799. def __init__(self, sock, protocol, nbio):
  800. """
  801. :param socket.socket sock: non-blocking connected socket
  802. :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
  803. corresponding protocol in this transport/protocol pairing; the
  804. protocol already had its `connection_made()` method called.
  805. :param AbstractIOServices | AbstractFileDescriptorServices nbio:
  806. """
  807. super(_AsyncPlaintextTransport, self).__init__(sock, protocol, nbio)
  808. # Request to be notified of incoming data; we'll watch for writability
  809. # only when our write buffer is non-empty
  810. self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
  811. def write(self, data):
  812. """Buffer the given data until it can be sent asynchronously.
  813. :param bytes data:
  814. :raises ValueError: if called with empty data
  815. """
  816. if self._state != self._STATE_ACTIVE:
  817. _LOGGER.debug(
  818. 'Ignoring write() called during inactive state: '
  819. 'state=%s; %s', self._state, self._sock)
  820. return
  821. assert data, ('_AsyncPlaintextTransport.write(): empty data from user.',
  822. data, self._state)
  823. if not self.get_write_buffer_size():
  824. self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
  825. _LOGGER.debug('Turned on writability watcher: %s', self._sock)
  826. self._buffer_tx_data(data)
  827. @_log_exceptions
  828. def _on_socket_readable(self):
  829. """Ingest data from socket and dispatch it to protocol until exception
  830. occurs (typically EAGAIN or EWOULDBLOCK), per-event data consumption
  831. limit is reached, transport becomes inactive, or failure.
  832. """
  833. if self._state != self._STATE_ACTIVE:
  834. _LOGGER.debug(
  835. 'Ignoring readability notification due to inactive '
  836. 'state: state=%s; %s', self._state, self._sock)
  837. return
  838. try:
  839. self._consume()
  840. except self.RxEndOfFile:
  841. try:
  842. keep_open = self._protocol.eof_received()
  843. except Exception as error: # pylint: disable=W0703
  844. _LOGGER.exception(
  845. 'protocol.eof_received() failed: error=%r; %s', error,
  846. self._sock)
  847. self._initiate_abort(error)
  848. else:
  849. if keep_open:
  850. _LOGGER.info(
  851. 'protocol.eof_received() elected to keep open: %s',
  852. self._sock)
  853. self._nbio.remove_reader(self._sock.fileno())
  854. else:
  855. _LOGGER.info('protocol.eof_received() elected to close: %s',
  856. self._sock)
  857. self._initiate_abort(None)
  858. except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
  859. if (isinstance(error, pika.compat.SOCKET_ERROR) and
  860. error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
  861. _LOGGER.debug('Recv would block on %s', self._sock)
  862. else:
  863. _LOGGER.exception(
  864. '_AsyncBaseTransport._consume() failed, aborting '
  865. 'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
  866. error, self._sock, ''.join(
  867. traceback.format_exception(*sys.exc_info())))
  868. self._initiate_abort(error)
  869. else:
  870. if self._state != self._STATE_ACTIVE:
  871. # Most likely our protocol's `data_received()` aborted the
  872. # transport
  873. _LOGGER.debug(
  874. 'Leaving Plaintext consumer due to inactive '
  875. 'state: state=%s; %s', self._state, self._sock)
  876. @_log_exceptions
  877. def _on_socket_writable(self):
  878. """Handle writable socket notification
  879. """
  880. if self._state != self._STATE_ACTIVE:
  881. _LOGGER.debug(
  882. 'Ignoring writability notification due to inactive '
  883. 'state: state=%s; %s', self._state, self._sock)
  884. return
  885. # We shouldn't be getting called with empty tx buffers
  886. assert self._tx_buffers, (
  887. '_AsyncPlaintextTransport._on_socket_writable() called, '
  888. 'but _tx_buffers is empty.', self._state)
  889. try:
  890. # Transmit buffered data to remote socket
  891. self._produce()
  892. except (Exception, pika.compat.SOCKET_ERROR) as error: # pylint: disable=W0703
  893. if (isinstance(error, pika.compat.SOCKET_ERROR) and
  894. error.errno in _TRY_IO_AGAIN_SOCK_ERROR_CODES):
  895. _LOGGER.debug('Send would block on %s', self._sock)
  896. else:
  897. _LOGGER.exception(
  898. '_AsyncBaseTransport._produce() failed, aborting '
  899. 'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
  900. error, self._sock, ''.join(
  901. traceback.format_exception(*sys.exc_info())))
  902. self._initiate_abort(error)
  903. else:
  904. if not self._tx_buffers:
  905. self._nbio.remove_writer(self._sock.fileno())
  906. _LOGGER.debug('Turned off writability watcher: %s', self._sock)
  907. class _AsyncSSLTransport(_AsyncTransportBase):
  908. """Implementation of `.nbio_interface.AbstractStreamTransport` for an SSL
  909. connection.
  910. """
  911. def __init__(self, sock, protocol, nbio):
  912. """
  913. :param ssl.SSLSocket sock: non-blocking connected socket
  914. :param pika.adapters.utils.nbio_interface.AbstractStreamProtocol protocol:
  915. corresponding protocol in this transport/protocol pairing; the
  916. protocol already had its `connection_made()` method called.
  917. :param AbstractIOServices | AbstractFileDescriptorServices nbio:
  918. """
  919. super(_AsyncSSLTransport, self).__init__(sock, protocol, nbio)
  920. self._ssl_readable_action = self._consume
  921. self._ssl_writable_action = None
  922. # Bootstrap consumer; we'll take care of producer once data is buffered
  923. self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
  924. # Try reading asap just in case read-ahead caused some
  925. self._nbio.add_callback_threadsafe(self._on_socket_readable)
  926. def write(self, data):
  927. """Buffer the given data until it can be sent asynchronously.
  928. :param bytes data:
  929. :raises ValueError: if called with empty data
  930. """
  931. if self._state != self._STATE_ACTIVE:
  932. _LOGGER.debug(
  933. 'Ignoring write() called during inactive state: '
  934. 'state=%s; %s', self._state, self._sock)
  935. return
  936. tx_buffer_was_empty = self.get_write_buffer_size() == 0
  937. self._buffer_tx_data(data)
  938. if tx_buffer_was_empty and self._ssl_writable_action is None:
  939. self._ssl_writable_action = self._produce
  940. self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
  941. _LOGGER.debug('Turned on writability watcher: %s', self._sock)
  942. @_log_exceptions
  943. def _on_socket_readable(self):
  944. """Handle readable socket indication
  945. """
  946. if self._state != self._STATE_ACTIVE:
  947. _LOGGER.debug(
  948. 'Ignoring readability notification due to inactive '
  949. 'state: state=%s; %s', self._state, self._sock)
  950. return
  951. if self._ssl_readable_action:
  952. try:
  953. self._ssl_readable_action()
  954. except Exception as error: # pylint: disable=W0703
  955. self._initiate_abort(error)
  956. else:
  957. _LOGGER.debug(
  958. 'SSL readable action was suppressed: '
  959. 'ssl_writable_action=%r; %s', self._ssl_writable_action,
  960. self._sock)
  961. @_log_exceptions
  962. def _on_socket_writable(self):
  963. """Handle writable socket notification
  964. """
  965. if self._state != self._STATE_ACTIVE:
  966. _LOGGER.debug(
  967. 'Ignoring writability notification due to inactive '
  968. 'state: state=%s; %s', self._state, self._sock)
  969. return
  970. if self._ssl_writable_action:
  971. try:
  972. self._ssl_writable_action()
  973. except Exception as error: # pylint: disable=W0703
  974. self._initiate_abort(error)
  975. else:
  976. _LOGGER.debug(
  977. 'SSL writable action was suppressed: '
  978. 'ssl_readable_action=%r; %s', self._ssl_readable_action,
  979. self._sock)
  980. @_log_exceptions
  981. def _consume(self):
  982. """[override] Ingest data from socket and dispatch it to protocol until
  983. exception occurs (typically ssl.SSLError with
  984. SSL_ERROR_WANT_READ/WRITE), per-event data consumption limit is reached,
  985. transport becomes inactive, or failure.
  986. Update consumer/producer registration.
  987. :raises Exception: error that signals that connection needs to be
  988. aborted
  989. """
  990. next_consume_on_readable = True
  991. try:
  992. super(_AsyncSSLTransport, self)._consume()
  993. except ssl.SSLError as error:
  994. if error.errno == ssl.SSL_ERROR_WANT_READ:
  995. _LOGGER.debug('SSL ingester wants read: %s', self._sock)
  996. elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
  997. # Looks like SSL re-negotiation
  998. _LOGGER.debug('SSL ingester wants write: %s', self._sock)
  999. next_consume_on_readable = False
  1000. else:
  1001. _LOGGER.exception(
  1002. '_AsyncBaseTransport._consume() failed, aborting '
  1003. 'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
  1004. error, self._sock, ''.join(
  1005. traceback.format_exception(*sys.exc_info())))
  1006. raise # let outer catch block abort the transport
  1007. else:
  1008. if self._state != self._STATE_ACTIVE:
  1009. # Most likely our protocol's `data_received()` aborted the
  1010. # transport
  1011. _LOGGER.debug(
  1012. 'Leaving SSL consumer due to inactive '
  1013. 'state: state=%s; %s', self._state, self._sock)
  1014. return
  1015. # Consumer exited without exception; there may still be more,
  1016. # possibly unprocessed, data records in SSL input buffers that
  1017. # can be read without waiting for socket to become readable.
  1018. # In case buffered input SSL data records still remain
  1019. self._nbio.add_callback_threadsafe(self._on_socket_readable)
  1020. # Update consumer registration
  1021. if next_consume_on_readable:
  1022. if not self._ssl_readable_action:
  1023. self._nbio.set_reader(self._sock.fileno(),
  1024. self._on_socket_readable)
  1025. self._ssl_readable_action = self._consume
  1026. # NOTE: can't use identity check, it fails for instance methods
  1027. if self._ssl_writable_action == self._consume: # pylint: disable=W0143
  1028. self._nbio.remove_writer(self._sock.fileno())
  1029. self._ssl_writable_action = None
  1030. else:
  1031. # WANT_WRITE
  1032. if not self._ssl_writable_action:
  1033. self._nbio.set_writer(self._sock.fileno(),
  1034. self._on_socket_writable)
  1035. self._ssl_writable_action = self._consume
  1036. if self._ssl_readable_action:
  1037. self._nbio.remove_reader(self._sock.fileno())
  1038. self._ssl_readable_action = None
  1039. # Update producer registration
  1040. if self._tx_buffers and not self._ssl_writable_action:
  1041. self._ssl_writable_action = self._produce
  1042. self._nbio.set_writer(self._sock.fileno(), self._on_socket_writable)
  1043. @_log_exceptions
  1044. def _produce(self):
  1045. """[override] Emit data from tx_buffers all chunks are exhausted or
  1046. sending is interrupted by an exception (typically ssl.SSLError with
  1047. SSL_ERROR_WANT_READ/WRITE).
  1048. Update consumer/producer registration.
  1049. :raises Exception: error that signals that connection needs to be
  1050. aborted
  1051. """
  1052. next_produce_on_writable = None # None means no need to produce
  1053. try:
  1054. super(_AsyncSSLTransport, self)._produce()
  1055. except ssl.SSLError as error:
  1056. if error.errno == ssl.SSL_ERROR_WANT_READ:
  1057. # Looks like SSL re-negotiation
  1058. _LOGGER.debug('SSL emitter wants read: %s', self._sock)
  1059. next_produce_on_writable = False
  1060. elif error.errno == ssl.SSL_ERROR_WANT_WRITE:
  1061. _LOGGER.debug('SSL emitter wants write: %s', self._sock)
  1062. next_produce_on_writable = True
  1063. else:
  1064. _LOGGER.exception(
  1065. '_AsyncBaseTransport._produce() failed, aborting '
  1066. 'connection: error=%r; sock=%s; Caller\'s stack:\n%s',
  1067. error, self._sock, ''.join(
  1068. traceback.format_exception(*sys.exc_info())))
  1069. raise # let outer catch block abort the transport
  1070. else:
  1071. # No exception, so everything must have been written to the socket
  1072. assert not self._tx_buffers, (
  1073. '_AsyncSSLTransport._produce(): no exception from parent '
  1074. 'class, but data remains in _tx_buffers.', len(
  1075. self._tx_buffers))
  1076. # Update producer registration
  1077. if self._tx_buffers:
  1078. assert next_produce_on_writable is not None, (
  1079. '_AsyncSSLTransport._produce(): next_produce_on_writable is '
  1080. 'still None', self._state)
  1081. if next_produce_on_writable:
  1082. if not self._ssl_writable_action:
  1083. self._nbio.set_writer(self._sock.fileno(),
  1084. self._on_socket_writable)
  1085. self._ssl_writable_action = self._produce
  1086. # NOTE: can't use identity check, it fails for instance methods
  1087. if self._ssl_readable_action == self._produce: # pylint: disable=W0143
  1088. self._nbio.remove_reader(self._sock.fileno())
  1089. self._ssl_readable_action = None
  1090. else:
  1091. # WANT_READ
  1092. if not self._ssl_readable_action:
  1093. self._nbio.set_reader(self._sock.fileno(),
  1094. self._on_socket_readable)
  1095. self._ssl_readable_action = self._produce
  1096. if self._ssl_writable_action:
  1097. self._nbio.remove_writer(self._sock.fileno())
  1098. self._ssl_writable_action = None
  1099. else:
  1100. # NOTE: can't use identity check, it fails for instance methods
  1101. if self._ssl_readable_action == self._produce: # pylint: disable=W0143
  1102. self._nbio.remove_reader(self._sock.fileno())
  1103. self._ssl_readable_action = None
  1104. assert self._ssl_writable_action != self._produce, ( # pylint: disable=W0143
  1105. '_AsyncSSLTransport._produce(): with empty tx_buffers, '
  1106. 'writable_action cannot be _produce when readable is '
  1107. '_produce', self._state)
  1108. else:
  1109. # NOTE: can't use identity check, it fails for instance methods
  1110. assert self._ssl_writable_action == self._produce, ( # pylint: disable=W0143
  1111. '_AsyncSSLTransport._produce(): with empty tx_buffers, '
  1112. 'expected writable_action as _produce when readable_action '
  1113. 'is not _produce', 'writable_action:',
  1114. self._ssl_writable_action, 'readable_action:',
  1115. self._ssl_readable_action, 'state:', self._state)
  1116. self._ssl_writable_action = None
  1117. self._nbio.remove_writer(self._sock.fileno())
  1118. # Update consumer registration
  1119. if not self._ssl_readable_action:
  1120. self._ssl_readable_action = self._consume
  1121. self._nbio.set_reader(self._sock.fileno(), self._on_socket_readable)
  1122. # In case input SSL data records have been buffered
  1123. self._nbio.add_callback_threadsafe(self._on_socket_readable)
  1124. elif self._sock.pending():
  1125. self._nbio.add_callback_threadsafe(self._on_socket_readable)