components.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.components
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. Default worker bootsteps.
  6. """
  7. from __future__ import absolute_import
  8. import atexit
  9. import warnings
  10. from kombu.async import Hub as _Hub, get_event_loop, set_event_loop
  11. from kombu.async.semaphore import DummyLock, LaxBoundedSemaphore
  12. from kombu.async.timer import Timer as _Timer
  13. from celery import bootsteps
  14. from celery.exceptions import ImproperlyConfigured
  15. from celery.five import string_t
  16. from celery.utils.log import worker_logger as logger
# Public names exported by this module (one per bootstep class below).
__all__ = ['Timer', 'Hub', 'Queues', 'Pool', 'Beat', 'StateDB', 'Consumer']

# Error text raised by the Beat bootstep when the pool class lives in a
# module whose name ends with 'gevent' or 'eventlet'.
ERR_B_GREEN = """\
-B option doesn't work with eventlet/gevent pools: \
use standalone beat instead.\
"""

# Warning text emitted by the Pool bootstep when the CELERYD_POOL setting
# is 'eventlet' or 'gevent'.
W_POOL_SETTING = """
The CELERYD_POOL setting should not be used to select the eventlet/gevent
pools, instead you *must use the -P* argument so that patches are applied
as early as possible.
"""
  27. class Timer(bootsteps.Step):
  28. """This step initializes the internal timer used by the worker."""
  29. def create(self, w):
  30. if w.use_eventloop:
  31. # does not use dedicated timer thread.
  32. w.timer = _Timer(max_interval=10.0)
  33. else:
  34. if not w.timer_cls:
  35. # Default Timer is set by the pool, as e.g. eventlet
  36. # needs a custom implementation.
  37. w.timer_cls = w.pool_cls.Timer
  38. w.timer = self.instantiate(w.timer_cls,
  39. max_interval=w.timer_precision,
  40. on_timer_error=self.on_timer_error,
  41. on_timer_tick=self.on_timer_tick)
  42. def on_timer_error(self, exc):
  43. logger.error('Timer error: %r', exc, exc_info=True)
  44. def on_timer_tick(self, delay):
  45. logger.debug('Timer wake-up! Next eta %s secs.', delay)
  46. class Hub(bootsteps.StartStopStep):
  47. requires = (Timer, )
  48. def __init__(self, w, **kwargs):
  49. w.hub = None
  50. def include_if(self, w):
  51. return w.use_eventloop
  52. def create(self, w):
  53. w.hub = get_event_loop()
  54. if w.hub is None:
  55. w.hub = set_event_loop(_Hub(w.timer))
  56. self._patch_thread_primitives(w)
  57. return self
  58. def start(self, w):
  59. pass
  60. def stop(self, w):
  61. w.hub.close()
  62. def terminate(self, w):
  63. w.hub.close()
  64. def _patch_thread_primitives(self, w):
  65. # make clock use dummy lock
  66. w.app.clock.lock = DummyLock()
  67. # multiprocessing's ApplyResult uses this lock.
  68. try:
  69. from billiard import pool
  70. except ImportError:
  71. pass
  72. else:
  73. pool.Lock = DummyLock
  74. class Queues(bootsteps.Step):
  75. """This bootstep initializes the internal queues
  76. used by the worker."""
  77. label = 'Queues (intra)'
  78. requires = (Hub, )
  79. def create(self, w):
  80. w.process_task = w._process_task
  81. if w.use_eventloop:
  82. if w.pool_putlocks and w.pool_cls.uses_semaphore:
  83. w.process_task = w._process_task_sem
  84. class Pool(bootsteps.StartStopStep):
  85. """Bootstep managing the worker pool.
  86. Describes how to initialize the worker pool, and starts and stops
  87. the pool during worker startup/shutdown.
  88. Adds attributes:
  89. * autoscale
  90. * pool
  91. * max_concurrency
  92. * min_concurrency
  93. """
  94. requires = (Queues, )
  95. def __init__(self, w, autoscale=None, autoreload=None,
  96. no_execv=False, optimization=None, **kwargs):
  97. if isinstance(autoscale, string_t):
  98. max_c, _, min_c = autoscale.partition(',')
  99. autoscale = [int(max_c), min_c and int(min_c) or 0]
  100. w.autoscale = autoscale
  101. w.pool = None
  102. w.max_concurrency = None
  103. w.min_concurrency = w.concurrency
  104. w.no_execv = no_execv
  105. if w.autoscale:
  106. w.max_concurrency, w.min_concurrency = w.autoscale
  107. self.autoreload_enabled = autoreload
  108. self.optimization = optimization
  109. def close(self, w):
  110. if w.pool:
  111. w.pool.close()
  112. def terminate(self, w):
  113. if w.pool:
  114. w.pool.terminate()
  115. def create(self, w, semaphore=None, max_restarts=None):
  116. if w.app.conf.CELERYD_POOL in ('eventlet', 'gevent'):
  117. warnings.warn(UserWarning(W_POOL_SETTING))
  118. threaded = not w.use_eventloop
  119. procs = w.min_concurrency
  120. forking_enable = w.no_execv if w.force_execv else True
  121. if not threaded:
  122. semaphore = w.semaphore = LaxBoundedSemaphore(procs)
  123. w._quick_acquire = w.semaphore.acquire
  124. w._quick_release = w.semaphore.release
  125. max_restarts = 100
  126. allow_restart = self.autoreload_enabled or w.pool_restarts
  127. pool = w.pool = self.instantiate(
  128. w.pool_cls, w.min_concurrency,
  129. initargs=(w.app, w.hostname),
  130. maxtasksperchild=w.max_tasks_per_child,
  131. timeout=w.task_time_limit,
  132. soft_timeout=w.task_soft_time_limit,
  133. putlocks=w.pool_putlocks and threaded,
  134. lost_worker_timeout=w.worker_lost_wait,
  135. threads=threaded,
  136. max_restarts=max_restarts,
  137. allow_restart=allow_restart,
  138. forking_enable=forking_enable,
  139. semaphore=semaphore,
  140. sched_strategy=self.optimization,
  141. )
  142. return pool
  143. def info(self, w):
  144. return {'pool': w.pool.info}
  145. def register_with_event_loop(self, w, hub):
  146. w.pool.register_with_event_loop(hub)
  147. class Beat(bootsteps.StartStopStep):
  148. """Step used to embed a beat process.
  149. This will only be enabled if the ``beat``
  150. argument is set.
  151. """
  152. label = 'Beat'
  153. conditional = True
  154. def __init__(self, w, beat=False, **kwargs):
  155. self.enabled = w.beat = beat
  156. w.beat = None
  157. def create(self, w):
  158. from celery.beat import EmbeddedService
  159. if w.pool_cls.__module__.endswith(('gevent', 'eventlet')):
  160. raise ImproperlyConfigured(ERR_B_GREEN)
  161. b = w.beat = EmbeddedService(app=w.app,
  162. schedule_filename=w.schedule_filename,
  163. scheduler_cls=w.scheduler_cls)
  164. return b
  165. class StateDB(bootsteps.Step):
  166. """This bootstep sets up the workers state db if enabled."""
  167. def __init__(self, w, **kwargs):
  168. self.enabled = w.state_db
  169. w._persistence = None
  170. def create(self, w):
  171. w._persistence = w.state.Persistent(w.state, w.state_db, w.app.clock)
  172. atexit.register(w._persistence.save)
  173. class Consumer(bootsteps.StartStopStep):
  174. last = True
  175. def create(self, w):
  176. if w.max_concurrency:
  177. prefetch_count = max(w.min_concurrency, 1) * w.prefetch_multiplier
  178. else:
  179. prefetch_count = w.concurrency * w.prefetch_multiplier
  180. c = w.consumer = self.instantiate(
  181. w.consumer_cls, w.process_task,
  182. hostname=w.hostname,
  183. send_events=w.send_events,
  184. init_callback=w.ready_callback,
  185. initial_prefetch_count=prefetch_count,
  186. pool=w.pool,
  187. timer=w.timer,
  188. app=w.app,
  189. controller=w,
  190. hub=w.hub,
  191. worker_options=w.options,
  192. disable_rate_limits=w.disable_rate_limits,
  193. prefetch_multiplier=w.prefetch_multiplier,
  194. )
  195. return c