"""Callback management class, common area for keeping track of all callbacks
in the Pika stack.

"""
  4. import functools
  5. import logging
  6. from pika import frame
  7. from pika import amqp_object
  8. from pika.compat import xrange, canonical_str
  9. LOGGER = logging.getLogger(__name__)
  10. def name_or_value(value):
  11. """Will take Frame objects, classes, etc and attempt to return a valid
  12. string identifier for them.
  13. :param pika.amqp_object.AMQPObject|pika.frame.Frame|int|str value: The
  14. value to sanitize
  15. :rtype: str
  16. """
  17. # Is it subclass of AMQPObject
  18. try:
  19. if issubclass(value, amqp_object.AMQPObject):
  20. return value.NAME
  21. except TypeError:
  22. pass
  23. # Is it a Pika frame object?
  24. if isinstance(value, frame.Method):
  25. return value.method.NAME
  26. # Is it a Pika frame object (go after Method since Method extends this)
  27. if isinstance(value, amqp_object.AMQPObject):
  28. return value.NAME
  29. # Cast the value to a str (python 2 and python 3); encoding as UTF-8 on Python 2
  30. return canonical_str(value)
  31. def sanitize_prefix(function):
  32. """Automatically call name_or_value on the prefix passed in."""
  33. @functools.wraps(function)
  34. def wrapper(*args, **kwargs):
  35. args = list(args)
  36. offset = 1
  37. if 'prefix' in kwargs:
  38. kwargs['prefix'] = name_or_value(kwargs['prefix'])
  39. elif len(args) - 1 >= offset:
  40. args[offset] = name_or_value(args[offset])
  41. offset += 1
  42. if 'key' in kwargs:
  43. kwargs['key'] = name_or_value(kwargs['key'])
  44. elif len(args) - 1 >= offset:
  45. args[offset] = name_or_value(args[offset])
  46. return function(*tuple(args), **kwargs)
  47. return wrapper
  48. def check_for_prefix_and_key(function):
  49. """Automatically return false if the key or prefix is not in the callbacks
  50. for the instance.
  51. """
  52. @functools.wraps(function)
  53. def wrapper(*args, **kwargs):
  54. offset = 1
  55. # Sanitize the prefix
  56. if 'prefix' in kwargs:
  57. prefix = name_or_value(kwargs['prefix'])
  58. else:
  59. prefix = name_or_value(args[offset])
  60. offset += 1
  61. # Make sure to sanitize the key as well
  62. if 'key' in kwargs:
  63. key = name_or_value(kwargs['key'])
  64. else:
  65. key = name_or_value(args[offset])
  66. # Make sure prefix and key are in the stack
  67. if prefix not in args[0]._stack or key not in args[0]._stack[prefix]: # pylint: disable=W0212
  68. return False
  69. # Execute the method
  70. return function(*args, **kwargs)
  71. return wrapper
  72. class CallbackManager(object):
  73. """CallbackManager is a global callback system designed to be a single place
  74. where Pika can manage callbacks and process them. It should be referenced
  75. by the CallbackManager.instance() method instead of constructing new
  76. instances of it.
  77. """
  78. CALLS = 'calls'
  79. ARGUMENTS = 'arguments'
  80. DUPLICATE_WARNING = 'Duplicate callback found for "%s:%s"'
  81. CALLBACK = 'callback'
  82. ONE_SHOT = 'one_shot'
  83. ONLY_CALLER = 'only'
  84. def __init__(self):
  85. """Create an instance of the CallbackManager"""
  86. self._stack = dict()
  87. @sanitize_prefix
  88. def add(self,
  89. prefix,
  90. key,
  91. callback,
  92. one_shot=True,
  93. only_caller=None,
  94. arguments=None):
  95. """Add a callback to the stack for the specified key. If the call is
  96. specified as one_shot, it will be removed after being fired
  97. The prefix is usually the channel number but the class is generic
  98. and prefix and key may be any value. If you pass in only_caller
  99. CallbackManager will restrict processing of the callback to only
  100. the calling function/object that you specify.
  101. :param str|int prefix: Categorize the callback
  102. :param str|dict key: The key for the callback
  103. :param callable callback: The callback to call
  104. :param bool one_shot: Remove this callback after it is called
  105. :param object only_caller: Only allow one_caller value to call the
  106. event that fires the callback.
  107. :param dict arguments: Arguments to validate when processing
  108. :rtype: tuple(prefix, key)
  109. """
  110. # Prep the stack
  111. if prefix not in self._stack:
  112. self._stack[prefix] = dict()
  113. if key not in self._stack[prefix]:
  114. self._stack[prefix][key] = list()
  115. # Check for a duplicate
  116. for callback_dict in self._stack[prefix][key]:
  117. if (callback_dict[self.CALLBACK] == callback and
  118. callback_dict[self.ARGUMENTS] == arguments and
  119. callback_dict[self.ONLY_CALLER] == only_caller):
  120. if callback_dict[self.ONE_SHOT] is True:
  121. callback_dict[self.CALLS] += 1
  122. LOGGER.debug('Incremented callback reference counter: %r',
  123. callback_dict)
  124. else:
  125. LOGGER.warning(self.DUPLICATE_WARNING, prefix, key)
  126. return prefix, key
  127. # Create the callback dictionary
  128. callback_dict = self._callback_dict(callback, one_shot, only_caller,
  129. arguments)
  130. self._stack[prefix][key].append(callback_dict)
  131. LOGGER.debug('Added: %r', callback_dict)
  132. return prefix, key
  133. def clear(self):
  134. """Clear all the callbacks if there are any defined."""
  135. self._stack = dict()
  136. LOGGER.debug('Callbacks cleared')
  137. @sanitize_prefix
  138. def cleanup(self, prefix):
  139. """Remove all callbacks from the stack by a prefix. Returns True
  140. if keys were there to be removed
  141. :param str or int prefix: The prefix for keeping track of callbacks with
  142. :rtype: bool
  143. """
  144. LOGGER.debug('Clearing out %r from the stack', prefix)
  145. if prefix not in self._stack or not self._stack[prefix]:
  146. return False
  147. del self._stack[prefix]
  148. return True
  149. @sanitize_prefix
  150. def pending(self, prefix, key):
  151. """Return count of callbacks for a given prefix or key or None
  152. :param str|int prefix: Categorize the callback
  153. :param object|str|dict key: The key for the callback
  154. :rtype: None or int
  155. """
  156. if not prefix in self._stack or not key in self._stack[prefix]:
  157. return None
  158. return len(self._stack[prefix][key])
  159. @sanitize_prefix
  160. @check_for_prefix_and_key
  161. def process(self, prefix, key, caller, *args, **keywords):
  162. """Run through and process all the callbacks for the specified keys.
  163. Caller should be specified at all times so that callbacks which
  164. require a specific function to call CallbackManager.process will
  165. not be processed.
  166. :param str|int prefix: Categorize the callback
  167. :param object|str|int key: The key for the callback
  168. :param object caller: Who is firing the event
  169. :param list args: Any optional arguments
  170. :param dict keywords: Optional keyword arguments
  171. :rtype: bool
  172. """
  173. LOGGER.debug('Processing %s:%s', prefix, key)
  174. if prefix not in self._stack or key not in self._stack[prefix]:
  175. return False
  176. callbacks = list()
  177. # Check each callback, append it to the list if it should be called
  178. for callback_dict in list(self._stack[prefix][key]):
  179. if self._should_process_callback(callback_dict, caller, list(args)):
  180. callbacks.append(callback_dict[self.CALLBACK])
  181. if callback_dict[self.ONE_SHOT]:
  182. self._use_one_shot_callback(prefix, key, callback_dict)
  183. # Call each callback
  184. for callback in callbacks:
  185. LOGGER.debug('Calling %s for "%s:%s"', callback, prefix, key)
  186. try:
  187. callback(*args, **keywords)
  188. except:
  189. LOGGER.exception('Calling %s for "%s:%s" failed', callback,
  190. prefix, key)
  191. raise
  192. return True
  193. @sanitize_prefix
  194. @check_for_prefix_and_key
  195. def remove(self, prefix, key, callback_value=None, arguments=None):
  196. """Remove a callback from the stack by prefix, key and optionally
  197. the callback itself. If you only pass in prefix and key, all
  198. callbacks for that prefix and key will be removed.
  199. :param str or int prefix: The prefix for keeping track of callbacks with
  200. :param str key: The callback key
  201. :param callable callback_value: The method defined to call on callback
  202. :param dict arguments: Optional arguments to check
  203. :rtype: bool
  204. """
  205. if callback_value:
  206. offsets_to_remove = list()
  207. for offset in xrange(len(self._stack[prefix][key]), 0, -1):
  208. callback_dict = self._stack[prefix][key][offset - 1]
  209. if (callback_dict[self.CALLBACK] == callback_value and
  210. self._arguments_match(callback_dict, [arguments])):
  211. offsets_to_remove.append(offset - 1)
  212. for offset in offsets_to_remove:
  213. try:
  214. LOGGER.debug('Removing callback #%i: %r', offset,
  215. self._stack[prefix][key][offset])
  216. del self._stack[prefix][key][offset]
  217. except KeyError:
  218. pass
  219. self._cleanup_callback_dict(prefix, key)
  220. return True
  221. @sanitize_prefix
  222. @check_for_prefix_and_key
  223. def remove_all(self, prefix, key):
  224. """Remove all callbacks for the specified prefix and key.
  225. :param str prefix: The prefix for keeping track of callbacks with
  226. :param str key: The callback key
  227. """
  228. del self._stack[prefix][key]
  229. self._cleanup_callback_dict(prefix, key)
  230. def _arguments_match(self, callback_dict, args):
  231. """Validate if the arguments passed in match the expected arguments in
  232. the callback_dict. We expect this to be a frame passed in to *args for
  233. process or passed in as a list from remove.
  234. :param dict callback_dict: The callback dictionary to evaluate against
  235. :param list args: The arguments passed in as a list
  236. """
  237. if callback_dict[self.ARGUMENTS] is None:
  238. return True
  239. if not args:
  240. return False
  241. if isinstance(args[0], dict):
  242. return self._dict_arguments_match(args[0],
  243. callback_dict[self.ARGUMENTS])
  244. return self._obj_arguments_match(
  245. args[0].method if hasattr(args[0], 'method') else args[0],
  246. callback_dict[self.ARGUMENTS])
  247. def _callback_dict(self, callback, one_shot, only_caller, arguments):
  248. """Return the callback dictionary.
  249. :param callable callback: The callback to call
  250. :param bool one_shot: Remove this callback after it is called
  251. :param object only_caller: Only allow one_caller value to call the
  252. event that fires the callback.
  253. :rtype: dict
  254. """
  255. value = {
  256. self.CALLBACK: callback,
  257. self.ONE_SHOT: one_shot,
  258. self.ONLY_CALLER: only_caller,
  259. self.ARGUMENTS: arguments
  260. }
  261. if one_shot:
  262. value[self.CALLS] = 1
  263. return value
  264. def _cleanup_callback_dict(self, prefix, key=None):
  265. """Remove empty dict nodes in the callback stack.
  266. :param str or int prefix: The prefix for keeping track of callbacks with
  267. :param str key: The callback key
  268. """
  269. if key and key in self._stack[prefix] and not self._stack[prefix][key]:
  270. del self._stack[prefix][key]
  271. if prefix in self._stack and not self._stack[prefix]:
  272. del self._stack[prefix]
  273. @staticmethod
  274. def _dict_arguments_match(value, expectation):
  275. """Checks an dict to see if it has attributes that meet the expectation.
  276. :param dict value: The dict to evaluate
  277. :param dict expectation: The values to check against
  278. :rtype: bool
  279. """
  280. LOGGER.debug('Comparing %r to %r', value, expectation)
  281. for key in expectation:
  282. if value.get(key) != expectation[key]:
  283. LOGGER.debug('Values in dict do not match for %s', key)
  284. return False
  285. return True
  286. @staticmethod
  287. def _obj_arguments_match(value, expectation):
  288. """Checks an object to see if it has attributes that meet the
  289. expectation.
  290. :param object value: The object to evaluate
  291. :param dict expectation: The values to check against
  292. :rtype: bool
  293. """
  294. for key in expectation:
  295. if not hasattr(value, key):
  296. LOGGER.debug('%r does not have required attribute: %s',
  297. type(value), key)
  298. return False
  299. if getattr(value, key) != expectation[key]:
  300. LOGGER.debug('Values in %s do not match for %s', type(value),
  301. key)
  302. return False
  303. return True
  304. def _should_process_callback(self, callback_dict, caller, args):
  305. """Returns True if the callback should be processed.
  306. :param dict callback_dict: The callback configuration
  307. :param object caller: Who is firing the event
  308. :param list args: Any optional arguments
  309. :rtype: bool
  310. """
  311. if not self._arguments_match(callback_dict, args):
  312. LOGGER.debug('Arguments do not match for %r, %r', callback_dict,
  313. args)
  314. return False
  315. return (callback_dict[self.ONLY_CALLER] is None or
  316. (callback_dict[self.ONLY_CALLER] and
  317. callback_dict[self.ONLY_CALLER] == caller))
  318. def _use_one_shot_callback(self, prefix, key, callback_dict):
  319. """Process the one-shot callback, decrementing the use counter and
  320. removing it from the stack if it's now been fully used.
  321. :param str or int prefix: The prefix for keeping track of callbacks with
  322. :param str key: The callback key
  323. :param dict callback_dict: The callback dict to process
  324. """
  325. LOGGER.debug('Processing use of oneshot callback')
  326. callback_dict[self.CALLS] -= 1
  327. LOGGER.debug('%i registered uses left', callback_dict[self.CALLS])
  328. if callback_dict[self.CALLS] <= 0:
  329. self.remove(prefix, key, callback_dict[self.CALLBACK],
  330. callback_dict[self.ARGUMENTS])