amqp.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.app.amqp
  4. ~~~~~~~~~~~~~~~
  5. Sending and receiving messages using Kombu.
  6. """
  7. from __future__ import absolute_import
  8. from datetime import timedelta
  9. from weakref import WeakValueDictionary
  10. from kombu import Connection, Consumer, Exchange, Producer, Queue
  11. from kombu.common import Broadcast
  12. from kombu.pools import ProducerPool
  13. from kombu.utils import cached_property, uuid
  14. from kombu.utils.encoding import safe_repr
  15. from kombu.utils.functional import maybe_list
  16. from celery import signals
  17. from celery.five import items, string_t
  18. from celery.utils.text import indent as textindent
  19. from . import app_or_default
  20. from . import routes as _routes
  21. __all__ = ['AMQP', 'Queues', 'TaskProducer', 'TaskConsumer']
  22. #: Human readable queue declaration.
  23. QUEUE_FORMAT = """
  24. .> {0.name:<16} exchange={0.exchange.name}({0.exchange.type}) \
  25. key={0.routing_key}
  26. """
  27. class Queues(dict):
  28. """Queue name⇒ declaration mapping.
  29. :param queues: Initial list/tuple or dict of queues.
  30. :keyword create_missing: By default any unknown queues will be
  31. added automatically, but if disabled
  32. the occurrence of unknown queues
  33. in `wanted` will raise :exc:`KeyError`.
  34. :keyword ha_policy: Default HA policy for queues with none set.
  35. """
  36. #: If set, this is a subset of queues to consume from.
  37. #: The rest of the queues are then used for routing only.
  38. _consume_from = None
  39. def __init__(self, queues=None, default_exchange=None,
  40. create_missing=True, ha_policy=None, autoexchange=None):
  41. dict.__init__(self)
  42. self.aliases = WeakValueDictionary()
  43. self.default_exchange = default_exchange
  44. self.create_missing = create_missing
  45. self.ha_policy = ha_policy
  46. self.autoexchange = Exchange if autoexchange is None else autoexchange
  47. if isinstance(queues, (tuple, list)):
  48. queues = dict((q.name, q) for q in queues)
  49. for name, q in items(queues or {}):
  50. self.add(q) if isinstance(q, Queue) else self.add_compat(name, **q)
  51. def __getitem__(self, name):
  52. try:
  53. return self.aliases[name]
  54. except KeyError:
  55. return dict.__getitem__(self, name)
  56. def __setitem__(self, name, queue):
  57. if self.default_exchange and (not queue.exchange or
  58. not queue.exchange.name):
  59. queue.exchange = self.default_exchange
  60. dict.__setitem__(self, name, queue)
  61. if queue.alias:
  62. self.aliases[queue.alias] = queue
  63. def __missing__(self, name):
  64. if self.create_missing:
  65. return self.add(self.new_missing(name))
  66. raise KeyError(name)
  67. def add(self, queue, **kwargs):
  68. """Add new queue.
  69. The first argument can either be a :class:`kombu.Queue` instance,
  70. or the name of a queue. If the former the rest of the keyword
  71. arguments are ignored, and options are simply taken from the queue
  72. instance.
  73. :param queue: :class:`kombu.Queue` instance or name of the queue.
  74. :keyword exchange: (if named) specifies exchange name.
  75. :keyword routing_key: (if named) specifies binding key.
  76. :keyword exchange_type: (if named) specifies type of exchange.
  77. :keyword \*\*options: (if named) Additional declaration options.
  78. """
  79. if not isinstance(queue, Queue):
  80. return self.add_compat(queue, **kwargs)
  81. if self.ha_policy:
  82. if queue.queue_arguments is None:
  83. queue.queue_arguments = {}
  84. self._set_ha_policy(queue.queue_arguments)
  85. self[queue.name] = queue
  86. return queue
  87. def add_compat(self, name, **options):
  88. # docs used to use binding_key as routing key
  89. options.setdefault('routing_key', options.get('binding_key'))
  90. if options['routing_key'] is None:
  91. options['routing_key'] = name
  92. if self.ha_policy is not None:
  93. self._set_ha_policy(options.setdefault('queue_arguments', {}))
  94. q = self[name] = Queue.from_dict(name, **options)
  95. return q
  96. def _set_ha_policy(self, args):
  97. policy = self.ha_policy
  98. if isinstance(policy, (list, tuple)):
  99. return args.update({'x-ha-policy': 'nodes',
  100. 'x-ha-policy-params': list(policy)})
  101. args['x-ha-policy'] = policy
  102. def format(self, indent=0, indent_first=True):
  103. """Format routing table into string for log dumps."""
  104. active = self.consume_from
  105. if not active:
  106. return ''
  107. info = [QUEUE_FORMAT.strip().format(q)
  108. for _, q in sorted(items(active))]
  109. if indent_first:
  110. return textindent('\n'.join(info), indent)
  111. return info[0] + '\n' + textindent('\n'.join(info[1:]), indent)
  112. def select_add(self, queue, **kwargs):
  113. """Add new task queue that will be consumed from even when
  114. a subset has been selected using the :option:`-Q` option."""
  115. q = self.add(queue, **kwargs)
  116. if self._consume_from is not None:
  117. self._consume_from[q.name] = q
  118. return q
  119. def select(self, include):
  120. """Sets :attr:`consume_from` by selecting a subset of the
  121. currently defined queues.
  122. :param include: Names of queues to consume from.
  123. Can be iterable or string.
  124. """
  125. if include:
  126. self._consume_from = dict((name, self[name])
  127. for name in maybe_list(include))
  128. select_subset = select # XXX compat
  129. def deselect(self, exclude):
  130. """Deselect queues so that they will not be consumed from.
  131. :param exclude: Names of queues to avoid consuming from.
  132. Can be iterable or string.
  133. """
  134. if exclude:
  135. exclude = maybe_list(exclude)
  136. if self._consume_from is None:
  137. # using selection
  138. return self.select(k for k in self if k not in exclude)
  139. # using all queues
  140. for queue in exclude:
  141. self._consume_from.pop(queue, None)
  142. select_remove = deselect # XXX compat
  143. def new_missing(self, name):
  144. return Queue(name, self.autoexchange(name), name)
  145. @property
  146. def consume_from(self):
  147. if self._consume_from is not None:
  148. return self._consume_from
  149. return self
  150. class TaskProducer(Producer):
  151. app = None
  152. auto_declare = False
  153. retry = False
  154. retry_policy = None
  155. utc = True
  156. event_dispatcher = None
  157. send_sent_event = False
  158. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  159. self.retry = kwargs.pop('retry', self.retry)
  160. self.retry_policy = kwargs.pop('retry_policy',
  161. self.retry_policy or {})
  162. self.send_sent_event = kwargs.pop('send_sent_event',
  163. self.send_sent_event)
  164. exchange = exchange or self.exchange
  165. self.queues = self.app.amqp.queues # shortcut
  166. self.default_queue = self.app.amqp.default_queue
  167. super(TaskProducer, self).__init__(channel, exchange, *args, **kwargs)
  168. def publish_task(self, task_name, task_args=None, task_kwargs=None,
  169. countdown=None, eta=None, task_id=None, group_id=None,
  170. taskset_id=None, # compat alias to group_id
  171. expires=None, exchange=None, exchange_type=None,
  172. event_dispatcher=None, retry=None, retry_policy=None,
  173. queue=None, now=None, retries=0, chord=None,
  174. callbacks=None, errbacks=None, routing_key=None,
  175. serializer=None, delivery_mode=None, compression=None,
  176. reply_to=None, time_limit=None, soft_time_limit=None,
  177. declare=None, headers=None,
  178. send_before_publish=signals.before_task_publish.send,
  179. before_receivers=signals.before_task_publish.receivers,
  180. send_after_publish=signals.after_task_publish.send,
  181. after_receivers=signals.after_task_publish.receivers,
  182. send_task_sent=signals.task_sent.send, # XXX deprecated
  183. sent_receivers=signals.task_sent.receivers,
  184. **kwargs):
  185. """Send task message."""
  186. retry = self.retry if retry is None else retry
  187. qname = queue
  188. if queue is None and exchange is None:
  189. queue = self.default_queue
  190. if queue is not None:
  191. if isinstance(queue, string_t):
  192. qname, queue = queue, self.queues[queue]
  193. else:
  194. qname = queue.name
  195. exchange = exchange or queue.exchange.name
  196. routing_key = routing_key or queue.routing_key
  197. if declare is None and queue and not isinstance(queue, Broadcast):
  198. declare = [queue]
  199. # merge default and custom policy
  200. retry = self.retry if retry is None else retry
  201. _rp = (dict(self.retry_policy, **retry_policy) if retry_policy
  202. else self.retry_policy)
  203. task_id = task_id or uuid()
  204. task_args = task_args or []
  205. task_kwargs = task_kwargs or {}
  206. if not isinstance(task_args, (list, tuple)):
  207. raise ValueError('task args must be a list or tuple')
  208. if not isinstance(task_kwargs, dict):
  209. raise ValueError('task kwargs must be a dictionary')
  210. if countdown: # Convert countdown to ETA.
  211. now = now or self.app.now()
  212. eta = now + timedelta(seconds=countdown)
  213. if self.utc:
  214. eta = eta.replace(tzinfo=self.app.timezone)
  215. if isinstance(expires, (int, float)):
  216. now = now or self.app.now()
  217. expires = now + timedelta(seconds=expires)
  218. if self.utc:
  219. expires = expires.replace(tzinfo=self.app.timezone)
  220. eta = eta and eta.isoformat()
  221. expires = expires and expires.isoformat()
  222. body = {
  223. 'task': task_name,
  224. 'id': task_id,
  225. 'args': task_args,
  226. 'kwargs': task_kwargs,
  227. 'retries': retries or 0,
  228. 'eta': eta,
  229. 'expires': expires,
  230. 'utc': self.utc,
  231. 'callbacks': callbacks,
  232. 'errbacks': errbacks,
  233. 'timelimit': (time_limit, soft_time_limit),
  234. 'taskset': group_id or taskset_id,
  235. 'chord': chord,
  236. }
  237. if before_receivers:
  238. send_before_publish(
  239. sender=task_name, body=body,
  240. exchange=exchange,
  241. routing_key=routing_key,
  242. declare=declare,
  243. headers=headers,
  244. properties=kwargs,
  245. retry_policy=retry_policy,
  246. )
  247. self.publish(
  248. body,
  249. exchange=exchange, routing_key=routing_key,
  250. serializer=serializer or self.serializer,
  251. compression=compression or self.compression,
  252. headers=headers,
  253. retry=retry, retry_policy=_rp,
  254. reply_to=reply_to,
  255. correlation_id=task_id,
  256. delivery_mode=delivery_mode, declare=declare,
  257. **kwargs
  258. )
  259. if after_receivers:
  260. send_after_publish(sender=task_name, body=body,
  261. exchange=exchange, routing_key=routing_key)
  262. if sent_receivers: # XXX deprecated
  263. send_task_sent(sender=task_name, task_id=task_id,
  264. task=task_name, args=task_args,
  265. kwargs=task_kwargs, eta=eta,
  266. taskset=group_id or taskset_id)
  267. if self.send_sent_event:
  268. evd = event_dispatcher or self.event_dispatcher
  269. exname = exchange or self.exchange
  270. if isinstance(exname, Exchange):
  271. exname = exname.name
  272. evd.publish(
  273. 'task-sent',
  274. {
  275. 'uuid': task_id,
  276. 'name': task_name,
  277. 'args': safe_repr(task_args),
  278. 'kwargs': safe_repr(task_kwargs),
  279. 'retries': retries,
  280. 'eta': eta,
  281. 'expires': expires,
  282. 'queue': qname,
  283. 'exchange': exname,
  284. 'routing_key': routing_key,
  285. },
  286. self, retry=retry, retry_policy=retry_policy,
  287. )
  288. return task_id
  289. delay_task = publish_task # XXX Compat
  290. @cached_property
  291. def event_dispatcher(self):
  292. # We call Dispatcher.publish with a custom producer
  293. # so don't need the dispatcher to be "enabled".
  294. return self.app.events.Dispatcher(enabled=False)
  295. class TaskPublisher(TaskProducer):
  296. """Deprecated version of :class:`TaskProducer`."""
  297. def __init__(self, channel=None, exchange=None, *args, **kwargs):
  298. self.app = app_or_default(kwargs.pop('app', self.app))
  299. self.retry = kwargs.pop('retry', self.retry)
  300. self.retry_policy = kwargs.pop('retry_policy',
  301. self.retry_policy or {})
  302. exchange = exchange or self.exchange
  303. if not isinstance(exchange, Exchange):
  304. exchange = Exchange(exchange,
  305. kwargs.pop('exchange_type', 'direct'))
  306. self.queues = self.app.amqp.queues # shortcut
  307. super(TaskPublisher, self).__init__(channel, exchange, *args, **kwargs)
  308. class TaskConsumer(Consumer):
  309. app = None
  310. def __init__(self, channel, queues=None, app=None, accept=None, **kw):
  311. self.app = app or self.app
  312. if accept is None:
  313. accept = self.app.conf.CELERY_ACCEPT_CONTENT
  314. super(TaskConsumer, self).__init__(
  315. channel,
  316. queues or list(self.app.amqp.queues.consume_from.values()),
  317. accept=accept,
  318. **kw
  319. )
  320. class AMQP(object):
  321. Connection = Connection
  322. Consumer = Consumer
  323. #: compat alias to Connection
  324. BrokerConnection = Connection
  325. producer_cls = TaskProducer
  326. consumer_cls = TaskConsumer
  327. #: Cached and prepared routing table.
  328. _rtable = None
  329. #: Underlying producer pool instance automatically
  330. #: set by the :attr:`producer_pool`.
  331. _producer_pool = None
  332. # Exchange class/function used when defining automatic queues.
  333. # E.g. you can use ``autoexchange = lambda n: None`` to use the
  334. # amqp default exchange, which is a shortcut to bypass routing
  335. # and instead send directly to the queue named in the routing key.
  336. autoexchange = None
  337. def __init__(self, app):
  338. self.app = app
  339. def flush_routes(self):
  340. self._rtable = _routes.prepare(self.app.conf.CELERY_ROUTES)
  341. def Queues(self, queues, create_missing=None, ha_policy=None,
  342. autoexchange=None):
  343. """Create new :class:`Queues` instance, using queue defaults
  344. from the current configuration."""
  345. conf = self.app.conf
  346. if create_missing is None:
  347. create_missing = conf.CELERY_CREATE_MISSING_QUEUES
  348. if ha_policy is None:
  349. ha_policy = conf.CELERY_QUEUE_HA_POLICY
  350. if not queues and conf.CELERY_DEFAULT_QUEUE:
  351. queues = (Queue(conf.CELERY_DEFAULT_QUEUE,
  352. exchange=self.default_exchange,
  353. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY), )
  354. autoexchange = (self.autoexchange if autoexchange is None
  355. else autoexchange)
  356. return Queues(
  357. queues, self.default_exchange, create_missing,
  358. ha_policy, autoexchange,
  359. )
  360. def Router(self, queues=None, create_missing=None):
  361. """Return the current task router."""
  362. return _routes.Router(self.routes, queues or self.queues,
  363. self.app.either('CELERY_CREATE_MISSING_QUEUES',
  364. create_missing), app=self.app)
  365. @cached_property
  366. def TaskConsumer(self):
  367. """Return consumer configured to consume from the queues
  368. we are configured for (``app.amqp.queues.consume_from``)."""
  369. return self.app.subclass_with_self(self.consumer_cls,
  370. reverse='amqp.TaskConsumer')
  371. get_task_consumer = TaskConsumer # XXX compat
  372. @cached_property
  373. def TaskProducer(self):
  374. """Return publisher used to send tasks.
  375. You should use `app.send_task` instead.
  376. """
  377. conf = self.app.conf
  378. return self.app.subclass_with_self(
  379. self.producer_cls,
  380. reverse='amqp.TaskProducer',
  381. exchange=self.default_exchange,
  382. routing_key=conf.CELERY_DEFAULT_ROUTING_KEY,
  383. serializer=conf.CELERY_TASK_SERIALIZER,
  384. compression=conf.CELERY_MESSAGE_COMPRESSION,
  385. retry=conf.CELERY_TASK_PUBLISH_RETRY,
  386. retry_policy=conf.CELERY_TASK_PUBLISH_RETRY_POLICY,
  387. send_sent_event=conf.CELERY_SEND_TASK_SENT_EVENT,
  388. utc=conf.CELERY_ENABLE_UTC,
  389. )
  390. TaskPublisher = TaskProducer # compat
  391. @cached_property
  392. def default_queue(self):
  393. return self.queues[self.app.conf.CELERY_DEFAULT_QUEUE]
  394. @cached_property
  395. def queues(self):
  396. """Queue name⇒ declaration mapping."""
  397. return self.Queues(self.app.conf.CELERY_QUEUES)
  398. @queues.setter # noqa
  399. def queues(self, queues):
  400. return self.Queues(queues)
  401. @property
  402. def routes(self):
  403. if self._rtable is None:
  404. self.flush_routes()
  405. return self._rtable
  406. @cached_property
  407. def router(self):
  408. return self.Router()
  409. @property
  410. def producer_pool(self):
  411. if self._producer_pool is None:
  412. self._producer_pool = ProducerPool(
  413. self.app.pool,
  414. limit=self.app.pool.limit,
  415. Producer=self.TaskProducer,
  416. )
  417. return self._producer_pool
  418. publisher_pool = producer_pool # compat alias
  419. @cached_property
  420. def default_exchange(self):
  421. return Exchange(self.app.conf.CELERY_DEFAULT_EXCHANGE,
  422. self.app.conf.CELERY_DEFAULT_EXCHANGE_TYPE)