amqp.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery amqp` command.
  4. .. program:: celery amqp
  5. """
  6. from __future__ import absolute_import, print_function
  7. import cmd
  8. import sys
  9. import shlex
  10. import pprint
  11. from collections import Callable
  12. from functools import partial
  13. from itertools import count
  14. from amqp import Message
  15. from celery.utils.functional import padlist
  16. from celery.bin.base import Command
  17. from celery.five import string_t
  18. from celery.utils import strtobool
  19. __all__ = ['AMQPAdmin', 'AMQShell', 'Spec', 'amqp']
  20. # Map to coerce strings to other types.
  21. COERCE = {bool: strtobool}
  22. HELP_HEADER = """
  23. Commands
  24. --------
  25. """.rstrip()
  26. EXAMPLE_TEXT = """
  27. Example:
  28. -> queue.delete myqueue yes no
  29. """
  30. say = partial(print, file=sys.stderr)
  31. class Spec(object):
  32. """AMQP Command specification.
  33. Used to convert arguments to Python values and display various help
  34. and tooltips.
  35. :param args: see :attr:`args`.
  36. :keyword returns: see :attr:`returns`.
  37. .. attribute args::
  38. List of arguments this command takes. Should
  39. contain `(argument_name, argument_type)` tuples.
  40. .. attribute returns:
  41. Helpful human string representation of what this command returns.
  42. May be :const:`None`, to signify the return type is unknown.
  43. """
  44. def __init__(self, *args, **kwargs):
  45. self.args = args
  46. self.returns = kwargs.get('returns')
  47. def coerce(self, index, value):
  48. """Coerce value for argument at index.
  49. E.g. if :attr:`args` is `[('is_active', bool)]`:
  50. >>> coerce(0, 'False')
  51. False
  52. """
  53. arg_info = self.args[index]
  54. arg_type = arg_info[1]
  55. # Might be a custom way to coerce the string value,
  56. # so look in the coercion map.
  57. return COERCE.get(arg_type, arg_type)(value)
  58. def str_args_to_python(self, arglist):
  59. """Process list of string arguments to values according to spec.
  60. e.g:
  61. >>> spec = Spec([('queue', str), ('if_unused', bool)])
  62. >>> spec.str_args_to_python('pobox', 'true')
  63. ('pobox', True)
  64. """
  65. return tuple(
  66. self.coerce(index, value) for index, value in enumerate(arglist))
  67. def format_response(self, response):
  68. """Format the return value of this command in a human-friendly way."""
  69. if not self.returns:
  70. return 'ok.' if response is None else response
  71. if isinstance(self.returns, Callable):
  72. return self.returns(response)
  73. return self.returns.format(response)
  74. def format_arg(self, name, type, default_value=None):
  75. if default_value is not None:
  76. return '{0}:{1}'.format(name, default_value)
  77. return name
  78. def format_signature(self):
  79. return ' '.join(self.format_arg(*padlist(list(arg), 3))
  80. for arg in self.args)
  81. def dump_message(message):
  82. if message is None:
  83. return 'No messages in queue. basic.publish something.'
  84. return {'body': message.body,
  85. 'properties': message.properties,
  86. 'delivery_info': message.delivery_info}
  87. def format_declare_queue(ret):
  88. return 'ok. queue:{0} messages:{1} consumers:{2}.'.format(*ret)
  89. class AMQShell(cmd.Cmd):
  90. """AMQP API Shell.
  91. :keyword connect: Function used to connect to the server, must return
  92. connection object.
  93. :keyword silent: If :const:`True`, the commands won't have annoying
  94. output not relevant when running in non-shell mode.
  95. .. attribute: builtins
  96. Mapping of built-in command names -> method names
  97. .. attribute:: amqp
  98. Mapping of AMQP API commands and their :class:`Spec`.
  99. """
  100. conn = None
  101. chan = None
  102. prompt_fmt = '{self.counter}> '
  103. identchars = cmd.IDENTCHARS = '.'
  104. needs_reconnect = False
  105. counter = 1
  106. inc_counter = count(2)
  107. builtins = {'EOF': 'do_exit',
  108. 'exit': 'do_exit',
  109. 'help': 'do_help'}
  110. amqp = {
  111. 'exchange.declare': Spec(('exchange', str),
  112. ('type', str),
  113. ('passive', bool, 'no'),
  114. ('durable', bool, 'no'),
  115. ('auto_delete', bool, 'no'),
  116. ('internal', bool, 'no')),
  117. 'exchange.delete': Spec(('exchange', str),
  118. ('if_unused', bool)),
  119. 'queue.bind': Spec(('queue', str),
  120. ('exchange', str),
  121. ('routing_key', str)),
  122. 'queue.declare': Spec(('queue', str),
  123. ('passive', bool, 'no'),
  124. ('durable', bool, 'no'),
  125. ('exclusive', bool, 'no'),
  126. ('auto_delete', bool, 'no'),
  127. returns=format_declare_queue),
  128. 'queue.delete': Spec(('queue', str),
  129. ('if_unused', bool, 'no'),
  130. ('if_empty', bool, 'no'),
  131. returns='ok. {0} messages deleted.'),
  132. 'queue.purge': Spec(('queue', str),
  133. returns='ok. {0} messages deleted.'),
  134. 'basic.get': Spec(('queue', str),
  135. ('no_ack', bool, 'off'),
  136. returns=dump_message),
  137. 'basic.publish': Spec(('msg', Message),
  138. ('exchange', str),
  139. ('routing_key', str),
  140. ('mandatory', bool, 'no'),
  141. ('immediate', bool, 'no')),
  142. 'basic.ack': Spec(('delivery_tag', int)),
  143. }
  144. def __init__(self, *args, **kwargs):
  145. self.connect = kwargs.pop('connect')
  146. self.silent = kwargs.pop('silent', False)
  147. self.out = kwargs.pop('out', sys.stderr)
  148. cmd.Cmd.__init__(self, *args, **kwargs)
  149. self._reconnect()
  150. def note(self, m):
  151. """Say something to the user. Disabled if :attr:`silent`."""
  152. if not self.silent:
  153. say(m, file=self.out)
  154. def say(self, m):
  155. say(m, file=self.out)
  156. def get_amqp_api_command(self, cmd, arglist):
  157. """With a command name and a list of arguments, convert the arguments
  158. to Python values and find the corresponding method on the AMQP channel
  159. object.
  160. :returns: tuple of `(method, processed_args)`.
  161. Example:
  162. >>> get_amqp_api_command('queue.delete', ['pobox', 'yes', 'no'])
  163. (<bound method Channel.queue_delete of
  164. <amqp.channel.Channel object at 0x...>>,
  165. ('testfoo', True, False))
  166. """
  167. spec = self.amqp[cmd]
  168. args = spec.str_args_to_python(arglist)
  169. attr_name = cmd.replace('.', '_')
  170. if self.needs_reconnect:
  171. self._reconnect()
  172. return getattr(self.chan, attr_name), args, spec.format_response
  173. def do_exit(self, *args):
  174. """The `'exit'` command."""
  175. self.note("\n-> please, don't leave!")
  176. sys.exit(0)
  177. def display_command_help(self, cmd, short=False):
  178. spec = self.amqp[cmd]
  179. self.say('{0} {1}'.format(cmd, spec.format_signature()))
  180. def do_help(self, *args):
  181. if not args:
  182. self.say(HELP_HEADER)
  183. for cmd_name in self.amqp:
  184. self.display_command_help(cmd_name, short=True)
  185. self.say(EXAMPLE_TEXT)
  186. else:
  187. self.display_command_help(args[0])
  188. def default(self, line):
  189. self.say("unknown syntax: {0!r}. how about some 'help'?".format(line))
  190. def get_names(self):
  191. return set(self.builtins) | set(self.amqp)
  192. def completenames(self, text, *ignored):
  193. """Return all commands starting with `text`, for tab-completion."""
  194. names = self.get_names()
  195. first = [cmd for cmd in names
  196. if cmd.startswith(text.replace('_', '.'))]
  197. if first:
  198. return first
  199. return [cmd for cmd in names
  200. if cmd.partition('.')[2].startswith(text)]
  201. def dispatch(self, cmd, argline):
  202. """Dispatch and execute the command.
  203. Lookup order is: :attr:`builtins` -> :attr:`amqp`.
  204. """
  205. arglist = shlex.split(argline)
  206. if cmd in self.builtins:
  207. return getattr(self, self.builtins[cmd])(*arglist)
  208. fun, args, formatter = self.get_amqp_api_command(cmd, arglist)
  209. return formatter(fun(*args))
  210. def parseline(self, line):
  211. """Parse input line.
  212. :returns: tuple of three items:
  213. `(command_name, arglist, original_line)`
  214. E.g::
  215. >>> parseline('queue.delete A 'B' C')
  216. ('queue.delete', 'A 'B' C', 'queue.delete A 'B' C')
  217. """
  218. parts = line.split()
  219. if parts:
  220. return parts[0], ' '.join(parts[1:]), line
  221. return '', '', line
  222. def onecmd(self, line):
  223. """Parse line and execute command."""
  224. cmd, arg, line = self.parseline(line)
  225. if not line:
  226. return self.emptyline()
  227. self.lastcmd = line
  228. self.counter = next(self.inc_counter)
  229. try:
  230. self.respond(self.dispatch(cmd, arg))
  231. except (AttributeError, KeyError) as exc:
  232. self.default(line)
  233. except Exception as exc:
  234. self.say(exc)
  235. self.needs_reconnect = True
  236. def respond(self, retval):
  237. """What to do with the return value of a command."""
  238. if retval is not None:
  239. if isinstance(retval, string_t):
  240. self.say(retval)
  241. else:
  242. self.say(pprint.pformat(retval))
  243. def _reconnect(self):
  244. """Re-establish connection to the AMQP server."""
  245. self.conn = self.connect(self.conn)
  246. self.chan = self.conn.default_channel
  247. self.needs_reconnect = False
  248. @property
  249. def prompt(self):
  250. return self.prompt_fmt.format(self=self)
  251. class AMQPAdmin(object):
  252. """The celery :program:`celery amqp` utility."""
  253. Shell = AMQShell
  254. def __init__(self, *args, **kwargs):
  255. self.app = kwargs['app']
  256. self.out = kwargs.setdefault('out', sys.stderr)
  257. self.silent = kwargs.get('silent')
  258. self.args = args
  259. def connect(self, conn=None):
  260. if conn:
  261. conn.close()
  262. conn = self.app.connection()
  263. self.note('-> connecting to {0}.'.format(conn.as_uri()))
  264. conn.connect()
  265. self.note('-> connected.')
  266. return conn
  267. def run(self):
  268. shell = self.Shell(connect=self.connect, out=self.out)
  269. if self.args:
  270. return shell.onecmd(' '.join(self.args))
  271. try:
  272. return shell.cmdloop()
  273. except KeyboardInterrupt:
  274. self.note('(bibi)')
  275. pass
  276. def note(self, m):
  277. if not self.silent:
  278. say(m, file=self.out)
  279. class amqp(Command):
  280. """AMQP Administration Shell.
  281. Also works for non-amqp transports (but not ones that
  282. store declarations in memory).
  283. Examples::
  284. celery amqp
  285. start shell mode
  286. celery amqp help
  287. show list of commands
  288. celery amqp exchange.delete name
  289. celery amqp queue.delete queue
  290. celery amqp queue.delete queue yes yes
  291. """
  292. def run(self, *args, **options):
  293. options['app'] = self.app
  294. return AMQPAdmin(*args, **options).run()
  295. def main():
  296. amqp().execute_from_commandline()
  297. if __name__ == '__main__': # pragma: no cover
  298. main()