channel.py 61 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501
  1. """The Channel class provides a wrapper for interacting with RabbitMQ
  2. implementing the methods and behaviors for an AMQP Channel.
  3. """
  4. # disable too-many-lines
  5. # pylint: disable=C0302
  6. import collections
  7. import logging
  8. import uuid
  9. import pika.frame as frame
  10. import pika.exceptions as exceptions
  11. import pika.spec as spec
  12. import pika.validators as validators
  13. from pika.compat import unicode_type, dictkeys, is_integer
  14. LOGGER = logging.getLogger(__name__)
  15. MAX_CHANNELS = 65535 # per AMQP 0.9.1 spec.
  16. class Channel(object):
  17. """A Channel is the primary communication method for interacting with
  18. RabbitMQ. It is recommended that you do not directly invoke
  19. the creation of a channel object in your application code but rather
  20. construct the a channel by calling the active connection's channel()
  21. method.
  22. """
  23. # Disable pylint messages concerning "method could be a function"
  24. # pylint: disable=R0201
  25. CLOSED = 0
  26. OPENING = 1
  27. OPEN = 2
  28. CLOSING = 3 # client-initiated close in progress
  29. _STATE_NAMES = {
  30. CLOSED: 'CLOSED',
  31. OPENING: 'OPENING',
  32. OPEN: 'OPEN',
  33. CLOSING: 'CLOSING'
  34. }
  35. _ON_CHANNEL_CLEANUP_CB_KEY = '_on_channel_cleanup'
  36. def __init__(self, connection, channel_number, on_open_callback):
  37. """Create a new instance of the Channel
  38. :param pika.connection.Connection connection: The connection
  39. :param int channel_number: The channel number for this instance
  40. :param callable on_open_callback: The callback to call on channel open.
  41. The callback will be invoked with the `Channel` instance as its only
  42. argument.
  43. """
  44. if not isinstance(channel_number, int):
  45. raise exceptions.InvalidChannelNumber
  46. validators.rpc_completion_callback(on_open_callback)
  47. self.channel_number = channel_number
  48. self.callbacks = connection.callbacks
  49. self.connection = connection
  50. # Initially, flow is assumed to be active
  51. self.flow_active = True
  52. self._content_assembler = ContentFrameAssembler()
  53. self._blocked = collections.deque(list())
  54. self._blocking = None
  55. self._has_on_flow_callback = False
  56. self._cancelled = set()
  57. self._consumers = dict()
  58. self._consumers_with_noack = set()
  59. self._on_flowok_callback = None
  60. self._on_getok_callback = None
  61. self._on_openok_callback = on_open_callback
  62. self._state = self.CLOSED
  63. # We save the closing reason exception to be passed to on-channel-close
  64. # callback at closing of the channel. Exception representing the closing
  65. # reason; ChannelClosedByClient or ChannelClosedByBroker on controlled
  66. # close; otherwise another exception describing the reason for failure
  67. # (most likely connection failure).
  68. self._closing_reason = None # type: None | Exception
  69. # opaque cookie value set by wrapper layer (e.g., BlockingConnection)
  70. # via _set_cookie
  71. self._cookie = None
  72. def __int__(self):
  73. """Return the channel object as its channel number
  74. :rtype: int
  75. """
  76. return self.channel_number
  77. def __repr__(self):
  78. return '<%s number=%s %s conn=%r>' % (
  79. self.__class__.__name__, self.channel_number,
  80. self._STATE_NAMES[self._state], self.connection)
  81. def add_callback(self, callback, replies, one_shot=True):
  82. """Pass in a callback handler and a list replies from the
  83. RabbitMQ broker which you'd like the callback notified of. Callbacks
  84. should allow for the frame parameter to be passed in.
  85. :param callable callback: The callback to call
  86. :param list replies: The replies to get a callback for
  87. :param bool one_shot: Only handle the first type callback
  88. """
  89. for reply in replies:
  90. self.callbacks.add(self.channel_number, reply, callback, one_shot)
  91. def add_on_cancel_callback(self, callback):
  92. """Pass a callback function that will be called when the basic_cancel
  93. is sent by the server. The callback function should receive a frame
  94. parameter.
  95. :param callable callback: The callback to call on Basic.Cancel from
  96. broker
  97. """
  98. self.callbacks.add(self.channel_number, spec.Basic.Cancel, callback,
  99. False)
  100. def add_on_close_callback(self, callback):
  101. """Pass a callback function that will be called when the channel is
  102. closed. The callback function will receive the channel and an exception
  103. describing why the channel was closed.
  104. If the channel is closed by broker via Channel.Close, the callback will
  105. receive `ChannelClosedByBroker` as the reason.
  106. If graceful user-initiated channel closing completes successfully (
  107. either directly of indirectly by closing a connection containing the
  108. channel) and closing concludes gracefully without Channel.Close from the
  109. broker and without loss of connection, the callback will receive
  110. `ChannelClosedByClient` exception as reason.
  111. If channel was closed due to loss of connection, the callback will
  112. receive another exception type describing the failure.
  113. :param callable callback: The callback, having the signature:
  114. callback(Channel, Exception reason)
  115. """
  116. self.callbacks.add(self.channel_number, '_on_channel_close', callback,
  117. False, self)
  118. def add_on_flow_callback(self, callback):
  119. """Pass a callback function that will be called when Channel.Flow is
  120. called by the remote server. Note that newer versions of RabbitMQ
  121. will not issue this but instead use TCP backpressure
  122. :param callable callback: The callback function
  123. """
  124. self._has_on_flow_callback = True
  125. self.callbacks.add(self.channel_number, spec.Channel.Flow, callback,
  126. False)
  127. def add_on_return_callback(self, callback):
  128. """Pass a callback function that will be called when basic_publish is
  129. sent a message that has been rejected and returned by the server.
  130. :param callable callback: The function to call, having the signature
  131. callback(channel, method, properties, body)
  132. where
  133. channel: pika.Channel
  134. method: pika.spec.Basic.Return
  135. properties: pika.spec.BasicProperties
  136. body: bytes
  137. """
  138. self.callbacks.add(self.channel_number, '_on_return', callback, False)
  139. def basic_ack(self, delivery_tag=0, multiple=False):
  140. """Acknowledge one or more messages. When sent by the client, this
  141. method acknowledges one or more messages delivered via the Deliver or
  142. Get-Ok methods. When sent by server, this method acknowledges one or
  143. more messages published with the Publish method on a channel in
  144. confirm mode. The acknowledgement can be for a single message or a
  145. set of messages up to and including a specific message.
  146. :param integer delivery_tag: int/long The server-assigned delivery tag
  147. :param bool multiple: If set to True, the delivery tag is treated as
  148. "up to and including", so that multiple messages
  149. can be acknowledged with a single method. If set
  150. to False, the delivery tag refers to a single
  151. message. If the multiple field is 1, and the
  152. delivery tag is zero, this indicates
  153. acknowledgement of all outstanding messages.
  154. """
  155. self._raise_if_not_open()
  156. return self._send_method(spec.Basic.Ack(delivery_tag, multiple))
  157. def basic_cancel(self, consumer_tag='', callback=None):
  158. """This method cancels a consumer. This does not affect already
  159. delivered messages, but it does mean the server will not send any more
  160. messages for that consumer. The client may receive an arbitrary number
  161. of messages in between sending the cancel method and receiving the
  162. cancel-ok reply. It may also be sent from the server to the client in
  163. the event of the consumer being unexpectedly cancelled (i.e. cancelled
  164. for any reason other than the server receiving the corresponding
  165. basic.cancel from the client). This allows clients to be notified of
  166. the loss of consumers due to events such as queue deletion.
  167. :param str consumer_tag: Identifier for the consumer
  168. :param callable callback: callback(pika.frame.Method) for method
  169. Basic.CancelOk. If None, do not expect a Basic.CancelOk response,
  170. otherwise, callback must be callable
  171. :raises ValueError:
  172. """
  173. validators.require_string(consumer_tag, 'consumer_tag')
  174. self._raise_if_not_open()
  175. nowait = validators.rpc_completion_callback(callback)
  176. if consumer_tag in self._cancelled:
  177. # We check for cancelled first, because basic_cancel removes
  178. # consumers closed with nowait from self._consumers
  179. LOGGER.warning('basic_cancel - consumer is already cancelling: %s',
  180. consumer_tag)
  181. return
  182. if consumer_tag not in self._consumers:
  183. # Could be cancelled by user or broker earlier
  184. LOGGER.warning('basic_cancel - consumer not found: %s',
  185. consumer_tag)
  186. return
  187. LOGGER.debug('Cancelling consumer: %s (nowait=%s)', consumer_tag,
  188. nowait)
  189. if nowait:
  190. # This is our last opportunity while the channel is open to remove
  191. # this consumer callback and help gc; unfortunately, this consumer's
  192. # self._cancelled and self._consumers_with_noack (if any) entries
  193. # will persist until the channel is closed.
  194. del self._consumers[consumer_tag]
  195. if callback is not None:
  196. self.callbacks.add(self.channel_number, spec.Basic.CancelOk,
  197. callback)
  198. self._cancelled.add(consumer_tag)
  199. self._rpc(spec.Basic.Cancel(consumer_tag=consumer_tag, nowait=nowait),
  200. self._on_cancelok if not nowait else None,
  201. [(spec.Basic.CancelOk, {
  202. 'consumer_tag': consumer_tag
  203. })] if not nowait else [])
  204. def basic_consume(self,
  205. queue,
  206. on_message_callback,
  207. auto_ack=False,
  208. exclusive=False,
  209. consumer_tag=None,
  210. arguments=None,
  211. callback=None):
  212. """Sends the AMQP 0-9-1 command Basic.Consume to the broker and binds messages
  213. for the consumer_tag to the consumer callback. If you do not pass in
  214. a consumer_tag, one will be automatically generated for you. Returns
  215. the consumer tag.
  216. For more information on basic_consume, see:
  217. Tutorial 2 at http://www.rabbitmq.com/getstarted.html
  218. http://www.rabbitmq.com/confirms.html
  219. http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.consume
  220. :param str queue: The queue to consume from. Use the empty string to
  221. specify the most recent server-named queue for this channel
  222. :param callable on_message_callback: The function to call when
  223. consuming with the signature
  224. on_message_callback(channel, method, properties, body), where
  225. channel: pika.Channel
  226. method: pika.spec.Basic.Deliver
  227. properties: pika.spec.BasicProperties
  228. body: bytes
  229. :param bool auto_ack: if set to True, automatic acknowledgement mode
  230. will be used (see http://www.rabbitmq.com/confirms.html).
  231. This corresponds with the 'no_ack' parameter in the basic.consume
  232. AMQP 0.9.1 method
  233. :param bool exclusive: Don't allow other consumers on the queue
  234. :param str consumer_tag: Specify your own consumer tag
  235. :param dict arguments: Custom key/value pair arguments for the consumer
  236. :param callable callback: callback(pika.frame.Method) for method
  237. Basic.ConsumeOk.
  238. :returns: Consumer tag which may be used to cancel the consumer.
  239. :rtype: str
  240. :raises ValueError:
  241. """
  242. validators.require_string(queue, 'queue')
  243. validators.require_callback(on_message_callback)
  244. self._raise_if_not_open()
  245. validators.rpc_completion_callback(callback)
  246. # If a consumer tag was not passed, create one
  247. if not consumer_tag:
  248. consumer_tag = self._generate_consumer_tag()
  249. if consumer_tag in self._consumers or consumer_tag in self._cancelled:
  250. raise exceptions.DuplicateConsumerTag(consumer_tag)
  251. if auto_ack:
  252. self._consumers_with_noack.add(consumer_tag)
  253. self._consumers[consumer_tag] = on_message_callback
  254. rpc_callback = self._on_eventok if callback is None else callback
  255. self._rpc(
  256. spec.Basic.Consume(queue=queue,
  257. consumer_tag=consumer_tag,
  258. no_ack=auto_ack,
  259. exclusive=exclusive,
  260. arguments=arguments or dict()), rpc_callback,
  261. [(spec.Basic.ConsumeOk, {
  262. 'consumer_tag': consumer_tag
  263. })])
  264. return consumer_tag
  265. def _generate_consumer_tag(self):
  266. """Generate a consumer tag
  267. NOTE: this protected method may be called by derived classes
  268. :returns: consumer tag
  269. :rtype: str
  270. """
  271. return 'ctag%i.%s' % (self.channel_number, uuid.uuid4().hex)
  272. def basic_get(self, queue, callback, auto_ack=False):
  273. """Get a single message from the AMQP broker. If you want to
  274. be notified of Basic.GetEmpty, use the Channel.add_callback method
  275. adding your Basic.GetEmpty callback which should expect only one
  276. parameter, frame. Due to implementation details, this cannot be called
  277. a second time until the callback is executed. For more information on
  278. basic_get and its parameters, see:
  279. http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.get
  280. :param str queue: The queue from which to get a message. Use the empty
  281. string to specify the most recent server-named queue for this
  282. channel
  283. :param callable callback: The callback to call with a message that has
  284. the signature callback(channel, method, properties, body), where:
  285. channel: pika.Channel
  286. method: pika.spec.Basic.GetOk
  287. properties: pika.spec.BasicProperties
  288. body: bytes
  289. :param bool auto_ack: Tell the broker to not expect a reply
  290. :raises ValueError:
  291. """
  292. validators.require_string(queue, 'queue')
  293. validators.require_callback(callback)
  294. if self._on_getok_callback is not None:
  295. raise exceptions.DuplicateGetOkCallback()
  296. self._on_getok_callback = callback
  297. # pylint: disable=W0511
  298. # TODO Strangely, not using _rpc for the synchronous Basic.Get. Would
  299. # need to extend _rpc to handle Basic.GetOk method, header, and body
  300. # frames (or similar)
  301. self._send_method(spec.Basic.Get(queue=queue, no_ack=auto_ack))
  302. def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
  303. """This method allows a client to reject one or more incoming messages.
  304. It can be used to interrupt and cancel large incoming messages, or
  305. return untreatable messages to their original queue.
  306. :param integer delivery-tag: int/long The server-assigned delivery tag
  307. :param bool multiple: If set to True, the delivery tag is treated as
  308. "up to and including", so that multiple messages
  309. can be acknowledged with a single method. If set
  310. to False, the delivery tag refers to a single
  311. message. If the multiple field is 1, and the
  312. delivery tag is zero, this indicates
  313. acknowledgement of all outstanding messages.
  314. :param bool requeue: If requeue is true, the server will attempt to
  315. requeue the message. If requeue is false or the
  316. requeue attempt fails the messages are discarded or
  317. dead-lettered.
  318. """
  319. self._raise_if_not_open()
  320. return self._send_method(
  321. spec.Basic.Nack(delivery_tag, multiple, requeue))
  322. def basic_publish(self,
  323. exchange,
  324. routing_key,
  325. body,
  326. properties=None,
  327. mandatory=False):
  328. """Publish to the channel with the given exchange, routing key and body.
  329. For more information on basic_publish and what the parameters do, see:
  330. http://www.rabbitmq.com/amqp-0-9-1-reference.html#basic.publish
  331. :param str exchange: The exchange to publish to
  332. :param str routing_key: The routing key to bind on
  333. :param bytes body: The message body
  334. :param pika.spec.BasicProperties properties: Basic.properties
  335. :param bool mandatory: The mandatory flag
  336. """
  337. self._raise_if_not_open()
  338. if isinstance(body, unicode_type):
  339. body = body.encode('utf-8')
  340. properties = properties or spec.BasicProperties()
  341. self._send_method(
  342. spec.Basic.Publish(exchange=exchange,
  343. routing_key=routing_key,
  344. mandatory=mandatory), (properties, body))
  345. def basic_qos(self,
  346. prefetch_size=0,
  347. prefetch_count=0,
  348. global_qos=False,
  349. callback=None):
  350. """Specify quality of service. This method requests a specific quality
  351. of service. The QoS can be specified for the current channel or for all
  352. channels on the connection. The client can request that messages be sent
  353. in advance so that when the client finishes processing a message, the
  354. following message is already held locally, rather than needing to be
  355. sent down the channel. Prefetching gives a performance improvement.
  356. :param int prefetch_size: This field specifies the prefetch window
  357. size. The server will send a message in
  358. advance if it is equal to or smaller in size
  359. than the available prefetch size (and also
  360. falls into other prefetch limits). May be set
  361. to zero, meaning "no specific limit",
  362. although other prefetch limits may still
  363. apply. The prefetch-size is ignored by
  364. consumers who have enabled the no-ack option.
  365. :param int prefetch_count: Specifies a prefetch window in terms of whole
  366. messages. This field may be used in
  367. combination with the prefetch-size field; a
  368. message will only be sent in advance if both
  369. prefetch windows (and those at the channel
  370. and connection level) allow it. The
  371. prefetch-count is ignored by consumers who
  372. have enabled the no-ack option.
  373. :param bool global_qos: Should the QoS apply to all consumers on the
  374. Channel
  375. :param callable callback: The callback to call for Basic.QosOk response
  376. :raises ValueError:
  377. """
  378. self._raise_if_not_open()
  379. validators.rpc_completion_callback(callback)
  380. validators.zero_or_greater('prefetch_size', prefetch_size)
  381. validators.zero_or_greater('prefetch_count', prefetch_count)
  382. return self._rpc(
  383. spec.Basic.Qos(prefetch_size, prefetch_count, global_qos), callback,
  384. [spec.Basic.QosOk])
  385. def basic_reject(self, delivery_tag, requeue=True):
  386. """Reject an incoming message. This method allows a client to reject a
  387. message. It can be used to interrupt and cancel large incoming messages,
  388. or return untreatable messages to their original queue.
  389. :param integer delivery-tag: int/long The server-assigned delivery tag
  390. :param bool requeue: If requeue is true, the server will attempt to
  391. requeue the message. If requeue is false or the
  392. requeue attempt fails the messages are discarded or
  393. dead-lettered.
  394. :raises: TypeError
  395. """
  396. self._raise_if_not_open()
  397. if not is_integer(delivery_tag):
  398. raise TypeError('delivery_tag must be an integer')
  399. return self._send_method(spec.Basic.Reject(delivery_tag, requeue))
  400. def basic_recover(self, requeue=False, callback=None):
  401. """This method asks the server to redeliver all unacknowledged messages
  402. on a specified channel. Zero or more messages may be redelivered. This
  403. method replaces the asynchronous Recover.
  404. :param bool requeue: If False, the message will be redelivered to the
  405. original recipient. If True, the server will
  406. attempt to requeue the message, potentially then
  407. delivering it to an alternative subscriber.
  408. :param callable callback: Callback to call when receiving
  409. Basic.RecoverOk
  410. :param callable callback: callback(pika.frame.Method) for method
  411. Basic.RecoverOk
  412. :raises ValueError:
  413. """
  414. self._raise_if_not_open()
  415. validators.rpc_completion_callback(callback)
  416. return self._rpc(spec.Basic.Recover(requeue), callback,
  417. [spec.Basic.RecoverOk])
  418. def close(self, reply_code=0, reply_text="Normal shutdown"):
  419. """Invoke a graceful shutdown of the channel with the AMQP Broker.
  420. If channel is OPENING, transition to CLOSING and suppress the incoming
  421. Channel.OpenOk, if any.
  422. :param int reply_code: The reason code to send to broker
  423. :param str reply_text: The reason text to send to broker
  424. :raises ChannelWrongStateError: if channel is closed or closing
  425. """
  426. if self.is_closed or self.is_closing:
  427. # Whoever is calling `close` might expect the on-channel-close-cb
  428. # to be called, which won't happen when it's already closed.
  429. self._raise_if_not_open()
  430. # If channel is OPENING, we will transition it to CLOSING state,
  431. # causing the _on_openok method to suppress the OPEN state transition
  432. # and the on-channel-open-callback
  433. LOGGER.info('Closing channel (%s): %r on %s', reply_code, reply_text,
  434. self)
  435. # Save the reason info so that we may use it in the '_on_channel_close'
  436. # callback processing
  437. self._closing_reason = exceptions.ChannelClosedByClient(
  438. reply_code, reply_text)
  439. for consumer_tag in dictkeys(self._consumers):
  440. if consumer_tag not in self._cancelled:
  441. self.basic_cancel(consumer_tag=consumer_tag)
  442. # Change state after cancelling consumers to avoid
  443. # ChannelWrongStateError exception from basic_cancel
  444. self._set_state(self.CLOSING)
  445. self._rpc(spec.Channel.Close(reply_code, reply_text, 0, 0),
  446. self._on_closeok, [spec.Channel.CloseOk])
  447. def confirm_delivery(self, ack_nack_callback, callback=None):
  448. """Turn on Confirm mode in the channel. Pass in a callback to be
  449. notified by the Broker when a message has been confirmed as received or
  450. rejected (Basic.Ack, Basic.Nack) from the broker to the publisher.
  451. For more information see:
  452. https://www.rabbitmq.com/confirms.html
  453. :param callable ack_nack_callback: Required callback for delivery
  454. confirmations that has the following signature:
  455. callback(pika.frame.Method), where method_frame contains
  456. either method `spec.Basic.Ack` or `spec.Basic.Nack`.
  457. :param callable callback: callback(pika.frame.Method) for method
  458. Confirm.SelectOk
  459. :raises ValueError:
  460. """
  461. if not callable(ack_nack_callback):
  462. # confirm_deliver requires a callback; it's meaningless
  463. # without a user callback to receieve Basic.Ack/Basic.Nack notifications
  464. raise ValueError('confirm_delivery requires a callback '
  465. 'to receieve Basic.Ack/Basic.Nack notifications')
  466. self._raise_if_not_open()
  467. nowait = validators.rpc_completion_callback(callback)
  468. if not (self.connection.publisher_confirms and
  469. self.connection.basic_nack):
  470. raise exceptions.MethodNotImplemented(
  471. 'Confirm.Select not Supported by Server')
  472. # Add the ack and nack callback
  473. self.callbacks.add(self.channel_number, spec.Basic.Ack,
  474. ack_nack_callback, False)
  475. self.callbacks.add(self.channel_number, spec.Basic.Nack,
  476. ack_nack_callback, False)
  477. self._rpc(spec.Confirm.Select(nowait), callback,
  478. [spec.Confirm.SelectOk] if not nowait else [])
  479. @property
  480. def consumer_tags(self):
  481. """Property method that returns a list of currently active consumers
  482. :rtype: list
  483. """
  484. return dictkeys(self._consumers)
  485. def exchange_bind(self,
  486. destination,
  487. source,
  488. routing_key='',
  489. arguments=None,
  490. callback=None):
  491. """Bind an exchange to another exchange.
  492. :param str destination: The destination exchange to bind
  493. :param str source: The source exchange to bind to
  494. :param str routing_key: The routing key to bind on
  495. :param dict arguments: Custom key/value pair arguments for the binding
  496. :param callable callback: callback(pika.frame.Method) for method Exchange.BindOk
  497. :raises ValueError:
  498. """
  499. self._raise_if_not_open()
  500. validators.require_string(destination, 'destination')
  501. validators.require_string(source, 'source')
  502. nowait = validators.rpc_completion_callback(callback)
  503. return self._rpc(
  504. spec.Exchange.Bind(0, destination, source, routing_key, nowait,
  505. arguments or dict()), callback,
  506. [spec.Exchange.BindOk] if not nowait else [])
  507. def exchange_declare(self,
  508. exchange,
  509. exchange_type='direct',
  510. passive=False,
  511. durable=False,
  512. auto_delete=False,
  513. internal=False,
  514. arguments=None,
  515. callback=None):
  516. """This method creates an exchange if it does not already exist, and if
  517. the exchange exists, verifies that it is of the correct and expected
  518. class.
  519. If passive set, the server will reply with Declare-Ok if the exchange
  520. already exists with the same name, and raise an error if not and if the
  521. exchange does not already exist, the server MUST raise a channel
  522. exception with reply code 404 (not found).
  523. :param str exchange: The exchange name consists of a non-empty sequence
  524. of these characters: letters, digits, hyphen, underscore, period,
  525. or colon
  526. :param str exchange_type: The exchange type to use
  527. :param bool passive: Perform a declare or just check to see if it exists
  528. :param bool durable: Survive a reboot of RabbitMQ
  529. :param bool auto_delete: Remove when no more queues are bound to it
  530. :param bool internal: Can only be published to by other exchanges
  531. :param dict arguments: Custom key/value pair arguments for the exchange
  532. :param callable callback: callback(pika.frame.Method) for method Exchange.DeclareOk
  533. :raises ValueError:
  534. """
  535. validators.require_string(exchange, 'exchange')
  536. self._raise_if_not_open()
  537. nowait = validators.rpc_completion_callback(callback)
  538. return self._rpc(
  539. spec.Exchange.Declare(0, exchange, exchange_type, passive, durable,
  540. auto_delete, internal, nowait, arguments or
  541. dict()), callback,
  542. [spec.Exchange.DeclareOk] if not nowait else [])
  543. def exchange_delete(self, exchange=None, if_unused=False, callback=None):
  544. """Delete the exchange.
  545. :param str exchange: The exchange name
  546. :param bool if_unused: only delete if the exchange is unused
  547. :param callable callback: callback(pika.frame.Method) for method Exchange.DeleteOk
  548. :raises ValueError:
  549. """
  550. self._raise_if_not_open()
  551. nowait = validators.rpc_completion_callback(callback)
  552. return self._rpc(spec.Exchange.Delete(0, exchange, if_unused,
  553. nowait), callback,
  554. [spec.Exchange.DeleteOk] if not nowait else [])
  555. def exchange_unbind(self,
  556. destination=None,
  557. source=None,
  558. routing_key='',
  559. arguments=None,
  560. callback=None):
  561. """Unbind an exchange from another exchange.
  562. :param str destination: The destination exchange to unbind
  563. :param str source: The source exchange to unbind from
  564. :param str routing_key: The routing key to unbind
  565. :param dict arguments: Custom key/value pair arguments for the binding
  566. :param callable callback: callback(pika.frame.Method) for method Exchange.UnbindOk
  567. :raises ValueError:
  568. """
  569. self._raise_if_not_open()
  570. nowait = validators.rpc_completion_callback(callback)
  571. return self._rpc(
  572. spec.Exchange.Unbind(0, destination, source, routing_key, nowait,
  573. arguments), callback,
  574. [spec.Exchange.UnbindOk] if not nowait else [])
  575. def flow(self, active, callback=None):
  576. """Turn Channel flow control off and on. Pass a callback to be notified
  577. of the response from the server. active is a bool. Callback should
  578. expect a bool in response indicating channel flow state. For more
  579. information, please reference:
  580. http://www.rabbitmq.com/amqp-0-9-1-reference.html#channel.flow
  581. :param bool active: Turn flow on or off
  582. :param callable callback: callback(bool) upon completion
  583. :raises ValueError:
  584. """
  585. self._raise_if_not_open()
  586. validators.rpc_completion_callback(callback)
  587. self._on_flowok_callback = callback
  588. self._rpc(spec.Channel.Flow(active), self._on_flowok,
  589. [spec.Channel.FlowOk])
  590. @property
  591. def is_closed(self):
  592. """Returns True if the channel is closed.
  593. :rtype: bool
  594. """
  595. return self._state == self.CLOSED
  596. @property
  597. def is_closing(self):
  598. """Returns True if client-initiated closing of the channel is in
  599. progress.
  600. :rtype: bool
  601. """
  602. return self._state == self.CLOSING
  603. @property
  604. def is_open(self):
  605. """Returns True if the channel is open.
  606. :rtype: bool
  607. """
  608. return self._state == self.OPEN
  609. def open(self):
  610. """Open the channel"""
  611. self._set_state(self.OPENING)
  612. self._add_callbacks()
  613. self._rpc(spec.Channel.Open(), self._on_openok, [spec.Channel.OpenOk])
  614. def queue_bind(self,
  615. queue,
  616. exchange,
  617. routing_key=None,
  618. arguments=None,
  619. callback=None):
  620. """Bind the queue to the specified exchange
  621. :param str queue: The queue to bind to the exchange
  622. :param str exchange: The source exchange to bind to
  623. :param str routing_key: The routing key to bind on
  624. :param dict arguments: Custom key/value pair arguments for the binding
  625. :param callable callback: callback(pika.frame.Method) for method Queue.BindOk
  626. :raises ValueError:
  627. """
  628. validators.require_string(queue, 'queue')
  629. validators.require_string(exchange, 'exchange')
  630. self._raise_if_not_open()
  631. nowait = validators.rpc_completion_callback(callback)
  632. if routing_key is None:
  633. routing_key = queue
  634. return self._rpc(
  635. spec.Queue.Bind(0, queue, exchange, routing_key, nowait,
  636. arguments or dict()), callback,
  637. [spec.Queue.BindOk] if not nowait else [])
  638. def queue_declare(self,
  639. queue,
  640. passive=False,
  641. durable=False,
  642. exclusive=False,
  643. auto_delete=False,
  644. arguments=None,
  645. callback=None):
  646. """Declare queue, create if needed. This method creates or checks a
  647. queue. When creating a new queue the client can specify various
  648. properties that control the durability of the queue and its contents,
  649. and the level of sharing for the queue.
  650. Use an empty string as the queue name for the broker to auto-generate
  651. one
  652. :param str queue: The queue name; if empty string, the broker will
  653. create a unique queue name
  654. :param bool passive: Only check to see if the queue exists
  655. :param bool durable: Survive reboots of the broker
  656. :param bool exclusive: Only allow access by the current connection
  657. :param bool auto_delete: Delete after consumer cancels or disconnects
  658. :param dict arguments: Custom key/value arguments for the queue
  659. :param callable callback: callback(pika.frame.Method) for method Queue.DeclareOk
  660. :raises ValueError:
  661. """
  662. validators.require_string(queue, 'queue')
  663. self._raise_if_not_open()
  664. nowait = validators.rpc_completion_callback(callback)
  665. if queue:
  666. condition = (spec.Queue.DeclareOk, {'queue': queue})
  667. else:
  668. condition = spec.Queue.DeclareOk
  669. replies = [condition] if not nowait else []
  670. return self._rpc(
  671. spec.Queue.Declare(0, queue, passive, durable, exclusive,
  672. auto_delete, nowait, arguments or dict()),
  673. callback, replies)
  674. def queue_delete(self,
  675. queue,
  676. if_unused=False,
  677. if_empty=False,
  678. callback=None):
  679. """Delete a queue from the broker.
  680. :param str queue: The queue to delete
  681. :param bool if_unused: only delete if it's unused
  682. :param bool if_empty: only delete if the queue is empty
  683. :param callable callback: callback(pika.frame.Method) for method Queue.DeleteOk
  684. :raises ValueError:
  685. """
  686. self._raise_if_not_open()
  687. validators.require_string(queue, 'queue')
  688. nowait = validators.rpc_completion_callback(callback)
  689. replies = [spec.Queue.DeleteOk] if not nowait else []
  690. return self._rpc(
  691. spec.Queue.Delete(0, queue, if_unused, if_empty, nowait), callback,
  692. replies)
  693. def queue_purge(self, queue, callback=None):
  694. """Purge all of the messages from the specified queue
  695. :param str queue: The queue to purge
  696. :param callable callback: callback(pika.frame.Method) for method Queue.PurgeOk
  697. :raises ValueError:
  698. """
  699. self._raise_if_not_open()
  700. validators.require_string(queue, 'queue')
  701. nowait = validators.rpc_completion_callback(callback)
  702. replies = [spec.Queue.PurgeOk] if not nowait else []
  703. return self._rpc(spec.Queue.Purge(0, queue, nowait), callback, replies)
  704. def queue_unbind(self,
  705. queue,
  706. exchange=None,
  707. routing_key=None,
  708. arguments=None,
  709. callback=None):
  710. """Unbind a queue from an exchange.
  711. :param str queue: The queue to unbind from the exchange
  712. :param str exchange: The source exchange to bind from
  713. :param str routing_key: The routing key to unbind
  714. :param dict arguments: Custom key/value pair arguments for the binding
  715. :param callable callback: callback(pika.frame.Method) for method Queue.UnbindOk
  716. :raises ValueError:
  717. """
  718. self._raise_if_not_open()
  719. validators.require_string(queue, 'queue')
  720. validators.rpc_completion_callback(callback)
  721. if routing_key is None:
  722. routing_key = queue
  723. return self._rpc(
  724. spec.Queue.Unbind(0, queue, exchange, routing_key, arguments or
  725. dict()), callback, [spec.Queue.UnbindOk])
  726. def tx_commit(self, callback=None):
  727. """Commit a transaction
  728. :param callable callback: The callback for delivery confirmations
  729. :raises ValueError:
  730. """
  731. self._raise_if_not_open()
  732. validators.rpc_completion_callback(callback)
  733. return self._rpc(spec.Tx.Commit(), callback, [spec.Tx.CommitOk])
  734. def tx_rollback(self, callback=None):
  735. """Rollback a transaction.
  736. :param callable callback: The callback for delivery confirmations
  737. :raises ValueError:
  738. """
  739. self._raise_if_not_open()
  740. validators.rpc_completion_callback(callback)
  741. return self._rpc(spec.Tx.Rollback(), callback, [spec.Tx.RollbackOk])
  742. def tx_select(self, callback=None):
  743. """Select standard transaction mode. This method sets the channel to use
  744. standard transactions. The client must use this method at least once on
  745. a channel before using the Commit or Rollback methods.
  746. :param callable callback: The callback for delivery confirmations
  747. :raises ValueError:
  748. """
  749. self._raise_if_not_open()
  750. validators.rpc_completion_callback(callback)
  751. return self._rpc(spec.Tx.Select(), callback, [spec.Tx.SelectOk])
  752. # Internal methods
  753. def _add_callbacks(self):
  754. """Callbacks that add the required behavior for a channel when
  755. connecting and connected to a server.
  756. """
  757. # Add a callback for Basic.GetEmpty
  758. self.callbacks.add(self.channel_number, spec.Basic.GetEmpty,
  759. self._on_getempty, False)
  760. # Add a callback for Basic.Cancel
  761. self.callbacks.add(self.channel_number, spec.Basic.Cancel,
  762. self._on_cancel, False)
  763. # Deprecated in newer versions of RabbitMQ but still register for it
  764. self.callbacks.add(self.channel_number, spec.Channel.Flow,
  765. self._on_flow, False)
  766. # Add a callback for when the server closes our channel
  767. self.callbacks.add(self.channel_number, spec.Channel.Close,
  768. self._on_close_from_broker, True)
  769. def _add_on_cleanup_callback(self, callback):
  770. """For internal use only (e.g., Connection needs to remove closed
  771. channels from its channel container). Pass a callback function that will
  772. be called when the channel is being cleaned up after all channel-close
  773. callbacks callbacks.
  774. :param callable callback: The callback to call, having the
  775. signature: callback(channel)
  776. """
  777. self.callbacks.add(self.channel_number,
  778. self._ON_CHANNEL_CLEANUP_CB_KEY,
  779. callback,
  780. one_shot=True,
  781. only_caller=self)
  782. def _cleanup(self):
  783. """Remove all consumers and any callbacks for the channel."""
  784. self.callbacks.process(self.channel_number,
  785. self._ON_CHANNEL_CLEANUP_CB_KEY, self, self)
  786. self._consumers = dict()
  787. self.callbacks.cleanup(str(self.channel_number))
  788. self._cookie = None
  789. def _cleanup_consumer_ref(self, consumer_tag):
  790. """Remove any references to the consumer tag in internal structures
  791. for consumer state.
  792. :param str consumer_tag: The consumer tag to cleanup
  793. """
  794. self._consumers_with_noack.discard(consumer_tag)
  795. self._consumers.pop(consumer_tag, None)
  796. self._cancelled.discard(consumer_tag)
  797. def _get_cookie(self):
  798. """Used by the wrapper implementation (e.g., `BlockingChannel`) to
  799. retrieve the cookie that it set via `_set_cookie`
  800. :returns: opaque cookie value that was set via `_set_cookie`
  801. :rtype: object
  802. """
  803. return self._cookie
  804. def _handle_content_frame(self, frame_value):
  805. """This is invoked by the connection when frames that are not registered
  806. with the CallbackManager have been found. This should only be the case
  807. when the frames are related to content delivery.
  808. The _content_assembler will be invoked which will return the fully
  809. formed message in three parts when all of the body frames have been
  810. received.
  811. :param pika.amqp_object.Frame frame_value: The frame to deliver
  812. """
  813. try:
  814. response = self._content_assembler.process(frame_value)
  815. except exceptions.UnexpectedFrameError:
  816. self._on_unexpected_frame(frame_value)
  817. return
  818. if response:
  819. if isinstance(response[0].method, spec.Basic.Deliver):
  820. self._on_deliver(*response)
  821. elif isinstance(response[0].method, spec.Basic.GetOk):
  822. self._on_getok(*response)
  823. elif isinstance(response[0].method, spec.Basic.Return):
  824. self._on_return(*response)
  825. def _on_cancel(self, method_frame):
  826. """When the broker cancels a consumer, delete it from our internal
  827. dictionary.
  828. :param pika.frame.Method method_frame: The method frame received
  829. """
  830. if method_frame.method.consumer_tag in self._cancelled:
  831. # User-initiated cancel is waiting for Cancel-ok
  832. return
  833. self._cleanup_consumer_ref(method_frame.method.consumer_tag)
  834. def _on_cancelok(self, method_frame):
  835. """Called in response to a frame from the Broker when the
  836. client sends Basic.Cancel
  837. :param pika.frame.Method method_frame: The method frame received
  838. """
  839. self._cleanup_consumer_ref(method_frame.method.consumer_tag)
  840. def _transition_to_closed(self):
  841. """Common logic for transitioning the channel to the CLOSED state:
  842. Set state to CLOSED, dispatch callbacks registered via
  843. `Channel.add_on_close_callback()`, and mop up.
  844. Assumes that the channel is not in CLOSED state and that
  845. `self._closing_reason` has been set up
  846. """
  847. assert not self.is_closed
  848. assert self._closing_reason is not None
  849. self._set_state(self.CLOSED)
  850. try:
  851. self.callbacks.process(self.channel_number, '_on_channel_close',
  852. self, self, self._closing_reason)
  853. finally:
  854. self._cleanup()
  855. def _on_close_from_broker(self, method_frame):
  856. """Handle `Channel.Close` from broker.
  857. :param pika.frame.Method method_frame: Method frame with Channel.Close
  858. method
  859. """
  860. LOGGER.warning('Received remote Channel.Close (%s): %r on %s',
  861. method_frame.method.reply_code,
  862. method_frame.method.reply_text, self)
  863. # Note, we should not be called when channel is already closed
  864. assert not self.is_closed
  865. # AMQP 0.9.1 requires CloseOk response to Channel.Close;
  866. self._send_method(spec.Channel.CloseOk())
  867. # Save the details, possibly overriding user-provided values if
  868. # user-initiated close is pending (in which case they will be provided
  869. # to user callback when CloseOk arrives).
  870. self._closing_reason = exceptions.ChannelClosedByBroker(
  871. method_frame.method.reply_code, method_frame.method.reply_text)
  872. if self.is_closing:
  873. # Since we may have already put Channel.Close on the wire, we need
  874. # to wait for CloseOk before cleaning up to avoid a race condition
  875. # whereby our channel number might get reused before our CloseOk
  876. # arrives
  877. #
  878. # NOTE: if our Channel.Close destined for the broker was blocked by
  879. # an earlier synchronous method, this call will drop it and perform
  880. # a meta-close (see `_on_close_meta()` which fakes receipt of
  881. # `Channel.CloseOk` and dispatches the `'_on_channel_close'`
  882. # callbacks.
  883. self._drain_blocked_methods_on_remote_close()
  884. else:
  885. self._transition_to_closed()
  886. def _on_close_meta(self, reason):
  887. """Handle meta-close request from either a remote Channel.Close from
  888. the broker (when a pending Channel.Close method is queued for
  889. execution) or a Connection's cleanup logic after sudden connection
  890. loss. We use this opportunity to transition to CLOSED state, clean up
  891. the channel, and dispatch the on-channel-closed callbacks.
  892. :param Exception reason: Exception describing the reason for closing.
  893. """
  894. LOGGER.debug('Handling meta-close on %s: %r', self, reason)
  895. if not self.is_closed:
  896. self._closing_reason = reason
  897. self._transition_to_closed()
  898. def _on_closeok(self, method_frame):
  899. """Invoked when RabbitMQ replies to a Channel.Close method
  900. :param pika.frame.Method method_frame: Method frame with Channel.CloseOk
  901. method
  902. """
  903. LOGGER.info('Received %s on %s', method_frame.method, self)
  904. self._transition_to_closed()
  905. def _on_deliver(self, method_frame, header_frame, body):
  906. """Cope with reentrancy. If a particular consumer is still active when
  907. another delivery appears for it, queue the deliveries up until it
  908. finally exits.
  909. :param pika.frame.Method method_frame: The method frame received
  910. :param pika.frame.Header header_frame: The header frame received
  911. :param bytes body: The body received
  912. """
  913. consumer_tag = method_frame.method.consumer_tag
  914. if consumer_tag in self._cancelled:
  915. if self.is_open and consumer_tag not in self._consumers_with_noack:
  916. self.basic_reject(method_frame.method.delivery_tag)
  917. return
  918. if consumer_tag not in self._consumers:
  919. LOGGER.error('Unexpected delivery: %r', method_frame)
  920. return
  921. self._consumers[consumer_tag](self, method_frame.method,
  922. header_frame.properties, body)
  923. def _on_eventok(self, method_frame):
  924. """Generic events that returned ok that may have internal callbacks.
  925. We keep a list of what we've yet to implement so that we don't silently
  926. drain events that we don't support.
  927. :param pika.frame.Method method_frame: The method frame received
  928. """
  929. LOGGER.debug('Discarding frame %r', method_frame)
  930. def _on_flow(self, _method_frame_unused):
  931. """Called if the server sends a Channel.Flow frame.
  932. :param pika.frame.Method method_frame_unused: The Channel.Flow frame
  933. """
  934. if self._has_on_flow_callback is False:
  935. LOGGER.warning('Channel.Flow received from server')
  936. def _on_flowok(self, method_frame):
  937. """Called in response to us asking the server to toggle on Channel.Flow
  938. :param pika.frame.Method method_frame: The method frame received
  939. """
  940. self.flow_active = method_frame.method.active
  941. if self._on_flowok_callback:
  942. self._on_flowok_callback(method_frame.method.active)
  943. self._on_flowok_callback = None
  944. else:
  945. LOGGER.warning('Channel.FlowOk received with no active callbacks')
  946. def _on_getempty(self, method_frame):
  947. """When we receive an empty reply do nothing but log it
  948. :param pika.frame.Method method_frame: The method frame received
  949. """
  950. LOGGER.debug('Received Basic.GetEmpty: %r', method_frame)
  951. if self._on_getok_callback is not None:
  952. self._on_getok_callback = None
  953. def _on_getok(self, method_frame, header_frame, body):
  954. """Called in reply to a Basic.Get when there is a message.
  955. :param pika.frame.Method method_frame: The method frame received
  956. :param pika.frame.Header header_frame: The header frame received
  957. :param bytes body: The body received
  958. """
  959. if self._on_getok_callback is not None:
  960. callback = self._on_getok_callback
  961. self._on_getok_callback = None
  962. callback(self, method_frame.method, header_frame.properties, body)
  963. else:
  964. LOGGER.error('Basic.GetOk received with no active callback')
  965. def _on_openok(self, method_frame):
  966. """Called by our callback handler when we receive a Channel.OpenOk and
  967. subsequently calls our _on_openok_callback which was passed into the
  968. Channel constructor. The reason we do this is because we want to make
  969. sure that the on_open_callback parameter passed into the Channel
  970. constructor is not the first callback we make.
  971. Suppress the state transition and callback if channel is already in
  972. CLOSING state.
  973. :param pika.frame.Method method_frame: Channel.OpenOk frame
  974. """
  975. # Suppress OpenOk if the user or Connection.Close started closing it
  976. # before open completed.
  977. if self.is_closing:
  978. LOGGER.debug('Suppressing while in closing state: %s', method_frame)
  979. else:
  980. self._set_state(self.OPEN)
  981. if self._on_openok_callback is not None:
  982. self._on_openok_callback(self)
  983. def _on_return(self, method_frame, header_frame, body):
  984. """Called if the server sends a Basic.Return frame.
  985. :param pika.frame.Method method_frame: The Basic.Return frame
  986. :param pika.frame.Header header_frame: The content header frame
  987. :param bytes body: The message body
  988. """
  989. if not self.callbacks.process(self.channel_number, '_on_return', self,
  990. self, method_frame.method,
  991. header_frame.properties, body):
  992. LOGGER.warning('Basic.Return received from server (%r, %r)',
  993. method_frame.method, header_frame.properties)
  994. def _on_selectok(self, method_frame):
  995. """Called when the broker sends a Confirm.SelectOk frame
  996. :param pika.frame.Method method_frame: The method frame received
  997. """
  998. LOGGER.debug("Confirm.SelectOk Received: %r", method_frame)
  999. def _on_synchronous_complete(self, _method_frame_unused):
  1000. """This is called when a synchronous command is completed. It will undo
  1001. the blocking state and send all the frames that stacked up while we
  1002. were in the blocking state.
  1003. :param pika.frame.Method method_frame_unused: The method frame received
  1004. """
  1005. LOGGER.debug('%i blocked frames', len(self._blocked))
  1006. self._blocking = None
  1007. # self._blocking must be checked here as a callback could
  1008. # potentially change the state of that variable during an
  1009. # iteration of the while loop
  1010. while self._blocked and self._blocking is None:
  1011. self._rpc(*self._blocked.popleft())
  1012. def _drain_blocked_methods_on_remote_close(self):
  1013. """This is called when the broker sends a Channel.Close while the
  1014. client is in CLOSING state. This method checks the blocked method
  1015. queue for a pending client-initiated Channel.Close method and
  1016. ensures its callbacks are processed, but does not send the method
  1017. to the broker.
  1018. The broker may close the channel before responding to outstanding
  1019. in-transit synchronous methods, or even before these methods have been
  1020. sent to the broker. AMQP 0.9.1 obliges the server to drop all methods
  1021. arriving on a closed channel other than Channel.CloseOk and
  1022. Channel.Close. Since the response to a synchronous method that blocked
  1023. the channel never arrives, the channel never becomes unblocked, and the
  1024. Channel.Close, if any, in the blocked queue has no opportunity to be
  1025. sent, and thus its completion callback would never be called.
  1026. """
  1027. LOGGER.debug(
  1028. 'Draining %i blocked frames due to broker-requested Channel.Close',
  1029. len(self._blocked))
  1030. while self._blocked:
  1031. method = self._blocked.popleft()[0]
  1032. if isinstance(method, spec.Channel.Close):
  1033. # The desired reason is already in self._closing_reason
  1034. self._on_close_meta(self._closing_reason)
  1035. else:
  1036. LOGGER.debug('Ignoring drained blocked method: %s', method)
  1037. def _rpc(self, method, callback=None, acceptable_replies=None):
  1038. """Make a syncronous channel RPC call for a synchronous method frame. If
  1039. the channel is already in the blocking state, then enqueue the request,
  1040. but don't send it at this time; it will be eventually sent by
  1041. `_on_synchronous_complete` after the prior blocking request receives a
  1042. resposne. If the channel is not in the blocking state and
  1043. `acceptable_replies` is not empty, transition the channel to the
  1044. blocking state and register for `_on_synchronous_complete` before
  1045. sending the request.
  1046. NOTE: A callback must be accompanied by non-empty acceptable_replies.
  1047. :param pika.amqp_object.Method method: The AMQP method to invoke
  1048. :param callable callback: The callback for the RPC response
  1049. :param list|None acceptable_replies: A (possibly empty) sequence of
  1050. replies this RPC call expects or None
  1051. """
  1052. assert method.synchronous, (
  1053. 'Only synchronous-capable methods may be used with _rpc: %r' %
  1054. (method,))
  1055. # Validate we got None or a list of acceptable_replies
  1056. if not isinstance(acceptable_replies, (type(None), list)):
  1057. raise TypeError('acceptable_replies should be list or None')
  1058. if callback is not None:
  1059. # Validate the callback is callable
  1060. if not callable(callback):
  1061. raise TypeError('callback should be None or a callable')
  1062. # Make sure that callback is accompanied by acceptable replies
  1063. if not acceptable_replies:
  1064. raise ValueError(
  1065. 'Unexpected callback for asynchronous (nowait) operation.')
  1066. # Make sure the channel is not closed yet
  1067. if self.is_closed:
  1068. self._raise_if_not_open()
  1069. # If the channel is blocking, add subsequent commands to our stack
  1070. if self._blocking:
  1071. LOGGER.debug(
  1072. 'Already in blocking state, so enqueueing method %s; '
  1073. 'acceptable_replies=%r', method, acceptable_replies)
  1074. self._blocked.append([method, callback, acceptable_replies])
  1075. return
  1076. # Note: _send_method can throw exceptions if there are framing errors
  1077. # or invalid data passed in. Call it here to prevent self._blocking
  1078. # from being set if an exception is thrown. This also prevents
  1079. # acceptable_replies registering callbacks when exceptions are thrown
  1080. self._send_method(method)
  1081. # If acceptable replies are set, add callbacks
  1082. if acceptable_replies:
  1083. # Block until a response frame is received for synchronous frames
  1084. self._blocking = method.NAME
  1085. LOGGER.debug(
  1086. 'Entering blocking state on frame %s; acceptable_replies=%r',
  1087. method, acceptable_replies)
  1088. for reply in acceptable_replies:
  1089. if isinstance(reply, tuple):
  1090. reply, arguments = reply
  1091. else:
  1092. arguments = None
  1093. LOGGER.debug('Adding on_synchronous_complete callback')
  1094. self.callbacks.add(self.channel_number,
  1095. reply,
  1096. self._on_synchronous_complete,
  1097. arguments=arguments)
  1098. if callback is not None:
  1099. LOGGER.debug('Adding passed-in RPC response callback')
  1100. self.callbacks.add(self.channel_number,
  1101. reply,
  1102. callback,
  1103. arguments=arguments)
  1104. def _raise_if_not_open(self):
  1105. """If channel is not in the OPEN state, raises ChannelWrongStateError
  1106. with `reply_code` and `reply_text` corresponding to current state.
  1107. :raises exceptions.ChannelWrongStateError: if channel is not in OPEN
  1108. state.
  1109. """
  1110. if self._state == self.OPEN:
  1111. return
  1112. if self._state == self.OPENING:
  1113. raise exceptions.ChannelWrongStateError(
  1114. 'Channel is opening, but is not usable yet.')
  1115. elif self._state == self.CLOSING:
  1116. raise exceptions.ChannelWrongStateError('Channel is closing.')
  1117. else: # Assumed self.CLOSED
  1118. assert self._state == self.CLOSED
  1119. raise exceptions.ChannelWrongStateError('Channel is closed.')
  1120. def _send_method(self, method, content=None):
  1121. """Shortcut wrapper to send a method through our connection, passing in
  1122. the channel number
  1123. :param pika.amqp_object.Method method: The method to send
  1124. :param tuple content: If set, is a content frame, is tuple of
  1125. properties and body.
  1126. """
  1127. # pylint: disable=W0212
  1128. self.connection._send_method(self.channel_number, method, content)
  1129. def _set_cookie(self, cookie):
  1130. """Used by wrapper layer (e.g., `BlockingConnection`) to link the
  1131. channel implementation back to the proxy. See `_get_cookie`.
  1132. :param cookie: an opaque value; typically a proxy channel implementation
  1133. instance (e.g., `BlockingChannel` instance)
  1134. """
  1135. self._cookie = cookie
  1136. def _set_state(self, connection_state):
  1137. """Set the channel connection state to the specified state value.
  1138. :param int connection_state: The connection_state value
  1139. """
  1140. self._state = connection_state
  1141. def _on_unexpected_frame(self, frame_value):
  1142. """Invoked when a frame is received that is not setup to be processed.
  1143. :param pika.frame.Frame frame_value: The frame received
  1144. """
  1145. LOGGER.error('Unexpected frame: %r', frame_value)
  1146. class ContentFrameAssembler(object):
  1147. """Handle content related frames, building a message and return the message
  1148. back in three parts upon receipt.
  1149. """
  1150. def __init__(self):
  1151. """Create a new instance of the conent frame assembler.
  1152. """
  1153. self._method_frame = None
  1154. self._header_frame = None
  1155. self._seen_so_far = 0
  1156. self._body_fragments = list()
  1157. def process(self, frame_value):
  1158. """Invoked by the Channel object when passed frames that are not
  1159. setup in the rpc process and that don't have explicit reply types
  1160. defined. This includes Basic.Publish, Basic.GetOk and Basic.Return
  1161. :param Method|Header|Body frame_value: The frame to process
  1162. """
  1163. if (isinstance(frame_value, frame.Method) and
  1164. spec.has_content(frame_value.method.INDEX)):
  1165. self._method_frame = frame_value
  1166. return None
  1167. elif isinstance(frame_value, frame.Header):
  1168. self._header_frame = frame_value
  1169. if frame_value.body_size == 0:
  1170. return self._finish()
  1171. else:
  1172. return None
  1173. elif isinstance(frame_value, frame.Body):
  1174. return self._handle_body_frame(frame_value)
  1175. else:
  1176. raise exceptions.UnexpectedFrameError(frame_value)
  1177. def _finish(self):
  1178. """Invoked when all of the message has been received
  1179. :rtype: tuple(pika.frame.Method, pika.frame.Header, str)
  1180. """
  1181. content = (self._method_frame, self._header_frame,
  1182. b''.join(self._body_fragments))
  1183. self._reset()
  1184. return content
  1185. def _handle_body_frame(self, body_frame):
  1186. """Receive body frames and append them to the stack. When the body size
  1187. matches, call the finish method.
  1188. :param Body body_frame: The body frame
  1189. :raises: pika.exceptions.BodyTooLongError
  1190. :rtype: tuple(pika.frame.Method, pika.frame.Header, str)|None
  1191. """
  1192. self._seen_so_far += len(body_frame.fragment)
  1193. self._body_fragments.append(body_frame.fragment)
  1194. if self._seen_so_far == self._header_frame.body_size:
  1195. return self._finish()
  1196. elif self._seen_so_far > self._header_frame.body_size:
  1197. raise exceptions.BodyTooLongError(self._seen_so_far,
  1198. self._header_frame.body_size)
  1199. return None
  1200. def _reset(self):
  1201. """Reset the values for processing frames"""
  1202. self._method_frame = None
  1203. self._header_frame = None
  1204. self._seen_so_far = 0
  1205. self._body_fragments = list()