blocking_connection.py 107 KB


  1. """The blocking connection adapter module implements blocking semantics on top
  2. of Pika's core AMQP driver. While most of the asynchronous expectations are
  3. removed when using the blocking connection adapter, it attempts to remain true
  4. to the asynchronous RPC nature of the AMQP protocol, supporting server sent
  5. RPC commands.
  6. The user facing classes in the module consist of the
  7. :py:class:`~pika.adapters.blocking_connection.BlockingConnection`
  8. and the :class:`~pika.adapters.blocking_connection.BlockingChannel`
  9. classes.
  10. """
  11. # Suppress too-many-lines
  12. # pylint: disable=C0302
  13. # Disable "access to protected member warnings: this wrapper implementation is
  14. # a friend of those instances
  15. # pylint: disable=W0212
  16. from collections import namedtuple, deque
  17. import contextlib
  18. import functools
  19. import logging
  20. import threading
  21. import time
  22. import pika.compat as compat
  23. import pika.exceptions as exceptions
  24. import pika.spec
  25. import pika.validators as validators
  26. from pika.adapters.utils import connection_workflow
  27. # NOTE: import SelectConnection after others to avoid circular dependency
  28. from pika.adapters import select_connection
  29. LOGGER = logging.getLogger(__name__)
  30. class _CallbackResult(object):
  31. """ CallbackResult is a non-thread-safe implementation for receiving
  32. callback results; INTERNAL USE ONLY!
  33. """
  34. __slots__ = ('_value_class', '_ready', '_values')
  35. def __init__(self, value_class=None):
  36. """
  37. :param callable value_class: only needed if the CallbackResult
  38. instance will be used with
  39. `set_value_once` and `append_element`.
  40. *args and **kwargs of the value setter
  41. methods will be passed to this class.
  42. """
  43. self._value_class = value_class
  44. self._ready = None
  45. self._values = None
  46. self.reset()
  47. def reset(self):
  48. """Reset value, but not _value_class"""
  49. self._ready = False
  50. self._values = None
  51. def __bool__(self):
  52. """ Called by python runtime to implement truth value testing and the
  53. built-in operation bool(); NOTE: python 3.x
  54. """
  55. return self.is_ready()
  56. # python 2.x version of __bool__
  57. __nonzero__ = __bool__
  58. def __enter__(self):
  59. """ Entry into context manager that automatically resets the object
  60. on exit; this usage pattern helps garbage-collection by eliminating
  61. potential circular references.
  62. """
  63. return self
  64. def __exit__(self, *args, **kwargs):
  65. """Reset value"""
  66. self.reset()
  67. def is_ready(self):
  68. """
  69. :returns: True if the object is in a signaled state
  70. :rtype: bool
  71. """
  72. return self._ready
  73. @property
  74. def ready(self):
  75. """True if the object is in a signaled state"""
  76. return self._ready
  77. def signal_once(self, *_args, **_kwargs):
  78. """ Set as ready
  79. :raises AssertionError: if result was already signalled
  80. """
  81. assert not self._ready, '_CallbackResult was already set'
  82. self._ready = True
  83. def set_value_once(self, *args, **kwargs):
  84. """ Set as ready with value; the value may be retrieved via the `value`
  85. property getter
  86. :raises AssertionError: if result was already set
  87. """
  88. self.signal_once()
  89. try:
  90. self._values = (self._value_class(*args, **kwargs),)
  91. except Exception:
  92. LOGGER.error(
  93. "set_value_once failed: value_class=%r; args=%r; kwargs=%r",
  94. self._value_class, args, kwargs)
  95. raise
  96. def append_element(self, *args, **kwargs):
  97. """Append an element to values"""
  98. assert not self._ready or isinstance(self._values, list), (
  99. '_CallbackResult state is incompatible with append_element: '
  100. 'ready=%r; values=%r' % (self._ready, self._values))
  101. try:
  102. value = self._value_class(*args, **kwargs)
  103. except Exception:
  104. LOGGER.error(
  105. "append_element failed: value_class=%r; args=%r; kwargs=%r",
  106. self._value_class, args, kwargs)
  107. raise
  108. if self._values is None:
  109. self._values = [value]
  110. else:
  111. self._values.append(value)
  112. self._ready = True
  113. @property
  114. def value(self):
  115. """
  116. :returns: a reference to the value that was set via `set_value_once`
  117. :rtype: object
  118. :raises AssertionError: if result was not set or value is incompatible
  119. with `set_value_once`
  120. """
  121. assert self._ready, '_CallbackResult was not set'
  122. assert isinstance(self._values, tuple) and len(self._values) == 1, (
  123. '_CallbackResult value is incompatible with set_value_once: %r' %
  124. (self._values,))
  125. return self._values[0]
  126. @property
  127. def elements(self):
  128. """
  129. :returns: a reference to the list containing one or more elements that
  130. were added via `append_element`
  131. :rtype: list
  132. :raises AssertionError: if result was not set or value is incompatible
  133. with `append_element`
  134. """
  135. assert self._ready, '_CallbackResult was not set'
  136. assert isinstance(self._values, list) and self._values, (
  137. '_CallbackResult value is incompatible with append_element: %r' %
  138. (self._values,))
  139. return self._values
  140. class _IoloopTimerContext(object):
  141. """Context manager for registering and safely unregistering a
  142. SelectConnection ioloop-based timer
  143. """
  144. def __init__(self, duration, connection):
  145. """
  146. :param float duration: non-negative timer duration in seconds
  147. :param select_connection.SelectConnection connection:
  148. """
  149. assert hasattr(connection, '_adapter_call_later'), connection
  150. self._duration = duration
  151. self._connection = connection
  152. self._callback_result = _CallbackResult()
  153. self._timer_handle = None
  154. def __enter__(self):
  155. """Register a timer"""
  156. self._timer_handle = self._connection._adapter_call_later(
  157. self._duration, self._callback_result.signal_once)
  158. return self
  159. def __exit__(self, *_args, **_kwargs):
  160. """Unregister timer if it hasn't fired yet"""
  161. if not self._callback_result:
  162. self._connection._adapter_remove_timeout(self._timer_handle)
  163. self._timer_handle = None
  164. def is_ready(self):
  165. """
  166. :returns: True if timer has fired, False otherwise
  167. :rtype: bool
  168. """
  169. return self._callback_result.is_ready()
  170. class _TimerEvt(object):
  171. """Represents a timer created via `BlockingConnection.call_later`"""
  172. __slots__ = ('timer_id', '_callback')
  173. def __init__(self, callback):
  174. """
  175. :param callback: see callback in `BlockingConnection.call_later`
  176. """
  177. self._callback = callback
  178. # Will be set to timer id returned from the underlying implementation's
  179. # `_adapter_call_later` method
  180. self.timer_id = None
  181. def __repr__(self):
  182. return '<%s timer_id=%s callback=%s>' % (self.__class__.__name__,
  183. self.timer_id, self._callback)
  184. def dispatch(self):
  185. """Dispatch the user's callback method"""
  186. LOGGER.debug('_TimerEvt.dispatch: invoking callback=%r', self._callback)
  187. self._callback()
  188. class _ConnectionBlockedUnblockedEvtBase(object):
  189. """Base class for `_ConnectionBlockedEvt` and `_ConnectionUnblockedEvt`"""
  190. __slots__ = ('_callback', '_method_frame')
  191. def __init__(self, callback, method_frame):
  192. """
  193. :param callback: see callback parameter in
  194. `BlockingConnection.add_on_connection_blocked_callback` and
  195. `BlockingConnection.add_on_connection_unblocked_callback`
  196. :param pika.frame.Method method_frame: with method_frame.method of type
  197. `pika.spec.Connection.Blocked` or `pika.spec.Connection.Unblocked`
  198. """
  199. self._callback = callback
  200. self._method_frame = method_frame
  201. def __repr__(self):
  202. return '<%s callback=%s, frame=%s>' % (
  203. self.__class__.__name__, self._callback, self._method_frame)
  204. def dispatch(self):
  205. """Dispatch the user's callback method"""
  206. self._callback(self._method_frame)
  207. class _ConnectionBlockedEvt(_ConnectionBlockedUnblockedEvtBase):
  208. """Represents a Connection.Blocked notification from RabbitMQ broker`"""
  209. class _ConnectionUnblockedEvt(_ConnectionBlockedUnblockedEvtBase):
  210. """Represents a Connection.Unblocked notification from RabbitMQ broker`"""
  211. class BlockingConnection(object):
  212. """The BlockingConnection creates a layer on top of Pika's asynchronous core
  213. providing methods that will block until their expected response has
  214. returned. Due to the asynchronous nature of the `Basic.Deliver` and
  215. `Basic.Return` calls from RabbitMQ to your application, you can still
  216. implement continuation-passing style asynchronous methods if you'd like to
  217. receive messages from RabbitMQ using
  218. :meth:`basic_consume <BlockingChannel.basic_consume>` or if you want to be
  219. notified of a delivery failure when using
  220. :meth:`basic_publish <BlockingChannel.basic_publish>`.
  221. For more information about communicating with the blocking_connection
  222. adapter, be sure to check out the
  223. :class:`BlockingChannel <BlockingChannel>` class which implements the
  224. :class:`Channel <pika.channel.Channel>` based communication for the
  225. blocking_connection adapter.
  226. To prevent recursion/reentrancy, the blocking connection and channel
  227. implementations queue asynchronously-delivered events received
  228. in nested context (e.g., while waiting for `BlockingConnection.channel` or
  229. `BlockingChannel.queue_declare` to complete), dispatching them synchronously
  230. once nesting returns to the desired context. This concerns all callbacks,
  231. such as those registered via `BlockingConnection.call_later`,
  232. `BlockingConnection.add_on_connection_blocked_callback`,
  233. `BlockingConnection.add_on_connection_unblocked_callback`,
  234. `BlockingChannel.basic_consume`, etc.
  235. Blocked Connection deadlock avoidance: when RabbitMQ becomes low on
  236. resources, it emits Connection.Blocked (AMQP extension) to the client
  237. connection when client makes a resource-consuming request on that connection
  238. or its channel (e.g., `Basic.Publish`); subsequently, RabbitMQ suspends
  239. processing requests from that connection until the affected resources are
  240. restored. See http://www.rabbitmq.com/connection-blocked.html. This
  241. may impact `BlockingConnection` and `BlockingChannel` operations in a
  242. way that users might not be expecting. For example, if the user dispatches
  243. `BlockingChannel.basic_publish` in non-publisher-confirmation mode while
  244. RabbitMQ is in this low-resource state followed by a synchronous request
  245. (e.g., `BlockingConnection.channel`, `BlockingChannel.consume`,
  246. `BlockingChannel.basic_consume`, etc.), the synchronous request will block
  247. indefinitely (until Connection.Unblocked) waiting for RabbitMQ to reply. If
  248. the blocked state persists for a long time, the blocking operation will
  249. appear to hang. In this state, `BlockingConnection` instance and its
  250. channels will not dispatch user callbacks. SOLUTION: To break this potential
  251. deadlock, applications may configure the `blocked_connection_timeout`
  252. connection parameter when instantiating `BlockingConnection`. Upon blocked
  253. connection timeout, this adapter will raise the `ConnectionBlockedTimeout`
  254. exception. See `pika.connection.ConnectionParameters` documentation to
  255. learn more about the `blocked_connection_timeout` configuration.
  256. """
  257. # Connection-closing callback args
  258. _OnClosedArgs = namedtuple('BlockingConnection__OnClosedArgs',
  259. 'connection error')
  260. # Channel-opened callback args
  261. _OnChannelOpenedArgs = namedtuple('BlockingConnection__OnChannelOpenedArgs',
  262. 'channel')
  263. def __init__(self, parameters=None, _impl_class=None):
  264. """Create a new instance of the Connection object.
  265. :param None | pika.connection.Parameters | sequence parameters:
  266. Connection parameters instance or non-empty sequence of them. If
  267. None, a `pika.connection.Parameters` instance will be created with
  268. default settings. See `pika.AMQPConnectionWorkflow` for more
  269. details about multiple parameter configurations and retries.
  270. :param _impl_class: for tests/debugging only; implementation class;
  271. None=default
  272. :raises RuntimeError:
  273. """
  274. # Used for mutual exclusion to avoid race condition between
  275. # BlockingConnection._cleanup() and another thread calling
  276. # BlockingConnection.add_callback_threadsafe() against a closed
  277. # ioloop.
  278. self._cleanup_mutex = threading.Lock()
  279. # Used by the _acquire_event_dispatch decorator; when already greater
  280. # than 0, event dispatch is already acquired higher up the call stack
  281. self._event_dispatch_suspend_depth = 0
  282. # Connection-specific events that are ready for dispatch: _TimerEvt,
  283. # _ConnectionBlockedEvt, _ConnectionUnblockedEvt
  284. self._ready_events = deque()
  285. # Channel numbers of channels that are requesting a call to their
  286. # BlockingChannel._dispatch_events method; See
  287. # `_request_channel_dispatch`
  288. self._channels_pending_dispatch = set()
  289. # Receives on_close_callback args from Connection
  290. self._closed_result = _CallbackResult(self._OnClosedArgs)
  291. # Perform connection workflow
  292. self._impl = None # so that attribute is created in case below raises
  293. self._impl = self._create_connection(parameters, _impl_class)
  294. self._impl.add_on_close_callback(self._closed_result.set_value_once)
  295. def __repr__(self):
  296. return '<%s impl=%r>' % (self.__class__.__name__, self._impl)
  297. def __enter__(self):
  298. # Prepare `with` context
  299. return self
  300. def __exit__(self, exc_type, value, traceback):
  301. # Close connection after `with` context
  302. if self.is_open:
  303. self.close()
  304. def _cleanup(self):
  305. """Clean up members that might inhibit garbage collection
  306. """
  307. with self._cleanup_mutex:
  308. if self._impl is not None:
  309. self._impl.ioloop.close()
  310. self._ready_events.clear()
  311. self._closed_result.reset()
  312. @contextlib.contextmanager
  313. def _acquire_event_dispatch(self):
  314. """ Context manager that controls access to event dispatcher for
  315. preventing reentrancy.
  316. The "as" value is True if the managed code block owns the event
  317. dispatcher and False if caller higher up in the call stack already owns
  318. it. Only managed code that gets ownership (got True) is permitted to
  319. dispatch
  320. """
  321. try:
  322. # __enter__ part
  323. self._event_dispatch_suspend_depth += 1
  324. yield self._event_dispatch_suspend_depth == 1
  325. finally:
  326. # __exit__ part
  327. self._event_dispatch_suspend_depth -= 1
  328. def _create_connection(self, configs, impl_class):
  329. """Run connection workflow, blocking until it completes.
  330. :param None | pika.connection.Parameters | sequence configs: Connection
  331. parameters instance or non-empty sequence of them.
  332. :param None | SelectConnection impl_class: for tests/debugging only;
  333. implementation class;
  334. :rtype: impl_class
  335. :raises: exception on failure
  336. """
  337. if configs is None:
  338. configs = (pika.connection.Parameters(),)
  339. if isinstance(configs, pika.connection.Parameters):
  340. configs = (configs,)
  341. if not configs:
  342. raise ValueError('Expected a non-empty sequence of connection '
  343. 'parameters, but got {!r}.'.format(configs))
  344. # Connection workflow completion args
  345. # `result` may be an instance of connection on success or exception on
  346. # failure.
  347. on_cw_done_result = _CallbackResult(
  348. namedtuple('BlockingConnection_OnConnectionWorkflowDoneArgs',
  349. 'result'))
  350. impl_class = impl_class or select_connection.SelectConnection
  351. ioloop = select_connection.IOLoop()
  352. ioloop.activate_poller()
  353. try:
  354. impl_class.create_connection(
  355. configs,
  356. on_done=on_cw_done_result.set_value_once,
  357. custom_ioloop=ioloop)
  358. while not on_cw_done_result.ready:
  359. ioloop.poll()
  360. ioloop.process_timeouts()
  361. if isinstance(on_cw_done_result.value.result, BaseException):
  362. error = on_cw_done_result.value.result
  363. LOGGER.error('Connection workflow failed: %r', error)
  364. raise self._reap_last_connection_workflow_error(error)
  365. else:
  366. LOGGER.info('Connection workflow succeeded: %r',
  367. on_cw_done_result.value.result)
  368. return on_cw_done_result.value.result
  369. except Exception:
  370. LOGGER.exception('Error in _create_connection().')
  371. ioloop.close()
  372. self._cleanup()
  373. raise
  374. @staticmethod
  375. def _reap_last_connection_workflow_error(error):
  376. """Extract exception value from the last connection attempt
  377. :param Exception error: error passed by the `AMQPConnectionWorkflow`
  378. completion callback.
  379. :returns: Exception value from the last connection attempt
  380. :rtype: Exception
  381. """
  382. if isinstance(error, connection_workflow.AMQPConnectionWorkflowFailed):
  383. # Extract exception value from the last connection attempt
  384. error = error.exceptions[-1]
  385. if isinstance(error,
  386. connection_workflow.AMQPConnectorSocketConnectError):
  387. error = exceptions.AMQPConnectionError(error)
  388. elif isinstance(error,
  389. connection_workflow.AMQPConnectorPhaseErrorBase):
  390. error = error.exception
  391. return error
  392. def _flush_output(self, *waiters):
  393. """ Flush output and process input while waiting for any of the given
  394. callbacks to return true. The wait is aborted upon connection-close.
  395. Otherwise, processing continues until the output is flushed AND at least
  396. one of the callbacks returns true. If there are no callbacks, then
  397. processing ends when all output is flushed.
  398. :param waiters: sequence of zero or more callables taking no args and
  399. returning true when it's time to stop processing.
  400. Their results are OR'ed together.
  401. :raises: exceptions passed by impl if opening of connection fails or
  402. connection closes.
  403. """
  404. if self.is_closed:
  405. raise exceptions.ConnectionWrongStateError()
  406. # Conditions for terminating the processing loop:
  407. # connection closed
  408. # OR
  409. # empty outbound buffer and no waiters
  410. # OR
  411. # empty outbound buffer and any waiter is ready
  412. is_done = (lambda:
  413. self._closed_result.ready or
  414. ((not self._impl._transport or
  415. self._impl._get_write_buffer_size() == 0) and
  416. (not waiters or any(ready() for ready in waiters))))
  417. # Process I/O until our completion condition is satisfied
  418. while not is_done():
  419. self._impl.ioloop.poll()
  420. self._impl.ioloop.process_timeouts()
  421. if self._closed_result.ready:
  422. try:
  423. if not isinstance(self._closed_result.value.error,
  424. exceptions.ConnectionClosedByClient):
  425. LOGGER.error('Unexpected connection close detected: %r',
  426. self._closed_result.value.error)
  427. raise self._closed_result.value.error
  428. else:
  429. LOGGER.info('User-initiated close: result=%r',
  430. self._closed_result.value)
  431. finally:
  432. self._cleanup()
  433. def _request_channel_dispatch(self, channel_number):
  434. """Called by BlockingChannel instances to request a call to their
  435. _dispatch_events method or to terminate `process_data_events`;
  436. BlockingConnection will honor these requests from a safe context.
  437. :param int channel_number: positive channel number to request a call
  438. to the channel's `_dispatch_events`; a negative channel number to
  439. request termination of `process_data_events`
  440. """
  441. self._channels_pending_dispatch.add(channel_number)
  442. def _dispatch_channel_events(self):
  443. """Invoke the `_dispatch_events` method on open channels that requested
  444. it
  445. """
  446. if not self._channels_pending_dispatch:
  447. return
  448. with self._acquire_event_dispatch() as dispatch_acquired:
  449. if not dispatch_acquired:
  450. # Nested dispatch or dispatch blocked higher in call stack
  451. return
  452. candidates = list(self._channels_pending_dispatch)
  453. self._channels_pending_dispatch.clear()
  454. for channel_number in candidates:
  455. if channel_number < 0:
  456. # This was meant to terminate process_data_events
  457. continue
  458. try:
  459. impl_channel = self._impl._channels[channel_number]
  460. except KeyError:
  461. continue
  462. if impl_channel.is_open:
  463. impl_channel._get_cookie()._dispatch_events()
  464. def _on_timer_ready(self, evt):
  465. """Handle expiry of a timer that was registered via
  466. `_adapter_call_later()`
  467. :param _TimerEvt evt:
  468. """
  469. self._ready_events.append(evt)
  470. def _on_threadsafe_callback(self, user_callback):
  471. """Handle callback that was registered via
  472. `self._impl._adapter_add_callback_threadsafe`.
  473. :param user_callback: callback passed to our
  474. `add_callback_threadsafe` by the application.
  475. """
  476. # Turn it into a 0-delay timeout to take advantage of our existing logic
  477. # that deals with reentrancy
  478. self.call_later(0, user_callback)
  479. def _on_connection_blocked(self, user_callback, _impl, method_frame):
  480. """Handle Connection.Blocked notification from RabbitMQ broker
  481. :param callable user_callback: callback passed to
  482. `add_on_connection_blocked_callback`
  483. :param select_connection.SelectConnection _impl:
  484. :param pika.frame.Method method_frame: method frame having `method`
  485. member of type `pika.spec.Connection.Blocked`
  486. """
  487. self._ready_events.append(
  488. _ConnectionBlockedEvt(user_callback, method_frame))
  489. def _on_connection_unblocked(self, user_callback, _impl, method_frame):
  490. """Handle Connection.Unblocked notification from RabbitMQ broker
  491. :param callable user_callback: callback passed to
  492. `add_on_connection_unblocked_callback`
  493. :param select_connection.SelectConnection _impl:
  494. :param pika.frame.Method method_frame: method frame having `method`
  495. member of type `pika.spec.Connection.Blocked`
  496. """
  497. self._ready_events.append(
  498. _ConnectionUnblockedEvt(user_callback, method_frame))
  499. def _dispatch_connection_events(self):
  500. """Dispatch ready connection events"""
  501. if not self._ready_events:
  502. return
  503. with self._acquire_event_dispatch() as dispatch_acquired:
  504. if not dispatch_acquired:
  505. # Nested dispatch or dispatch blocked higher in call stack
  506. return
  507. # Limit dispatch to the number of currently ready events to avoid
  508. # getting stuck in this loop
  509. for _ in compat.xrange(len(self._ready_events)):
  510. try:
  511. evt = self._ready_events.popleft()
  512. except IndexError:
  513. # Some events (e.g., timers) must have been cancelled
  514. break
  515. evt.dispatch()
  516. def add_on_connection_blocked_callback(self, callback):
  517. """RabbitMQ AMQP extension - Add a callback to be notified when the
  518. connection gets blocked (`Connection.Blocked` received from RabbitMQ)
  519. due to the broker running low on resources (memory or disk). In this
  520. state RabbitMQ suspends processing incoming data until the connection
  521. is unblocked, so it's a good idea for publishers receiving this
  522. notification to suspend publishing until the connection becomes
  523. unblocked.
  524. NOTE: due to the blocking nature of BlockingConnection, if it's sending
  525. outbound data while the connection is/becomes blocked, the call may
  526. remain blocked until the connection becomes unblocked, if ever. You
  527. may use `ConnectionParameters.blocked_connection_timeout` to abort a
  528. BlockingConnection method call with an exception when the connection
  529. remains blocked longer than the given timeout value.
  530. See also `Connection.add_on_connection_unblocked_callback()`
  531. See also `ConnectionParameters.blocked_connection_timeout`.
  532. :param callable callback: Callback to call on `Connection.Blocked`,
  533. having the signature `callback(connection, pika.frame.Method)`,
  534. where connection is the `BlockingConnection` instance and the method
  535. frame's `method` member is of type `pika.spec.Connection.Blocked`
  536. """
  537. self._impl.add_on_connection_blocked_callback(
  538. functools.partial(self._on_connection_blocked,
  539. functools.partial(callback, self)))
  540. def add_on_connection_unblocked_callback(self, callback):
  541. """RabbitMQ AMQP extension - Add a callback to be notified when the
  542. connection gets unblocked (`Connection.Unblocked` frame is received from
  543. RabbitMQ) letting publishers know it's ok to start publishing again.
  544. :param callable callback: Callback to call on Connection.Unblocked`,
  545. having the signature `callback(connection, pika.frame.Method)`,
  546. where connection is the `BlockingConnection` instance and the method
  547. frame's `method` member is of type `pika.spec.Connection.Unblocked`
  548. """
  549. self._impl.add_on_connection_unblocked_callback(
  550. functools.partial(self._on_connection_unblocked,
  551. functools.partial(callback, self)))
  552. def call_later(self, delay, callback):
  553. """Create a single-shot timer to fire after delay seconds. Do not
  554. confuse with Tornado's timeout where you pass in the time you want to
  555. have your callback called. Only pass in the seconds until it's to be
  556. called.
  557. NOTE: the timer callbacks are dispatched only in the scope of
  558. specially-designated methods: see
  559. `BlockingConnection.process_data_events()` and
  560. `BlockingChannel.start_consuming()`.
  561. :param float delay: The number of seconds to wait to call callback
  562. :param callable callback: The callback method with the signature
  563. callback()
  564. :returns: Opaque timer id
  565. :rtype: int
  566. """
  567. validators.require_callback(callback)
  568. evt = _TimerEvt(callback=callback)
  569. timer_id = self._impl._adapter_call_later(
  570. delay, functools.partial(self._on_timer_ready, evt))
  571. evt.timer_id = timer_id
  572. return timer_id
  573. def add_callback_threadsafe(self, callback):
  574. """Requests a call to the given function as soon as possible in the
  575. context of this connection's thread.
  576. NOTE: This is the only thread-safe method in `BlockingConnection`. All
  577. other manipulations of `BlockingConnection` must be performed from the
  578. connection's thread.
  579. NOTE: the callbacks are dispatched only in the scope of
  580. specially-designated methods: see
  581. `BlockingConnection.process_data_events()` and
  582. `BlockingChannel.start_consuming()`.
  583. For example, a thread may request a call to the
  584. `BlockingChannel.basic_ack` method of a `BlockingConnection` that is
  585. running in a different thread via
  586. ```
  587. connection.add_callback_threadsafe(
  588. functools.partial(channel.basic_ack, delivery_tag=...))
  589. ```
  590. NOTE: if you know that the requester is running on the same thread as
  591. the connection it is more efficient to use the
  592. `BlockingConnection.call_later()` method with a delay of 0.
  593. :param callable callback: The callback method; must be callable
  594. :raises pika.exceptions.ConnectionWrongStateError: if connection is
  595. closed
  596. """
  597. with self._cleanup_mutex:
  598. # NOTE: keep in mind that we may be called from another thread and
  599. # this mutex only synchronizes us with our connection cleanup logic,
  600. # so a simple check for "is_closed" is pretty much all we're allowed
  601. # to do here besides calling the only thread-safe method
  602. # _adapter_add_callback_threadsafe().
  603. if self.is_closed:
  604. raise exceptions.ConnectionWrongStateError(
  605. 'BlockingConnection.add_callback_threadsafe() called on '
  606. 'closed or closing connection.')
  607. self._impl._adapter_add_callback_threadsafe(
  608. functools.partial(self._on_threadsafe_callback, callback))
  609. def remove_timeout(self, timeout_id):
  610. """Remove a timer if it's still in the timeout stack
  611. :param timeout_id: The opaque timer id to remove
  612. """
  613. # Remove from the impl's timeout stack
  614. self._impl._adapter_remove_timeout(timeout_id)
  615. # Remove from ready events, if the timer fired already
  616. for i, evt in enumerate(self._ready_events):
  617. if isinstance(evt, _TimerEvt) and evt.timer_id == timeout_id:
  618. index_to_remove = i
  619. break
  620. else:
  621. # Not found
  622. return
  623. del self._ready_events[index_to_remove]
  624. def close(self, reply_code=200, reply_text='Normal shutdown'):
  625. """Disconnect from RabbitMQ. If there are any open channels, it will
  626. attempt to close them prior to fully disconnecting. Channels which
  627. have active consumers will attempt to send a Basic.Cancel to RabbitMQ
  628. to cleanly stop the delivery of messages prior to closing the channel.
  629. :param int reply_code: The code number for the close
  630. :param str reply_text: The text reason for the close
  631. :raises pika.exceptions.ConnectionWrongStateError: if called on a closed
  632. connection (NEW in v1.0.0)
  633. """
  634. if not self.is_open:
  635. msg = '{}.close({}, {!r}) called on closed connection.'.format(
  636. self.__class__.__name__, reply_code, reply_text)
  637. LOGGER.error(msg)
  638. raise exceptions.ConnectionWrongStateError(msg)
  639. LOGGER.info('Closing connection (%s): %s', reply_code, reply_text)
  640. # Close channels that remain opened
  641. for impl_channel in compat.dictvalues(self._impl._channels):
  642. channel = impl_channel._get_cookie()
  643. if channel.is_open:
  644. try:
  645. channel.close(reply_code, reply_text)
  646. except exceptions.ChannelClosed as exc:
  647. # Log and suppress broker-closed channel
  648. LOGGER.warning(
  649. 'Got ChannelClosed while closing channel '
  650. 'from connection.close: %r', exc)
  651. # Close the connection
  652. self._impl.close(reply_code, reply_text)
  653. self._flush_output(self._closed_result.is_ready)
  654. def process_data_events(self, time_limit=0):
  655. """Will make sure that data events are processed. Dispatches timer and
  656. channel callbacks if not called from the scope of BlockingConnection or
  657. BlockingChannel callback. Your app can block on this method.
  658. :param float time_limit: suggested upper bound on processing time in
  659. seconds. The actual blocking time depends on the granularity of the
  660. underlying ioloop. Zero means return as soon as possible. None means
  661. there is no limit on processing time and the function will block
  662. until I/O produces actionable events. Defaults to 0 for backward
  663. compatibility. This parameter is NEW in pika 0.10.0.
  664. """
  665. with self._acquire_event_dispatch() as dispatch_acquired:
  666. # Check if we can actually process pending events
  667. common_terminator = lambda: bool(dispatch_acquired and
  668. (self._channels_pending_dispatch or
  669. self._ready_events))
  670. if time_limit is None:
  671. self._flush_output(common_terminator)
  672. else:
  673. with _IoloopTimerContext(time_limit, self._impl) as timer:
  674. self._flush_output(timer.is_ready, common_terminator)
  675. if self._ready_events:
  676. self._dispatch_connection_events()
  677. if self._channels_pending_dispatch:
  678. self._dispatch_channel_events()
  679. def sleep(self, duration):
  680. """A safer way to sleep than calling time.sleep() directly that would
  681. keep the adapter from ignoring frames sent from the broker. The
  682. connection will "sleep" or block the number of seconds specified in
  683. duration in small intervals.
  684. :param float duration: The time to sleep in seconds
  685. """
  686. assert duration >= 0, duration
  687. deadline = compat.time_now() + duration
  688. time_limit = duration
  689. # Process events at least once
  690. while True:
  691. self.process_data_events(time_limit)
  692. time_limit = deadline - compat.time_now()
  693. if time_limit <= 0:
  694. break
  695. def channel(self, channel_number=None):
  696. """Create a new channel with the next available channel number or pass
  697. in a channel number to use. Must be non-zero if you would like to
  698. specify but it is recommended that you let Pika manage the channel
  699. numbers.
  700. :rtype: pika.adapters.blocking_connection.BlockingChannel
  701. """
  702. with _CallbackResult(self._OnChannelOpenedArgs) as opened_args:
  703. impl_channel = self._impl.channel(
  704. channel_number=channel_number,
  705. on_open_callback=opened_args.set_value_once)
  706. # Create our proxy channel
  707. channel = BlockingChannel(impl_channel, self)
  708. # Link implementation channel with our proxy channel
  709. impl_channel._set_cookie(channel)
  710. # Drive I/O until Channel.Open-ok
  711. channel._flush_output(opened_args.is_ready)
  712. return channel
    #
    # Connection state properties
    #
  716. @property
  717. def is_closed(self):
  718. """
  719. Returns a boolean reporting the current connection state.
  720. """
  721. return self._impl.is_closed
  722. @property
  723. def is_open(self):
  724. """
  725. Returns a boolean reporting the current connection state.
  726. """
  727. return self._impl.is_open
  728. #
  729. # Properties that reflect server capabilities for the current connection
  730. #
  731. @property
  732. def basic_nack_supported(self):
  733. """Specifies if the server supports basic.nack on the active connection.
  734. :rtype: bool
  735. """
  736. return self._impl.basic_nack
  737. @property
  738. def consumer_cancel_notify_supported(self):
  739. """Specifies if the server supports consumer cancel notification on the
  740. active connection.
  741. :rtype: bool
  742. """
  743. return self._impl.consumer_cancel_notify
  744. @property
  745. def exchange_exchange_bindings_supported(self):
  746. """Specifies if the active connection supports exchange to exchange
  747. bindings.
  748. :rtype: bool
  749. """
  750. return self._impl.exchange_exchange_bindings
  751. @property
  752. def publisher_confirms_supported(self):
  753. """Specifies if the active connection can use publisher confirmations.
  754. :rtype: bool
  755. """
  756. return self._impl.publisher_confirms
    # Legacy property names for backward compatibility. Each alias is bound to
    # the same property object as its `*_supported` counterpart, so both names
    # read the same attribute of the underlying implementation.
    basic_nack = basic_nack_supported
    consumer_cancel_notify = consumer_cancel_notify_supported
    exchange_exchange_bindings = exchange_exchange_bindings_supported
    publisher_confirms = publisher_confirms_supported
  762. class _ChannelPendingEvt(object):
  763. """Base class for BlockingChannel pending events"""
  764. class _ConsumerDeliveryEvt(_ChannelPendingEvt):
  765. """This event represents consumer message delivery `Basic.Deliver`; it
  766. contains method, properties, and body of the delivered message.
  767. """
  768. __slots__ = ('method', 'properties', 'body')
  769. def __init__(self, method, properties, body):
  770. """
  771. :param spec.Basic.Deliver method: NOTE: consumer_tag and delivery_tag
  772. are valid only within source channel
  773. :param spec.BasicProperties properties: message properties
  774. :param bytes body: message body; empty string if no body
  775. """
  776. self.method = method
  777. self.properties = properties
  778. self.body = body
  779. class _ConsumerCancellationEvt(_ChannelPendingEvt):
  780. """This event represents server-initiated consumer cancellation delivered to
  781. client via Basic.Cancel. After receiving Basic.Cancel, there will be no
  782. further deliveries for the consumer identified by `consumer_tag` in
  783. `Basic.Cancel`
  784. """
  785. __slots__ = ('method_frame',)
  786. def __init__(self, method_frame):
  787. """
  788. :param pika.frame.Method method_frame: method frame with method of type
  789. `spec.Basic.Cancel`
  790. """
  791. self.method_frame = method_frame
  792. def __repr__(self):
  793. return '<%s method_frame=%r>' % (self.__class__.__name__,
  794. self.method_frame)
  795. @property
  796. def method(self):
  797. """method of type spec.Basic.Cancel"""
  798. return self.method_frame.method
  799. class _ReturnedMessageEvt(_ChannelPendingEvt):
  800. """This event represents a message returned by broker via `Basic.Return`"""
  801. __slots__ = ('callback', 'channel', 'method', 'properties', 'body')
  802. def __init__(self, callback, channel, method, properties, body):
  803. """
  804. :param callable callback: user's callback, having the signature
  805. callback(channel, method, properties, body), where
  806. channel: pika.Channel
  807. method: pika.spec.Basic.Return
  808. properties: pika.spec.BasicProperties
  809. body: bytes
  810. :param pika.Channel channel:
  811. :param pika.spec.Basic.Return method:
  812. :param pika.spec.BasicProperties properties:
  813. :param bytes body:
  814. """
  815. self.callback = callback
  816. self.channel = channel
  817. self.method = method
  818. self.properties = properties
  819. self.body = body
  820. def __repr__(self):
  821. return ('<%s callback=%r channel=%r method=%r properties=%r '
  822. 'body=%.300r>') % (self.__class__.__name__, self.callback,
  823. self.channel, self.method, self.properties,
  824. self.body)
  825. def dispatch(self):
  826. """Dispatch user's callback"""
  827. self.callback(self.channel, self.method, self.properties, self.body)
  828. class ReturnedMessage(object):
  829. """Represents a message returned via Basic.Return in publish-acknowledgments
  830. mode
  831. """
  832. __slots__ = ('method', 'properties', 'body')
  833. def __init__(self, method, properties, body):
  834. """
  835. :param spec.Basic.Return method:
  836. :param spec.BasicProperties properties: message properties
  837. :param bytes body: message body; empty string if no body
  838. """
  839. self.method = method
  840. self.properties = properties
  841. self.body = body
  842. class _ConsumerInfo(object):
  843. """Information about an active consumer"""
  844. __slots__ = ('consumer_tag', 'auto_ack', 'on_message_callback',
  845. 'alternate_event_sink', 'state')
  846. # Consumer states
  847. SETTING_UP = 1
  848. ACTIVE = 2
  849. TEARING_DOWN = 3
  850. CANCELLED_BY_BROKER = 4
  851. def __init__(self,
  852. consumer_tag,
  853. auto_ack,
  854. on_message_callback=None,
  855. alternate_event_sink=None):
  856. """
  857. NOTE: exactly one of callback/alternate_event_sink musts be non-None.
  858. :param str consumer_tag:
  859. :param bool auto_ack: the no-ack value for the consumer
  860. :param callable on_message_callback: The function for dispatching messages to
  861. user, having the signature:
  862. on_message_callback(channel, method, properties, body)
  863. channel: BlockingChannel
  864. method: spec.Basic.Deliver
  865. properties: spec.BasicProperties
  866. body: bytes
  867. :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt
  868. and _ConsumerCancellationEvt objects will be diverted to this
  869. callback instead of being deposited in the channel's
  870. `_pending_events` container. Signature:
  871. alternate_event_sink(evt)
  872. """
  873. assert (on_message_callback is None) != (
  874. alternate_event_sink is None
  875. ), ('exactly one of on_message_callback/alternate_event_sink must be non-None',
  876. on_message_callback, alternate_event_sink)
  877. self.consumer_tag = consumer_tag
  878. self.auto_ack = auto_ack
  879. self.on_message_callback = on_message_callback
  880. self.alternate_event_sink = alternate_event_sink
  881. self.state = self.SETTING_UP
  882. @property
  883. def setting_up(self):
  884. """True if in SETTING_UP state"""
  885. return self.state == self.SETTING_UP
  886. @property
  887. def active(self):
  888. """True if in ACTIVE state"""
  889. return self.state == self.ACTIVE
  890. @property
  891. def tearing_down(self):
  892. """True if in TEARING_DOWN state"""
  893. return self.state == self.TEARING_DOWN
  894. @property
  895. def cancelled_by_broker(self):
  896. """True if in CANCELLED_BY_BROKER state"""
  897. return self.state == self.CANCELLED_BY_BROKER
  898. class _QueueConsumerGeneratorInfo(object):
  899. """Container for information about the active queue consumer generator """
  900. __slots__ = ('params', 'consumer_tag', 'pending_events')
  901. def __init__(self, params, consumer_tag):
  902. """
  903. :params tuple params: a three-tuple (queue, auto_ack, exclusive) that were
  904. used to create the queue consumer
  905. :param str consumer_tag: consumer tag
  906. """
  907. self.params = params
  908. self.consumer_tag = consumer_tag
  909. #self.messages = deque()
  910. # Holds pending events of types _ConsumerDeliveryEvt and
  911. # _ConsumerCancellationEvt
  912. self.pending_events = deque()
  913. def __repr__(self):
  914. return '<%s params=%r consumer_tag=%r>' % (
  915. self.__class__.__name__, self.params, self.consumer_tag)
  916. class BlockingChannel(object):
  917. """The BlockingChannel implements blocking semantics for most things that
  918. one would use callback-passing-style for with the
  919. :py:class:`~pika.channel.Channel` class. In addition,
  920. the `BlockingChannel` class implements a :term:`generator` that allows
  921. you to :doc:`consume messages </examples/blocking_consumer_generator>`
  922. without using callbacks.
  923. Example of creating a BlockingChannel::
  924. import pika
  925. # Create our connection object
  926. connection = pika.BlockingConnection()
  927. # The returned object will be a synchronous channel
  928. channel = connection.channel()
  929. """
  930. # Used as value_class with _CallbackResult for receiving Basic.GetOk args
  931. _RxMessageArgs = namedtuple(
  932. 'BlockingChannel__RxMessageArgs',
  933. [
  934. 'channel', # implementation pika.Channel instance
  935. 'method', # Basic.GetOk
  936. 'properties', # pika.spec.BasicProperties
  937. 'body' # str, unicode, or bytes (python 3.x)
  938. ])
  939. # For use as value_class with any _CallbackResult that expects method_frame
  940. # as the only arg
  941. _MethodFrameCallbackResultArgs = namedtuple(
  942. 'BlockingChannel__MethodFrameCallbackResultArgs', 'method_frame')
  943. # Broker's basic-ack/basic-nack args when delivery confirmation is enabled;
  944. # may concern a single or multiple messages
  945. _OnMessageConfirmationReportArgs = namedtuple(
  946. 'BlockingChannel__OnMessageConfirmationReportArgs', 'method_frame')
  947. # For use as value_class with _CallbackResult expecting Channel.Flow
  948. # confirmation.
  949. _FlowOkCallbackResultArgs = namedtuple(
  950. 'BlockingChannel__FlowOkCallbackResultArgs',
  951. 'active' # True if broker will start or continue sending; False if not
  952. )
  953. _CONSUMER_CANCELLED_CB_KEY = 'blocking_channel_consumer_cancelled'
  954. def __init__(self, channel_impl, connection):
  955. """Create a new instance of the Channel
  956. :param pika.channel.Channel channel_impl: Channel implementation object
  957. as returned from SelectConnection.channel()
  958. :param BlockingConnection connection: The connection object
  959. """
  960. self._impl = channel_impl
  961. self._connection = connection
  962. # A mapping of consumer tags to _ConsumerInfo for active consumers
  963. self._consumer_infos = dict()
  964. # Queue consumer generator generator info of type
  965. # _QueueConsumerGeneratorInfo created by BlockingChannel.consume
  966. self._queue_consumer_generator = None
  967. # Whether RabbitMQ delivery confirmation has been enabled
  968. self._delivery_confirmation = False
  969. # Receives message delivery confirmation report (Basic.ack or
  970. # Basic.nack) from broker when delivery confirmations are enabled
  971. self._message_confirmation_result = _CallbackResult(
  972. self._OnMessageConfirmationReportArgs)
  973. # deque of pending events: _ConsumerDeliveryEvt and
  974. # _ConsumerCancellationEvt objects that will be returned by
  975. # `BlockingChannel.get_event()`
  976. self._pending_events = deque()
  977. # Holds a ReturnedMessage object representing a message received via
  978. # Basic.Return in publisher-acknowledgments mode.
  979. self._puback_return = None
  980. # self._on_channel_closed() saves the reason exception here
  981. self._closing_reason = None # type: None | Exception
  982. # Receives Basic.ConsumeOk reply from server
  983. self._basic_consume_ok_result = _CallbackResult()
  984. # Receives args from Basic.GetEmpty response
  985. # http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
  986. self._basic_getempty_result = _CallbackResult(
  987. self._MethodFrameCallbackResultArgs)
  988. self._impl.add_on_cancel_callback(self._on_consumer_cancelled_by_broker)
  989. self._impl.add_callback(
  990. self._basic_consume_ok_result.signal_once,
  991. replies=[pika.spec.Basic.ConsumeOk],
  992. one_shot=False)
  993. self._impl.add_on_close_callback(self._on_channel_closed)
  994. self._impl.add_callback(
  995. self._basic_getempty_result.set_value_once,
  996. replies=[pika.spec.Basic.GetEmpty],
  997. one_shot=False)
  998. LOGGER.info("Created channel=%s", self.channel_number)
  999. def __int__(self):
  1000. """Return the channel object as its channel number
  1001. NOTE: inherited from legacy BlockingConnection; might be error-prone;
  1002. use `channel_number` property instead.
  1003. :rtype: int
  1004. """
  1005. return self.channel_number
  1006. def __repr__(self):
  1007. return '<%s impl=%r>' % (self.__class__.__name__, self._impl)
  1008. def __enter__(self):
  1009. return self
  1010. def __exit__(self, exc_type, value, traceback):
  1011. if self.is_open:
  1012. self.close()
  1013. def _cleanup(self):
  1014. """Clean up members that might inhibit garbage collection"""
  1015. self._message_confirmation_result.reset()
  1016. self._pending_events = deque()
  1017. self._consumer_infos = dict()
  1018. self._queue_consumer_generator = None
  1019. @property
  1020. def channel_number(self):
  1021. """Channel number"""
  1022. return self._impl.channel_number
  1023. @property
  1024. def connection(self):
  1025. """The channel's BlockingConnection instance"""
  1026. return self._connection
  1027. @property
  1028. def is_closed(self):
  1029. """Returns True if the channel is closed.
  1030. :rtype: bool
  1031. """
  1032. return self._impl.is_closed
  1033. @property
  1034. def is_open(self):
  1035. """Returns True if the channel is open.
  1036. :rtype: bool
  1037. """
  1038. return self._impl.is_open
    # Default waiters that `_flush_output` substitutes when it is called with
    # no waiters: a one-element tuple whose callable always returns True.
    _ALWAYS_READY_WAITERS = ((lambda: True),)
  1040. def _flush_output(self, *waiters):
  1041. """ Flush output and process input while waiting for any of the given
  1042. callbacks to return true. The wait is aborted upon channel-close or
  1043. connection-close.
  1044. Otherwise, processing continues until the output is flushed AND at least
  1045. one of the callbacks returns true. If there are no callbacks, then
  1046. processing ends when all output is flushed.
  1047. :param waiters: sequence of zero or more callables taking no args and
  1048. returning true when it's time to stop processing.
  1049. Their results are OR'ed together. An empty sequence is
  1050. treated as equivalent to a waiter always returning true.
  1051. """
  1052. if self.is_closed:
  1053. self._impl._raise_if_not_open()
  1054. if not waiters:
  1055. waiters = self._ALWAYS_READY_WAITERS
  1056. self._connection._flush_output(lambda: self.is_closed, *waiters)
  1057. if self.is_closed and isinstance(self._closing_reason,
  1058. exceptions.ChannelClosedByBroker):
  1059. raise self._closing_reason # pylint: disable=E0702
  1060. def _on_puback_message_returned(self, channel, method, properties, body):
  1061. """Called as the result of Basic.Return from broker in
  1062. publisher-acknowledgements mode. Saves the info as a ReturnedMessage
  1063. instance in self._puback_return.
  1064. :param pika.Channel channel: our self._impl channel
  1065. :param pika.spec.Basic.Return method:
  1066. :param pika.spec.BasicProperties properties: message properties
  1067. :param bytes body: returned message body; empty string if no body
  1068. """
  1069. assert channel is self._impl, (channel.channel_number,
  1070. self.channel_number)
  1071. assert isinstance(method, pika.spec.Basic.Return), method
  1072. assert isinstance(properties, pika.spec.BasicProperties), (properties)
  1073. LOGGER.warning(
  1074. "Published message was returned: _delivery_confirmation=%s; "
  1075. "channel=%s; method=%r; properties=%r; body_size=%d; "
  1076. "body_prefix=%.255r", self._delivery_confirmation,
  1077. channel.channel_number, method, properties,
  1078. len(body) if body is not None else None, body)
  1079. self._puback_return = ReturnedMessage(method, properties, body)
  1080. def _add_pending_event(self, evt):
  1081. """Append an event to the channel's list of events that are ready for
  1082. dispatch to user and signal our connection that this channel is ready
  1083. for event dispatch
  1084. :param _ChannelPendingEvt evt: an event derived from _ChannelPendingEvt
  1085. """
  1086. self._pending_events.append(evt)
  1087. self.connection._request_channel_dispatch(self.channel_number)
  1088. def _on_channel_closed(self, _channel, reason):
  1089. """Callback from impl notifying us that the channel has been closed.
  1090. This may be as the result of user-, broker-, or internal connection
  1091. clean-up initiated closing or meta-closing of the channel.
  1092. If it resulted from receiving `Channel.Close` from broker, we will
  1093. expedite waking up of the event subsystem so that it may respond by
  1094. raising `ChannelClosed` from user's context.
  1095. NOTE: We can't raise exceptions in callbacks in order to protect
  1096. the integrity of the underlying implementation. BlockingConnection's
  1097. underlying asynchronous connection adapter (SelectConnection) uses
  1098. callbacks to communicate with us. If BlockingConnection leaks exceptions
  1099. back into the I/O loop or the asynchronous connection adapter, we
  1100. interrupt their normal workflow and introduce a high likelihood of state
  1101. inconsistency.
  1102. See `pika.Channel.add_on_close_callback()` for additional documentation.
  1103. :param pika.Channel _channel: (unused)
  1104. :param Exception reason:
  1105. """
  1106. LOGGER.debug('_on_channel_closed: %r; %r', reason, self)
  1107. self._closing_reason = reason
  1108. if isinstance(reason, exceptions.ChannelClosedByBroker):
  1109. self._cleanup()
  1110. # Request urgent termination of `process_data_events()`, in case
  1111. # it's executing or next time it will execute
  1112. self.connection._request_channel_dispatch(-self.channel_number)
  1113. def _on_consumer_cancelled_by_broker(self, method_frame):
  1114. """Called by impl when broker cancels consumer via Basic.Cancel.
  1115. This is a RabbitMQ-specific feature. The circumstances include deletion
  1116. of queue being consumed as well as failure of a HA node responsible for
  1117. the queue being consumed.
  1118. :param pika.frame.Method method_frame: method frame with the
  1119. `spec.Basic.Cancel` method
  1120. """
  1121. evt = _ConsumerCancellationEvt(method_frame)
  1122. consumer = self._consumer_infos[method_frame.method.consumer_tag]
  1123. # Don't interfere with client-initiated cancellation flow
  1124. if not consumer.tearing_down:
  1125. consumer.state = _ConsumerInfo.CANCELLED_BY_BROKER
  1126. if consumer.alternate_event_sink is not None:
  1127. consumer.alternate_event_sink(evt)
  1128. else:
  1129. self._add_pending_event(evt)
  1130. def _on_consumer_message_delivery(self, _channel, method, properties, body):
  1131. """Called by impl when a message is delivered for a consumer
  1132. :param Channel channel: The implementation channel object
  1133. :param spec.Basic.Deliver method:
  1134. :param pika.spec.BasicProperties properties: message properties
  1135. :param bytes body: delivered message body; empty string if no body
  1136. """
  1137. evt = _ConsumerDeliveryEvt(method, properties, body)
  1138. consumer = self._consumer_infos[method.consumer_tag]
  1139. if consumer.alternate_event_sink is not None:
  1140. consumer.alternate_event_sink(evt)
  1141. else:
  1142. self._add_pending_event(evt)
  1143. def _on_consumer_generator_event(self, evt):
  1144. """Sink for the queue consumer generator's consumer events; append the
  1145. event to queue consumer generator's pending events buffer.
  1146. :param evt: an object of type _ConsumerDeliveryEvt or
  1147. _ConsumerCancellationEvt
  1148. """
  1149. self._queue_consumer_generator.pending_events.append(evt)
  1150. # Schedule termination of connection.process_data_events using a
  1151. # negative channel number
  1152. self.connection._request_channel_dispatch(-self.channel_number)
  1153. def _cancel_all_consumers(self):
  1154. """Cancel all consumers.
  1155. NOTE: pending non-ackable messages will be lost; pending ackable
  1156. messages will be rejected.
  1157. """
  1158. if self._consumer_infos:
  1159. LOGGER.debug('Cancelling %i consumers', len(self._consumer_infos))
  1160. if self._queue_consumer_generator is not None:
  1161. # Cancel queue consumer generator
  1162. self.cancel()
  1163. # Cancel consumers created via basic_consume
  1164. for consumer_tag in compat.dictkeys(self._consumer_infos):
  1165. self.basic_cancel(consumer_tag)
  1166. def _dispatch_events(self):
  1167. """Called by BlockingConnection to dispatch pending events.
  1168. `BlockingChannel` schedules this callback via
  1169. `BlockingConnection._request_channel_dispatch`
  1170. """
  1171. while self._pending_events:
  1172. evt = self._pending_events.popleft()
  1173. if type(evt) is _ConsumerDeliveryEvt: # pylint: disable=C0123
  1174. consumer_info = self._consumer_infos[evt.method.consumer_tag]
  1175. consumer_info.on_message_callback(self, evt.method,
  1176. evt.properties, evt.body)
  1177. elif type(evt) is _ConsumerCancellationEvt: # pylint: disable=C0123
  1178. del self._consumer_infos[evt.method_frame.method.consumer_tag]
  1179. self._impl.callbacks.process(self.channel_number,
  1180. self._CONSUMER_CANCELLED_CB_KEY,
  1181. self, evt.method_frame)
  1182. else:
  1183. evt.dispatch()
  1184. def close(self, reply_code=0, reply_text="Normal shutdown"):
  1185. """Will invoke a clean shutdown of the channel with the AMQP Broker.
  1186. :param int reply_code: The reply code to close the channel with
  1187. :param str reply_text: The reply text to close the channel with
  1188. """
  1189. LOGGER.debug('Channel.close(%s, %s)', reply_code, reply_text)
  1190. self._impl._raise_if_not_open()
  1191. try:
  1192. # Cancel remaining consumers
  1193. self._cancel_all_consumers()
  1194. # Close the channel
  1195. self._impl.close(reply_code=reply_code, reply_text=reply_text)
  1196. self._flush_output(lambda: self.is_closed)
  1197. finally:
  1198. self._cleanup()
  1199. def flow(self, active):
  1200. """Turn Channel flow control off and on.
  1201. NOTE: RabbitMQ doesn't support active=False; per
  1202. https://www.rabbitmq.com/specification.html: "active=false is not
  1203. supported by the server. Limiting prefetch with basic.qos provides much
  1204. better control"
  1205. For more information, please reference:
  1206. http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
  1207. :param bool active: Turn flow on (True) or off (False)
  1208. :returns: True if broker will start or continue sending; False if not
  1209. :rtype: bool
  1210. """
  1211. with _CallbackResult(self._FlowOkCallbackResultArgs) as flow_ok_result:
  1212. self._impl.flow(
  1213. active=active, callback=flow_ok_result.set_value_once)
  1214. self._flush_output(flow_ok_result.is_ready)
  1215. return flow_ok_result.value.active
  1216. def add_on_cancel_callback(self, callback):
  1217. """Pass a callback function that will be called when Basic.Cancel
  1218. is sent by the broker. The callback function should receive a method
  1219. frame parameter.
  1220. :param callable callback: a callable for handling broker's Basic.Cancel
  1221. notification with the call signature: callback(method_frame)
  1222. where method_frame is of type `pika.frame.Method` with method of
  1223. type `spec.Basic.Cancel`
  1224. """
  1225. self._impl.callbacks.add(
  1226. self.channel_number,
  1227. self._CONSUMER_CANCELLED_CB_KEY,
  1228. callback,
  1229. one_shot=False)
  1230. def add_on_return_callback(self, callback):
  1231. """Pass a callback function that will be called when a published
  1232. message is rejected and returned by the server via `Basic.Return`.
  1233. :param callable callback: The method to call on callback with the
  1234. signature callback(channel, method, properties, body), where
  1235. channel: pika.Channel
  1236. method: pika.spec.Basic.Return
  1237. properties: pika.spec.BasicProperties
  1238. body: bytes
  1239. """
  1240. self._impl.add_on_return_callback(
  1241. lambda _channel, method, properties, body: (
  1242. self._add_pending_event(
  1243. _ReturnedMessageEvt(
  1244. callback, self, method, properties, body))))
  1245. def basic_consume(self,
  1246. queue,
  1247. on_message_callback,
  1248. auto_ack=False,
  1249. exclusive=False,
  1250. consumer_tag=None,
  1251. arguments=None):
  1252. """Sends the AMQP command Basic.Consume to the broker and binds messages
  1253. for the consumer_tag to the consumer callback. If you do not pass in
  1254. a consumer_tag, one will be automatically generated for you. Returns
  1255. the consumer tag.
  1256. NOTE: the consumer callbacks are dispatched only in the scope of
  1257. specially-designated methods: see
  1258. `BlockingConnection.process_data_events` and
  1259. `BlockingChannel.start_consuming`.
  1260. For more information about Basic.Consume, see:
  1261. http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
  1262. :param str queue: The queue from which to consume
  1263. :param callable on_message_callback: Required function for dispatching messages
  1264. to user, having the signature:
  1265. on_message_callback(channel, method, properties, body)
  1266. channel: BlockingChannel
  1267. method: spec.Basic.Deliver
  1268. properties: spec.BasicProperties
  1269. body: bytes
  1270. :param bool auto_ack: if set to True, automatic acknowledgement mode will be used
  1271. (see http://www.rabbitmq.com/confirms.html). This corresponds
  1272. with the 'no_ack' parameter in the basic.consume AMQP 0.9.1
  1273. method
  1274. :param bool exclusive: Don't allow other consumers on the queue
  1275. :param str consumer_tag: You may specify your own consumer tag; if left
  1276. empty, a consumer tag will be generated automatically
  1277. :param dict arguments: Custom key/value pair arguments for the consumer
  1278. :returns: consumer tag
  1279. :rtype: str
  1280. :raises pika.exceptions.DuplicateConsumerTag: if consumer with given
  1281. consumer_tag is already present.
  1282. """
  1283. validators.require_string(queue, 'queue')
  1284. validators.require_callback(on_message_callback, 'on_message_callback')
  1285. return self._basic_consume_impl(
  1286. queue=queue,
  1287. on_message_callback=on_message_callback,
  1288. auto_ack=auto_ack,
  1289. exclusive=exclusive,
  1290. consumer_tag=consumer_tag,
  1291. arguments=arguments)
  1292. def _basic_consume_impl(self,
  1293. queue,
  1294. auto_ack,
  1295. exclusive,
  1296. consumer_tag,
  1297. arguments=None,
  1298. on_message_callback=None,
  1299. alternate_event_sink=None):
  1300. """The low-level implementation used by `basic_consume` and `consume`.
  1301. See `basic_consume` docstring for more info.
  1302. NOTE: exactly one of on_message_callback/alternate_event_sink musts be
  1303. non-None.
  1304. This method has one additional parameter alternate_event_sink over the
  1305. args described in `basic_consume`.
  1306. :param callable alternate_event_sink: if specified, _ConsumerDeliveryEvt
  1307. and _ConsumerCancellationEvt objects will be diverted to this
  1308. callback instead of being deposited in the channel's
  1309. `_pending_events` container. Signature:
  1310. alternate_event_sink(evt)
  1311. :raises pika.exceptions.DuplicateConsumerTag: if consumer with given
  1312. consumer_tag is already present.
  1313. """
  1314. if (on_message_callback is None) == (alternate_event_sink is None):
  1315. raise ValueError(
  1316. ('exactly one of on_message_callback/alternate_event_sink must '
  1317. 'be non-None', on_message_callback, alternate_event_sink))
  1318. if not consumer_tag:
  1319. # Need a consumer tag to register consumer info before sending
  1320. # request to broker, because I/O might dispatch incoming messages
  1321. # immediately following Basic.Consume-ok before _flush_output
  1322. # returns
  1323. consumer_tag = self._impl._generate_consumer_tag()
  1324. if consumer_tag in self._consumer_infos:
  1325. raise exceptions.DuplicateConsumerTag(consumer_tag)
  1326. # Create new consumer
  1327. self._consumer_infos[consumer_tag] = _ConsumerInfo(
  1328. consumer_tag,
  1329. auto_ack=auto_ack,
  1330. on_message_callback=on_message_callback,
  1331. alternate_event_sink=alternate_event_sink)
  1332. try:
  1333. with self._basic_consume_ok_result as ok_result:
  1334. tag = self._impl.basic_consume(
  1335. on_message_callback=self._on_consumer_message_delivery,
  1336. queue=queue,
  1337. auto_ack=auto_ack,
  1338. exclusive=exclusive,
  1339. consumer_tag=consumer_tag,
  1340. arguments=arguments)
  1341. assert tag == consumer_tag, (tag, consumer_tag)
  1342. self._flush_output(ok_result.is_ready)
  1343. except Exception:
  1344. # If channel was closed, self._consumer_infos will be empty
  1345. if consumer_tag in self._consumer_infos:
  1346. del self._consumer_infos[consumer_tag]
  1347. # Schedule termination of connection.process_data_events using a
  1348. # negative channel number
  1349. self.connection._request_channel_dispatch(-self.channel_number)
  1350. raise
  1351. # NOTE: Consumer could get cancelled by broker immediately after opening
  1352. # (e.g., queue getting deleted externally)
  1353. if self._consumer_infos[consumer_tag].setting_up:
  1354. self._consumer_infos[consumer_tag].state = _ConsumerInfo.ACTIVE
  1355. return consumer_tag
  def basic_cancel(self, consumer_tag):
      """Cancel a consumer and block until the cancellation has completed.

      Cancelling does not affect messages that were already delivered, but
      the server will not send any more messages for that consumer. The
      client may still receive an arbitrary number of messages between
      sending the cancel method and receiving the cancel-ok reply.

      NOTE: For an auto_ack=False consumer, incoming messages that have not
      yet been dispatched to the consumer's callback are rejected (with
      requeue) and suppressed. For an auto_ack=True consumer, the pending
      messages that arrived before the broker confirmed the cancellation
      are returned to the caller instead.

      :param str consumer_tag: Identifier for the consumer; the result of
          passing a consumer_tag that was created on another channel is
          undefined (bad things will happen)
      :returns: (NEW IN pika 0.10.0) empty sequence for a auto_ack=False
          consumer, or for a consumer tag that is not known to this
          channel; for a auto_ack=True consumer, a (possibly empty)
          sequence of pending messages that arrived before broker
          confirmed the cancellation (they are returned here instead of
          via the consumer's callback in order to prevent
          reentrancy/recursion). Each message is a three-tuple:
          (method, properties, body)
            method: spec.Basic.Deliver
            properties: spec.BasicProperties
            body: bytes
      :rtype: list

      """
      if consumer_tag not in self._consumer_infos:
          LOGGER.warning(
              "User is attempting to cancel an unknown consumer=%s; "
              "already cancelled by user or broker?", consumer_tag)
          return []

      consumer_info = self._consumer_infos[consumer_tag]

      try:
          # A failure of this assertion is most likely caused by reentrance
          assert consumer_info.active or consumer_info.cancelled_by_broker, (
              consumer_info.state)

          # A failure of this assertion means that the consumer state kept
          # by BlockingChannel and by the impl Channel have diverged
          assert (consumer_info.cancelled_by_broker or
                  consumer_tag in self._impl._consumers), consumer_tag

          auto_ack = consumer_info.auto_ack
          consumer_info.state = _ConsumerInfo.TEARING_DOWN

          with _CallbackResult() as cancel_ok_result:
              if not auto_ack:
                  # Hand the not-yet-dispatched messages back to the broker.
                  # NOTE: impl's basic_reject is used to avoid the
                  # possibility of redelivery before basic_cancel takes
                  # control of nacking.
                  # NOTE: basic_nack with the multiple option can't be used
                  # here, because it would also nack messages already held
                  # by our client.
                  for delivery in self._remove_pending_deliveries(
                          consumer_tag):
                      self._impl.basic_reject(
                          delivery.method.delivery_tag, requeue=True)

              # Request the cancellation; impl takes care of rejecting any
              # additional deliveries that arrive for a auto_ack=False
              # consumer
              self._impl.basic_cancel(
                  consumer_tag=consumer_tag,
                  callback=cancel_ok_result.signal_once)

              # Flush output and wait for either Basic.Cancel-ok or a
              # broker-initiated Basic.Cancel
              self._flush_output(
                  cancel_ok_result.is_ready,
                  lambda: consumer_tag not in self._impl._consumers)

          leftovers = self._remove_pending_deliveries(consumer_tag)

          if auto_ack:
              # Pending messages of a auto_ack=True consumer go to the caller
              return [(evt.method, evt.properties, evt.body)
                      for evt in leftovers]

          # impl takes care of rejecting any incoming deliveries during
          # cancellation, so nothing may be left over here
          assert not leftovers, leftovers
          return []
      finally:
          # NOTE: The entry could be purged if channel or connection closes
          if consumer_tag in self._consumer_infos:
              del self._consumer_infos[consumer_tag]
              # Schedule termination of connection.process_data_events
              # using a negative channel number
              self.connection._request_channel_dispatch(
                  -self.channel_number)
  def _remove_pending_deliveries(self, consumer_tag):
      """Pull the _ConsumerDeliveryEvt objects addressed to the given consumer
      out of the channel's pending events.

      A _ConsumerCancellationEvt for the same consumer, if present, is
      discarded. All other events are retained in their original order.

      :param str consumer_tag:
      :returns: a (possibly empty) sequence of _ConsumerDeliveryEvt destined
          for the given consumer tag
      :rtype: list

      """
      kept_events = deque()
      extracted = []
      backlog = self._pending_events

      # Drain the current queue from the left so that relative order is kept
      while backlog:
          event = backlog.popleft()
          event_type = type(event)

          if (event_type is _ConsumerDeliveryEvt and
                  event.method.consumer_tag == consumer_tag):
              extracted.append(event)
          elif (event_type is _ConsumerCancellationEvt and
                event.method_frame.method.consumer_tag == consumer_tag):
              # A broker-initiated Basic.Cancel must have arrived before our
              # cancel request completed; drop it
              pass
          else:
              kept_events.append(event)

      self._pending_events = kept_events
      return extracted
  def start_consuming(self):
      """Run the consume loop: process I/O events and dispatch timers and
      `basic_consume` callbacks until all consumers are cancelled.

      NOTE: this blocking function may not be called from the scope of a
      pika callback, because dispatching `basic_consume` callbacks from this
      context would constitute recursion.

      :raises pika.exceptions.ReentrancyError: if called from the scope of a
          `BlockingConnection` or `BlockingChannel` callback
      :raises ChannelClosed: when this channel is closed by broker.

      """
      # Refuse to run when invoked from within an event dispatch callback
      with self.connection._acquire_event_dispatch() as may_dispatch:
          if not may_dispatch:
              raise exceptions.ReentrancyError(
                  'start_consuming may not be called from the scope of '
                  'another BlockingConnection or BlockingChannel callback')

      self._impl._raise_if_not_open()

      # Keep going for as long as this channel has registered consumers;
      # _process_data_events raises ChannelClosed if broker closes channel
      while self._consumer_infos:
          self._process_data_events(time_limit=None)
  def stop_consuming(self, consumer_tag=None):
      """Cancel consumers, signalling the `start_consuming` loop to exit
      once no consumers remain.

      NOTE: pending non-ackable messages will be lost; pending ackable
      messages will be rejected.

      :param str consumer_tag: if given (and non-empty), only this consumer
          is cancelled via `basic_cancel`; otherwise all consumers on the
          channel are cancelled.

      """
      if not consumer_tag:
          self._cancel_all_consumers()
          return

      self.basic_cancel(consumer_tag)
  def consume(self,
              queue,
              auto_ack=False,
              exclusive=False,
              arguments=None,
              inactivity_timeout=None):
      """Consume a queue in blocking fashion through a generator rather than
      through a callback.

      Each message is yielded as a tuple of method, properties, and body.
      The active generator iterator terminates when the consumer is
      cancelled by client via `BlockingChannel.cancel()` or by broker.

      Example:

          for method, properties, body in channel.consume('queue'):
              print(body)
              channel.basic_ack(method.delivery_tag)

      You should call `BlockingChannel.cancel()` when you escape out of the
      generator loop.

      If you don't cancel this consumer, then next call on the same channel
      to `consume()` with the exact same (queue, auto_ack, exclusive)
      parameters will resume the existing consumer generator; however,
      calling with different parameters will result in an exception.

      :param str queue: The queue name to consume
      :param bool auto_ack: Tell the broker to not expect a ack/nack response
      :param bool exclusive: Don't allow other consumers on the queue
      :param dict arguments: Custom key/value pair arguments for the consumer
      :param float inactivity_timeout: if a number is given (in seconds),
          the generator yields (None, None, None) after the given period of
          inactivity; this permits pseudo-regular maintenance activities to
          be carried out by the user while waiting for messages to arrive.
          If None is given (default), the method blocks until the next
          event arrives. NOTE that timing granularity is limited by the
          timer resolution of the underlying implementation.
          NEW in pika 0.10.0.

      :yields: tuple(spec.Basic.Deliver, spec.BasicProperties, body)

      :raises ValueError: if consumer-creation parameters don't match those
          of the existing queue consumer generator, if any.
          NEW in pika 0.10.0
      :raises ChannelClosed: when this channel is closed by broker.

      """
      self._impl._raise_if_not_open()

      params = (queue, auto_ack, exclusive)

      if self._queue_consumer_generator is None:
          LOGGER.debug('Creating new queue consumer generator; params: %r',
                       params)
          # The consumer info must be registered under a known tag before
          # the request goes to the broker, because I/O might pick up
          # incoming messages in addition to Basic.Consume-ok
          consumer_tag = self._impl._generate_consumer_tag()
          self._queue_consumer_generator = _QueueConsumerGeneratorInfo(
              params, consumer_tag)

          try:
              self._basic_consume_impl(
                  queue=queue,
                  auto_ack=auto_ack,
                  exclusive=exclusive,
                  consumer_tag=consumer_tag,
                  arguments=arguments,
                  alternate_event_sink=self._on_consumer_generator_event)
          except Exception:
              # Creation failed; forget the half-built generator info
              self._queue_consumer_generator = None
              raise

          LOGGER.info('Created new queue consumer generator %r',
                      self._queue_consumer_generator)
      elif params != self._queue_consumer_generator.params:
          raise ValueError(
              'Consume with different params not allowed on existing '
              'queue consumer generator; previous params: %r; '
              'new params: %r' % (self._queue_consumer_generator.params,
                                  params))

      while self._queue_consumer_generator is not None:
          # Hand out whatever is already queued up before doing any I/O
          if self._queue_consumer_generator.pending_events:
              evt = self._queue_consumer_generator.pending_events.popleft()
              if type(evt) is _ConsumerCancellationEvt:  # pylint: disable=C0123
                  # Consumer was cancelled by broker
                  self._queue_consumer_generator = None
                  break

              yield (evt.method, evt.properties, evt.body)
              continue

          if inactivity_timeout is None:
              # Wait indefinitely for a message to arrive, while processing
              # I/O events and triggering ChannelClosed exception when the
              # channel fails
              self._process_data_events(time_limit=None)
              continue

          # Wait for a message, but no longer than the inactivity timeout
          deadline = compat.time_now() + inactivity_timeout
          remaining = inactivity_timeout

          while (self._queue_consumer_generator is not None and
                 not self._queue_consumer_generator.pending_events):

              self._process_data_events(time_limit=remaining)

              if not self._queue_consumer_generator:
                  # Consumer was cancelled by client
                  break

              if self._queue_consumer_generator.pending_events:
                  # Got message(s); the outer loop will yield them
                  break

              remaining = deadline - compat.time_now()
              if remaining <= 0.0:
                  # Signal inactivity timeout
                  yield (None, None, None)
                  break
  def _process_data_events(self, time_limit):
      """Run `BlockingConnection.process_data_events()` and then apply the
      channel-specific check that raises ChannelClosed if broker closed this
      channel.

      NOTE: The exception has to be raised in the context of user's call
      into our API to protect the integrity of the underlying
      implementation. BlockingConnection's underlying asynchronous
      connection adapter (SelectConnection) uses callbacks to communicate
      with us. If BlockingConnection leaks exceptions back into the I/O loop
      or the asynchronous connection adapter, we interrupt their normal
      workflow and introduce a high likelihood of state inconsistency.

      See `BlockingConnection.process_data_events()` for documentation of
      args and behavior.

      :param float time_limit:

      """
      self.connection.process_data_events(time_limit=time_limit)

      if not self.is_closed:
          return

      closing_reason = self._closing_reason
      if isinstance(closing_reason, exceptions.ChannelClosedByBroker):
          LOGGER.debug('Channel close by broker detected, raising %r; %r',
                       closing_reason, self)
          raise closing_reason  # pylint: disable=E0702
  def get_waiting_message_count(self):
      """Report how many messages can be fetched from the current queue
      consumer generator via `BlockingChannel.consume` without blocking.

      A trailing cancellation event in the generator's pending events is
      not counted as a message. The result is 0 when there is no queue
      consumer generator.

      NEW in pika 0.10.0

      :returns: The number of waiting messages
      :rtype: int

      """
      generator_info = self._queue_consumer_generator
      if generator_info is None:
          return 0

      backlog = generator_info.pending_events
      waiting = len(backlog)

      # Don't count the cancellation marker at the tail (if any)
      if waiting and type(backlog[-1]) is _ConsumerCancellationEvt:  # pylint: disable=C0123
          return waiting - 1

      return waiting
  def cancel(self):
      """Cancel the queue consumer created by `BlockingChannel.consume`,
      rejecting all pending ackable messages.

      NOTE: If you're looking to cancel a consumer issued with
      BlockingChannel.basic_consume then you should call
      BlockingChannel.basic_cancel.

      :returns: The number of messages requeued by Basic.Nack.
          NEW in 0.10.0: returns 0
      :rtype: int

      """
      if self._queue_consumer_generator is None:
          LOGGER.warning('cancel: queue consumer generator is inactive '
                         '(already cancelled by client or broker?)')
          return 0

      try:
          _queue, auto_ack, _exclusive = self._queue_consumer_generator.params

          if not auto_ack:
              # Give back the messages still held by the queue consumer
              # generator, one at a time; NOTE: basic_nack with the multiple
              # option can't be used here, because it would also nack
              # messages already held by our client.
              held_events = self._queue_consumer_generator.pending_events

              # NOTE `get_waiting_message_count` adjusts for `Basic.Cancel`
              # from the server at the end (if any)
              for _ in compat.xrange(self.get_waiting_message_count()):
                  delivery = held_events.popleft()
                  self._impl.basic_reject(
                      delivery.method.delivery_tag, requeue=True)

          self.basic_cancel(self._queue_consumer_generator.consumer_tag)
      finally:
          # The generator is finished regardless of how cancellation went
          self._queue_consumer_generator = None

      # Return 0 for compatibility with legacy implementation; the number of
      # nacked messages is not meaningful since only messages consumed with
      # auto_ack=False may be nacked, and those arriving after calling
      # basic_cancel will be rejected automatically by impl channel, so we'll
      # never know how many of those were nacked.
      return 0
  def basic_ack(self, delivery_tag=0, multiple=False):
      """Acknowledge one or more messages.

      When sent by the client, this method acknowledges one or more
      messages delivered via the Deliver or Get-Ok methods. When sent by
      server, this method acknowledges one or more messages published with
      the Publish method on a channel in confirm mode. The acknowledgement
      can be for a single message or a set of messages up to and including
      a specific message.

      :param int delivery_tag: The server-assigned delivery tag
      :param bool multiple: If set to True, the delivery tag is treated as
          "up to and including", so that multiple messages can be
          acknowledged with a single method. If set to False, the delivery
          tag refers to a single message. If the multiple field is 1, and
          the delivery tag is zero, this indicates acknowledgement of all
          outstanding messages.

      """
      impl_channel = self._impl
      impl_channel.basic_ack(delivery_tag=delivery_tag, multiple=multiple)

      # Push the request out before returning to the caller
      self._flush_output()
  def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
      """Reject one or more incoming messages.

      It can be used to interrupt and cancel large incoming messages, or
      return untreatable messages to their original queue.

      :param int delivery_tag: The server-assigned delivery tag
      :param bool multiple: If set to True, the delivery tag is treated as
          "up to and including", so that multiple messages can be rejected
          with a single method. If set to False, the delivery tag refers to
          a single message. If the multiple field is 1, and the delivery
          tag is zero, this refers to all outstanding messages.
      :param bool requeue: If requeue is true, the server will attempt to
          requeue the message. If requeue is false or the requeue attempt
          fails the messages are discarded or dead-lettered.

      """
      impl_channel = self._impl
      impl_channel.basic_nack(
          delivery_tag=delivery_tag, multiple=multiple, requeue=requeue)

      # Push the request out before returning to the caller
      self._flush_output()
  def basic_get(self, queue, auto_ack=False):
      """Fetch a single message from the AMQP broker.

      Blocks until the broker answers with either a message or an
      indication that the queue is empty.

      :param str queue: Name of queue from which to get a message
      :param bool auto_ack: Tell the broker to not expect a reply
      :returns: a three-tuple; (None, None, None) if the queue was empty;
          otherwise (method, properties, body); NOTE: body may be None
      :rtype: (spec.Basic.GetOk|None, spec.BasicProperties|None, str|None)

      """
      # A leftover Get-empty result from an earlier call is unexpected here
      assert not self._basic_getempty_result

      validators.require_string(queue, 'queue')

      # NOTE: nested with for python 2.6 compatibility
      with _CallbackResult(self._RxMessageArgs) as message_result:
          with self._basic_getempty_result:
              self._impl.basic_get(
                  queue=queue,
                  auto_ack=auto_ack,
                  callback=message_result.set_value_once)

              # Wait for whichever of the two possible replies comes first
              self._flush_output(message_result.is_ready,
                                 self._basic_getempty_result.is_ready)

              if message_result:
                  received = message_result.value
                  return received.method, received.properties, received.body

              assert self._basic_getempty_result, (
                  "wait completed without GetOk and GetEmpty")
              return None, None, None
  def basic_publish(self,
                    exchange,
                    routing_key,
                    body,
                    properties=None,
                    mandatory=False):
      """Publish a message to the channel with the given exchange, routing
      key, and body.

      For more information on basic_publish and what the parameters do, see:

          http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish

      NOTE: mandatory may be enabled even without delivery confirmation, but
      in the absence of delivery confirmation the synchronous implementation
      has no way to know how long to wait for the Basic.Return.

      :param str exchange: The exchange to publish to
      :param str routing_key: The routing key to bind on
      :param bytes body: The message body; empty string if no body
      :param pika.spec.BasicProperties properties: message properties
      :param bool mandatory: The mandatory flag

      :raises UnroutableError: raised when a message published in
          publisher-acknowledgments mode (see
          `BlockingChannel.confirm_delivery`) is returned via `Basic.Return`
          followed by `Basic.Ack`.
      :raises NackError: raised when a message published in
          publisher-acknowledgements mode is Nack'ed by the broker. See
          `BlockingChannel.confirm_delivery`.

      """
      if not self._delivery_confirmation:
          # In non-publisher-acknowledgments mode: send and flush, nothing
          # to wait for
          self._impl.basic_publish(
              exchange=exchange,
              routing_key=routing_key,
              body=body,
              properties=properties,
              mandatory=mandatory)
          self._flush_output()
          return

      # In publisher-acknowledgments mode: wait for broker's Ack or Nack
      with self._message_confirmation_result:
          self._impl.basic_publish(
              exchange=exchange,
              routing_key=routing_key,
              body=body,
              properties=properties,
              mandatory=mandatory)

          self._flush_output(self._message_confirmation_result.is_ready)
          confirmation = (
              self._message_confirmation_result.value.method_frame.method)

          if isinstance(confirmation, pika.spec.Basic.Nack):
              # Broker was unable to process the message due to internal
              # error
              LOGGER.warning(
                  "Message was Nack'ed by broker: nack=%r; channel=%s; "
                  "exchange=%s; routing_key=%s; mandatory=%r; ",
                  confirmation, self.channel_number, exchange, routing_key,
                  mandatory)

              nacked_returns = []
              if self._puback_return is not None:
                  # Hand over the returned message and clear the slot
                  nacked_returns.append(self._puback_return)
                  self._puback_return = None

              raise exceptions.NackError(nacked_returns)

          assert isinstance(confirmation,
                            pika.spec.Basic.Ack), (confirmation)

          if self._puback_return is not None:
              # Unroutable message was returned
              unroutable = [self._puback_return]
              self._puback_return = None
              raise exceptions.UnroutableError(unroutable)
  def basic_qos(self, prefetch_size=0, prefetch_count=0, global_qos=False):
      """Request a specific quality of service and wait for the request to
      complete.

      The QoS can be specified for the current channel or for all channels
      on the connection. The client can request that messages be sent in
      advance so that when the client finishes processing a message, the
      following message is already held locally, rather than needing to be
      sent down the channel. Prefetching gives a performance improvement.

      :param int prefetch_size: This field specifies the prefetch window
          size. The server will send a message in advance if it is equal to
          or smaller in size than the available prefetch size (and also
          falls into other prefetch limits). May be set to zero, meaning
          "no specific limit", although other prefetch limits may still
          apply. The prefetch-size is ignored if the no-ack option is set
          in the consumer.
      :param int prefetch_count: Specifies a prefetch window in terms of
          whole messages. This field may be used in combination with the
          prefetch-size field; a message will only be sent in advance if
          both prefetch windows (and those at the channel and connection
          level) allow it. The prefetch-count is ignored if the no-ack
          option is set in the consumer.
      :param bool global_qos: Should the QoS apply to all consumers on the
          Channel

      """
      with _CallbackResult() as completion:
          self._impl.basic_qos(
              callback=completion.signal_once,
              prefetch_size=prefetch_size,
              prefetch_count=prefetch_count,
              global_qos=global_qos)

          # Block until the completion callback has been signalled
          self._flush_output(completion.is_ready)
  def basic_recover(self, requeue=False):
      """Ask the server to redeliver all unacknowledged messages on a
      specified channel and wait for the request to complete.

      Zero or more messages may be redelivered. This method replaces the
      asynchronous Recover.

      :param bool requeue: If False, the message will be redelivered to the
          original recipient. If True, the server will attempt to requeue
          the message, potentially then delivering it to an alternative
          subscriber.

      """
      with _CallbackResult() as completion:
          self._impl.basic_recover(
              requeue=requeue, callback=completion.signal_once)

          # Block until the completion callback has been signalled
          self._flush_output(completion.is_ready)
  def basic_reject(self, delivery_tag=None, requeue=True):
      """Reject an incoming message.

      It can be used to interrupt and cancel large incoming messages, or
      return untreatable messages to their original queue.

      :param int delivery_tag: The server-assigned delivery tag
      :param bool requeue: If requeue is true, the server will attempt to
          requeue the message. If requeue is false or the requeue attempt
          fails the messages are discarded or dead-lettered.

      """
      impl_channel = self._impl
      impl_channel.basic_reject(delivery_tag=delivery_tag, requeue=requeue)

      # Push the request out before returning to the caller
      self._flush_output()
  def confirm_delivery(self):
      """Turn on RabbitMQ-proprietary Confirm mode in the channel.

      Calling this again once confirmation is already enabled only logs an
      error and returns.

      For more information see:
          https://www.rabbitmq.com/confirms.html

      """
      already_enabled = self._delivery_confirmation
      if already_enabled:
          LOGGER.error(
              'confirm_delivery: confirmation was already enabled '
              'on channel=%s', self.channel_number)
          return

      with _CallbackResult() as completion:
          # Broker's Ack/Nack for published messages are routed into the
          # channel's message confirmation result
          self._impl.confirm_delivery(
              ack_nack_callback=(
                  self._message_confirmation_result.set_value_once),
              callback=completion.signal_once)

          # Block until the completion callback has been signalled
          self._flush_output(completion.is_ready)

      self._delivery_confirmation = True

      # Unroutable messages returned after this point will be in the context
      # of publisher acknowledgments
      self._impl.add_on_return_callback(self._on_puback_message_returned)
  def exchange_declare(self,
                       exchange,
                       exchange_type='direct',
                       passive=False,
                       durable=False,
                       auto_delete=False,
                       internal=False,
                       arguments=None):
      """Create an exchange if it does not already exist; if the exchange
      exists, verify that it is of the correct and expected class.

      If passive set, the server will reply with Declare-Ok if the exchange
      already exists with the same name, and raise an error if not and if
      the exchange does not already exist, the server MUST raise a channel
      exception with reply code 404 (not found).

      :param str exchange: The exchange name consists of a non-empty
          sequence of these characters: letters, digits, hyphen, underscore,
          period, or colon.
      :param str exchange_type: The exchange type to use
      :param bool passive: Perform a declare or just check to see if it
          exists
      :param bool durable: Survive a reboot of RabbitMQ
      :param bool auto_delete: Remove when no more queues are bound to it
      :param bool internal: Can only be published to by other exchanges
      :param dict arguments: Custom key/value pair arguments for the
          exchange
      :returns: Method frame from the Exchange.Declare-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Exchange.DeclareOk`

      """
      validators.require_string(exchange, 'exchange')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          self._impl.exchange_declare(
              exchange=exchange,
              exchange_type=exchange_type,
              passive=passive,
              durable=durable,
              auto_delete=auto_delete,
              internal=internal,
              arguments=arguments,
              callback=reply.set_value_once)

          # Block until the reply has been captured
          self._flush_output(reply.is_ready)
          method_frame = reply.value.method_frame
          return method_frame
  def exchange_delete(self, exchange=None, if_unused=False):
      """Delete the exchange and wait for the broker's reply.

      :param str exchange: The exchange name
      :param bool if_unused: only delete if the exchange is unused
      :returns: Method frame from the Exchange.Delete-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Exchange.DeleteOk`

      """
      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          self._impl.exchange_delete(
              exchange=exchange,
              if_unused=if_unused,
              callback=reply.set_value_once)

          # Block until the reply has been captured
          self._flush_output(reply.is_ready)
          method_frame = reply.value.method_frame
          return method_frame
  def exchange_bind(self, destination, source, routing_key='',
                    arguments=None):
      """Bind an exchange to another exchange and wait for the broker's
      reply.

      :param str destination: The destination exchange to bind
      :param str source: The source exchange to bind to
      :param str routing_key: The routing key to bind on
      :param dict arguments: Custom key/value pair arguments for the binding
      :returns: Method frame from the Exchange.Bind-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Exchange.BindOk`

      """
      validators.require_string(destination, 'destination')
      validators.require_string(source, 'source')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          self._impl.exchange_bind(
              destination=destination,
              source=source,
              routing_key=routing_key,
              arguments=arguments,
              callback=reply.set_value_once)

          # Block until the reply has been captured
          self._flush_output(reply.is_ready)
          method_frame = reply.value.method_frame
          return method_frame
  def exchange_unbind(self,
                      destination=None,
                      source=None,
                      routing_key='',
                      arguments=None):
      """Unbind an exchange from another exchange and wait for the broker's
      reply.

      :param str destination: The destination exchange to unbind
      :param str source: The source exchange to unbind from
      :param str routing_key: The routing key to unbind
      :param dict arguments: Custom key/value pair arguments for the binding
      :returns: Method frame from the Exchange.Unbind-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Exchange.UnbindOk`

      """
      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          self._impl.exchange_unbind(
              destination=destination,
              source=source,
              routing_key=routing_key,
              arguments=arguments,
              callback=reply.set_value_once)

          # Block until the reply has been captured
          self._flush_output(reply.is_ready)
          method_frame = reply.value.method_frame
          return method_frame
  def queue_declare(self,
                    queue,
                    passive=False,
                    durable=False,
                    exclusive=False,
                    auto_delete=False,
                    arguments=None):
      """Declare queue, create if needed.

      This method creates or checks a queue. When creating a new queue the
      client can specify various properties that control the durability of
      the queue and its contents, and the level of sharing for the queue.

      Use an empty string as the queue name for the broker to auto-generate
      one. Retrieve this auto-generated queue name from the returned
      `spec.Queue.DeclareOk` method frame.

      :param str queue: The queue name; if empty string, the broker will
          create a unique queue name
      :param bool passive: Only check to see if the queue exists and raise
          `ChannelClosed` if it doesn't
      :param bool durable: Survive reboots of the broker
      :param bool exclusive: Only allow access by the current connection
      :param bool auto_delete: Delete after consumer cancels or disconnects
      :param dict arguments: Custom key/value arguments for the queue
      :returns: Method frame from the Queue.Declare-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Queue.DeclareOk`

      """
      validators.require_string(queue, 'queue')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          self._impl.queue_declare(
              queue=queue,
              passive=passive,
              durable=durable,
              exclusive=exclusive,
              auto_delete=auto_delete,
              arguments=arguments,
              callback=reply.set_value_once)

          # Block until the reply has been captured
          self._flush_output(reply.is_ready)
          method_frame = reply.value.method_frame
          return method_frame
  def queue_delete(self, queue, if_unused=False, if_empty=False):
      """Delete a queue from the broker.

      :param str queue: The queue to delete
      :param bool if_unused: only delete if it's unused
      :param bool if_empty: only delete if the queue is empty
      :returns: Method frame from the Queue.Delete-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Queue.DeleteOk`
      :raises: whatever `validators.require_string` raises when `queue` is
          not a string

      """
      # Validate up front, as `queue_declare` and `queue_bind` do, so a bad
      # queue name is rejected before anything is handed to the underlying
      # channel.
      validators.require_string(queue, 'queue')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
              delete_ok_result:
          self._impl.queue_delete(
              queue=queue,
              if_unused=if_unused,
              if_empty=if_empty,
              callback=delete_ok_result.set_value_once)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(delete_ok_result.is_ready)
          # Read the frame while still inside the `with` block.
          return delete_ok_result.value.method_frame
  def queue_purge(self, queue):
      """Purge all of the messages from the specified queue

      :param str queue: The queue to purge
      :returns: Method frame from the Queue.Purge-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Queue.PurgeOk`
      :raises: whatever `validators.require_string` raises when `queue` is
          not a string

      """
      # Validate up front, as `queue_declare` and `queue_bind` do, so a bad
      # queue name is rejected before anything is handed to the underlying
      # channel.
      validators.require_string(queue, 'queue')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
              purge_ok_result:
          self._impl.queue_purge(
              queue=queue, callback=purge_ok_result.set_value_once)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(purge_ok_result.is_ready)
          # Read the frame while still inside the `with` block.
          return purge_ok_result.value.method_frame
  def queue_bind(self, queue, exchange, routing_key=None, arguments=None):
      """Bind the queue to the specified exchange

      :param str queue: The queue to bind to the exchange
      :param str exchange: The source exchange to bind to
      :param str routing_key: The routing key to bind on
      :param dict arguments: Custom key/value pair arguments for the binding
      :returns: Method frame from the Queue.Bind-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Queue.BindOk`

      """
      # Both names are checked before anything is passed to the channel.
      validators.require_string(queue, 'queue')
      validators.require_string(exchange, 'exchange')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          on_bind_ok = reply.set_value_once
          self._impl.queue_bind(
              queue=queue,
              exchange=exchange,
              routing_key=routing_key,
              arguments=arguments,
              callback=on_bind_ok)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(reply.is_ready)
          # Read the frame while still inside the `with` block.
          method_frame = reply.value.method_frame
          return method_frame
  def queue_unbind(self,
                   queue,
                   exchange=None,
                   routing_key=None,
                   arguments=None):
      """Unbind a queue from an exchange.

      :param str queue: The queue to unbind from the exchange
      :param str exchange: The source exchange to unbind from
      :param str routing_key: The routing key to unbind
      :param dict arguments: Custom key/value pair arguments for the binding
      :returns: Method frame from the Queue.Unbind-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Queue.UnbindOk`
      :raises: whatever `validators.require_string` raises when `queue` is
          not a string

      """
      # Validate the queue name up front, as `queue_declare` and
      # `queue_bind` do. `exchange` is deliberately not validated here
      # because its default is None.
      validators.require_string(queue, 'queue')

      with _CallbackResult(self._MethodFrameCallbackResultArgs) as \
              unbind_ok_result:
          self._impl.queue_unbind(
              queue=queue,
              exchange=exchange,
              routing_key=routing_key,
              arguments=arguments,
              callback=unbind_ok_result.set_value_once)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(unbind_ok_result.is_ready)
          # Read the frame while still inside the `with` block.
          return unbind_ok_result.value.method_frame
  def tx_select(self):
      """Put the channel into standard transaction mode.

      The client must call this at least once on a channel before using
      the Commit or Rollback methods.

      :returns: Method frame from the Tx.Select-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Tx.SelectOk`

      """
      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          on_select_ok = reply.set_value_once
          # The completion callback is passed positionally, as before.
          self._impl.tx_select(on_select_ok)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(reply.is_ready)
          # Read the frame while still inside the `with` block.
          method_frame = reply.value.method_frame
          return method_frame
  def tx_commit(self):
      """Commit a transaction.

      :returns: Method frame from the Tx.Commit-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Tx.CommitOk`

      """
      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          on_commit_ok = reply.set_value_once
          # The completion callback is passed positionally, as before.
          self._impl.tx_commit(on_commit_ok)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(reply.is_ready)
          # Read the frame while still inside the `with` block.
          method_frame = reply.value.method_frame
          return method_frame
  def tx_rollback(self):
      """Rollback a transaction.

      :returns: Method frame from the Tx.Rollback-ok response
      :rtype: `pika.frame.Method` having `method` attribute of type
          `spec.Tx.RollbackOk`

      """
      with _CallbackResult(self._MethodFrameCallbackResultArgs) as reply:
          on_rollback_ok = reply.set_value_once
          # The completion callback is passed positionally, as before.
          self._impl.tx_rollback(on_rollback_ok)
          # `is_ready` is handed to `_flush_output` as the completion check.
          self._flush_output(reply.is_ready)
          # Read the frame while still inside the `with` block.
          method_frame = reply.value.method_frame
          return method_frame