bootsteps.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.bootsteps
  4. ~~~~~~~~~~~~~~~~
  5. A directed acyclic graph of reusable components.
  6. """
  7. from __future__ import absolute_import, unicode_literals
  8. from collections import deque
  9. from threading import Event
  10. from kombu.common import ignore_errors
  11. from kombu.utils import symbol_by_name
  12. from .datastructures import DependencyGraph, GraphFormatter
  13. from .five import values, with_metaclass
  14. from .utils.imports import instantiate, qualname
  15. from .utils.log import get_logger
  16. from .utils.threads import default_socket_timeout
  17. try:
  18. from greenlet import GreenletExit
  19. IGNORE_ERRORS = (GreenletExit, )
  20. except ImportError: # pragma: no cover
  21. IGNORE_ERRORS = ()
  22. __all__ = ['Blueprint', 'Step', 'StartStopStep', 'ConsumerStep']
  23. #: Default socket timeout at shutdown.
  24. SHUTDOWN_SOCKET_TIMEOUT = 5.0
  25. #: States
  26. RUN = 0x1
  27. CLOSE = 0x2
  28. TERMINATE = 0x3
  29. logger = get_logger(__name__)
  30. debug = logger.debug
  31. def _pre(ns, fmt):
  32. return '| {0}: {1}'.format(ns.alias, fmt)
  33. def _label(s):
  34. return s.name.rsplit('.', 1)[-1]
  35. class StepFormatter(GraphFormatter):
  36. """Graph formatter for :class:`Blueprint`."""
  37. blueprint_prefix = '⧉'
  38. conditional_prefix = '∘'
  39. blueprint_scheme = {
  40. 'shape': 'parallelogram',
  41. 'color': 'slategray4',
  42. 'fillcolor': 'slategray3',
  43. }
  44. def label(self, step):
  45. return step and '{0}{1}'.format(
  46. self._get_prefix(step),
  47. (step.label or _label(step)).encode('utf-8', 'ignore'),
  48. )
  49. def _get_prefix(self, step):
  50. if step.last:
  51. return self.blueprint_prefix
  52. if step.conditional:
  53. return self.conditional_prefix
  54. return ''
  55. def node(self, obj, **attrs):
  56. scheme = self.blueprint_scheme if obj.last else self.node_scheme
  57. return self.draw_node(obj, scheme, attrs)
  58. def edge(self, a, b, **attrs):
  59. if a.last:
  60. attrs.update(arrowhead='none', color='darkseagreen3')
  61. return self.draw_edge(a, b, self.edge_scheme, attrs)
  62. class Blueprint(object):
  63. """Blueprint containing bootsteps that can be applied to objects.
  64. :keyword steps: List of steps.
  65. :keyword name: Set explicit name for this blueprint.
  66. :keyword app: Set the Celery app for this blueprint.
  67. :keyword on_start: Optional callback applied after blueprint start.
  68. :keyword on_close: Optional callback applied before blueprint close.
  69. :keyword on_stopped: Optional callback applied after blueprint stopped.
  70. """
  71. GraphFormatter = StepFormatter
  72. name = None
  73. state = None
  74. started = 0
  75. default_steps = set()
  76. state_to_name = {
  77. 0: 'initializing',
  78. RUN: 'running',
  79. CLOSE: 'closing',
  80. TERMINATE: 'terminating',
  81. }
  82. def __init__(self, steps=None, name=None, app=None,
  83. on_start=None, on_close=None, on_stopped=None):
  84. self.app = app
  85. self.name = name or self.name or qualname(type(self))
  86. self.types = set(steps or []) | set(self.default_steps)
  87. self.on_start = on_start
  88. self.on_close = on_close
  89. self.on_stopped = on_stopped
  90. self.shutdown_complete = Event()
  91. self.steps = {}
  92. def start(self, parent):
  93. self.state = RUN
  94. if self.on_start:
  95. self.on_start()
  96. for i, step in enumerate(s for s in parent.steps if s is not None):
  97. self._debug('Starting %s', step.alias)
  98. self.started = i + 1
  99. step.start(parent)
  100. debug('^-- substep ok')
  101. def human_state(self):
  102. return self.state_to_name[self.state or 0]
  103. def info(self, parent):
  104. info = {}
  105. for step in parent.steps:
  106. info.update(step.info(parent) or {})
  107. return info
  108. def close(self, parent):
  109. if self.on_close:
  110. self.on_close()
  111. self.send_all(parent, 'close', 'closing', reverse=False)
  112. def restart(self, parent, method='stop',
  113. description='restarting', propagate=False):
  114. self.send_all(parent, method, description, propagate=propagate)
  115. def send_all(self, parent, method,
  116. description=None, reverse=True, propagate=True, args=()):
  117. description = description or method.capitalize()
  118. steps = reversed(parent.steps) if reverse else parent.steps
  119. with default_socket_timeout(SHUTDOWN_SOCKET_TIMEOUT): # Issue 975
  120. for step in steps:
  121. if step:
  122. self._debug('%s %s...',
  123. description.capitalize(), step.alias)
  124. fun = getattr(step, method, None)
  125. if fun:
  126. try:
  127. fun(parent, *args)
  128. except Exception as exc:
  129. if propagate:
  130. raise
  131. logger.error(
  132. 'Error while %s %s: %r',
  133. description, step.alias, exc, exc_info=1,
  134. )
  135. def stop(self, parent, close=True, terminate=False):
  136. what = 'terminating' if terminate else 'stopping'
  137. if self.state in (CLOSE, TERMINATE):
  138. return
  139. if self.state != RUN or self.started != len(parent.steps):
  140. # Not fully started, can safely exit.
  141. self.state = TERMINATE
  142. self.shutdown_complete.set()
  143. return
  144. self.close(parent)
  145. self.state = CLOSE
  146. self.restart(
  147. parent, 'terminate' if terminate else 'stop',
  148. description=what, propagate=False,
  149. )
  150. if self.on_stopped:
  151. self.on_stopped()
  152. self.state = TERMINATE
  153. self.shutdown_complete.set()
  154. def join(self, timeout=None):
  155. try:
  156. # Will only get here if running green,
  157. # makes sure all greenthreads have exited.
  158. self.shutdown_complete.wait(timeout=timeout)
  159. except IGNORE_ERRORS:
  160. pass
  161. def apply(self, parent, **kwargs):
  162. """Apply the steps in this blueprint to an object.
  163. This will apply the ``__init__`` and ``include`` methods
  164. of each step, with the object as argument::
  165. step = Step(obj)
  166. ...
  167. step.include(obj)
  168. For :class:`StartStopStep` the services created
  169. will also be added to the objects ``steps`` attribute.
  170. """
  171. self._debug('Preparing bootsteps.')
  172. order = self.order = []
  173. steps = self.steps = self.claim_steps()
  174. self._debug('Building graph...')
  175. for S in self._finalize_steps(steps):
  176. step = S(parent, **kwargs)
  177. steps[step.name] = step
  178. order.append(step)
  179. self._debug('New boot order: {%s}',
  180. ', '.join(s.alias for s in self.order))
  181. for step in order:
  182. step.include(parent)
  183. return self
  184. def connect_with(self, other):
  185. self.graph.adjacent.update(other.graph.adjacent)
  186. self.graph.add_edge(type(other.order[0]), type(self.order[-1]))
  187. def __getitem__(self, name):
  188. return self.steps[name]
  189. def _find_last(self):
  190. return next((C for C in values(self.steps) if C.last), None)
  191. def _firstpass(self, steps):
  192. stream = deque(step.requires for step in values(steps))
  193. while stream:
  194. for node in stream.popleft():
  195. node = symbol_by_name(node)
  196. if node.name not in self.steps:
  197. steps[node.name] = node
  198. stream.append(node.requires)
  199. def _finalize_steps(self, steps):
  200. last = self._find_last()
  201. self._firstpass(steps)
  202. it = ((C, C.requires) for C in values(steps))
  203. G = self.graph = DependencyGraph(
  204. it, formatter=self.GraphFormatter(root=last),
  205. )
  206. if last:
  207. for obj in G:
  208. if obj != last:
  209. G.add_edge(last, obj)
  210. try:
  211. return G.topsort()
  212. except KeyError as exc:
  213. raise KeyError('unknown bootstep: %s' % exc)
  214. def claim_steps(self):
  215. return dict(self.load_step(step) for step in self._all_steps())
  216. def _all_steps(self):
  217. return self.types | self.app.steps[self.name.lower()]
  218. def load_step(self, step):
  219. step = symbol_by_name(step)
  220. return step.name, step
  221. def _debug(self, msg, *args):
  222. return debug(_pre(self, msg), *args)
  223. @property
  224. def alias(self):
  225. return _label(self)
  226. class StepType(type):
  227. """Metaclass for steps."""
  228. def __new__(cls, name, bases, attrs):
  229. module = attrs.get('__module__')
  230. qname = '{0}.{1}'.format(module, name) if module else name
  231. attrs.update(
  232. __qualname__=qname,
  233. name=attrs.get('name') or qname,
  234. requires=attrs.get('requires', ()),
  235. )
  236. return super(StepType, cls).__new__(cls, name, bases, attrs)
  237. def __str__(self):
  238. return self.name
  239. def __repr__(self):
  240. return 'step:{0.name}{{{0.requires!r}}}'.format(self)
  241. @with_metaclass(StepType)
  242. class Step(object):
  243. """A Bootstep.
  244. The :meth:`__init__` method is called when the step
  245. is bound to a parent object, and can as such be used
  246. to initialize attributes in the parent object at
  247. parent instantiation-time.
  248. """
  249. #: Optional step name, will use qualname if not specified.
  250. name = None
  251. #: Optional short name used for graph outputs and in logs.
  252. label = None
  253. #: Set this to true if the step is enabled based on some condition.
  254. conditional = False
  255. #: List of other steps that that must be started before this step.
  256. #: Note that all dependencies must be in the same blueprint.
  257. requires = ()
  258. #: This flag is reserved for the workers Consumer,
  259. #: since it is required to always be started last.
  260. #: There can only be one object marked last
  261. #: in every blueprint.
  262. last = False
  263. #: This provides the default for :meth:`include_if`.
  264. enabled = True
  265. def __init__(self, parent, **kwargs):
  266. pass
  267. def include_if(self, parent):
  268. """An optional predicate that decides whether this
  269. step should be created."""
  270. return self.enabled
  271. def instantiate(self, name, *args, **kwargs):
  272. return instantiate(name, *args, **kwargs)
  273. def _should_include(self, parent):
  274. if self.include_if(parent):
  275. return True, self.create(parent)
  276. return False, None
  277. def include(self, parent):
  278. return self._should_include(parent)[0]
  279. def create(self, parent):
  280. """Create the step."""
  281. pass
  282. def __repr__(self):
  283. return '<step: {0.alias}>'.format(self)
  284. @property
  285. def alias(self):
  286. return self.label or _label(self)
  287. def info(self, obj):
  288. pass
  289. class StartStopStep(Step):
  290. #: Optional obj created by the :meth:`create` method.
  291. #: This is used by :class:`StartStopStep` to keep the
  292. #: original service object.
  293. obj = None
  294. def start(self, parent):
  295. if self.obj:
  296. return self.obj.start()
  297. def stop(self, parent):
  298. if self.obj:
  299. return self.obj.stop()
  300. def close(self, parent):
  301. pass
  302. def terminate(self, parent):
  303. if self.obj:
  304. return getattr(self.obj, 'terminate', self.obj.stop)()
  305. def include(self, parent):
  306. inc, ret = self._should_include(parent)
  307. if inc:
  308. self.obj = ret
  309. parent.steps.append(self)
  310. return inc
  311. class ConsumerStep(StartStopStep):
  312. requires = ('Connection', )
  313. consumers = None
  314. def get_consumers(self, channel):
  315. raise NotImplementedError('missing get_consumers')
  316. def start(self, c):
  317. channel = c.connection.channel()
  318. self.consumers = self.get_consumers(channel)
  319. for consumer in self.consumers or []:
  320. consumer.consume()
  321. def stop(self, c):
  322. channels = set()
  323. for consumer in self.consumers or []:
  324. ignore_errors(c.connection, consumer.cancel)
  325. if consumer.channel:
  326. channels.add(consumer.channel)
  327. for channel in channels:
  328. ignore_errors(c.connection, channel.close)
  329. shutdown = stop