__init__.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker
  4. ~~~~~~~~~~~~~
  5. :class:`WorkController` can be used to instantiate in-process workers.
  6. The worker consists of several components, all managed by bootsteps
  7. (mod:`celery.bootsteps`).
  8. """
  9. from __future__ import absolute_import
  10. import os
  11. import socket
  12. import sys
  13. import traceback
  14. try:
  15. import resource
  16. except ImportError: # pragma: no cover
  17. resource = None # noqa
  18. from billiard import cpu_count
  19. from billiard.util import Finalize
  20. from kombu.syn import detect_environment
  21. from celery import bootsteps
  22. from celery.bootsteps import RUN, TERMINATE
  23. from celery import concurrency as _concurrency
  24. from celery import platforms
  25. from celery import signals
  26. from celery.exceptions import (
  27. ImproperlyConfigured, SystemTerminate, TaskRevokedError,
  28. )
  29. from celery.five import string_t, values
  30. from celery.utils import nodename, nodesplit, worker_direct
  31. from celery.utils.imports import reload_from_cwd
  32. from celery.utils.log import mlevel, worker_logger as logger
  33. from . import state
__all__ = ['WorkController', 'default_nodename']

#: Error message raised when a selected queue subset (-Q) names a queue
#: that is not defined in the configuration.
SELECT_UNKNOWN_QUEUE = """\
Trying to select queue subset of {0!r}, but queue {1} is not
defined in the CELERY_QUEUES setting.
If you want to automatically declare unknown queues you can
enable the CELERY_CREATE_MISSING_QUEUES setting.
"""

#: Error message raised when a deselected (excluded) queue is not
#: defined in the configuration.
DESELECT_UNKNOWN_QUEUE = """\
Trying to deselect queue subset of {0!r}, but queue {1} is not
defined in the CELERY_QUEUES setting.
"""
  45. def str_to_list(s):
  46. if isinstance(s, string_t):
  47. return s.split(',')
  48. return s
  49. def default_nodename(hostname):
  50. name, host = nodesplit(hostname or '')
  51. return nodename(name or 'celery', host or socket.gethostname())
class WorkController(object):
    """Unmanaged worker instance.

    Wires together the worker components (pool, timer, consumer, ...)
    through the nested :class:`Blueprint` and controls their
    life-cycle: :meth:`start`, :meth:`stop` (warm shutdown) and
    :meth:`terminate` (cold shutdown).
    """
    app = None

    # Pidfile lock, acquired in ``on_start`` when a pidfile is used.
    pidlock = None
    # The worker :class:`Blueprint` instance, created in
    # ``setup_instance``.
    blueprint = None
    # Execution pool; presumably set by the Pool bootstep — the pool is
    # never assigned in this module itself.
    pool = None
    # Pool semaphore; also set externally (may remain ``None``).
    semaphore = None

    class Blueprint(bootsteps.Blueprint):
        """Worker bootstep blueprint."""
        name = 'Worker'
        default_steps = set([
            'celery.worker.components:Hub',
            'celery.worker.components:Queues',
            'celery.worker.components:Pool',
            'celery.worker.components:Beat',
            'celery.worker.components:Timer',
            'celery.worker.components:StateDB',
            'celery.worker.components:Consumer',
            'celery.worker.autoscale:WorkerComponent',
            'celery.worker.autoreload:WorkerComponent',
        ])

    def __init__(self, app=None, hostname=None, **kwargs):
        """Set up defaults, queues, includes and the bootstep blueprint.

        :keyword app: app instance to use (falls back to the ``app``
            class attribute when not given).
        :keyword hostname: node name; missing parts are filled in by
            :func:`default_nodename`.

        Remaining keyword arguments are forwarded to
        :meth:`setup_defaults` and :meth:`setup_instance`.
        """
        self.app = app or self.app
        self.hostname = default_nodename(hostname)
        self.app.loader.init_worker()
        self.on_before_init(**kwargs)
        self.setup_defaults(**kwargs)
        self.on_after_init(**kwargs)

        self.setup_instance(**self.prepare_args(**kwargs))
        # Send the worker_shutdown signal even on exit paths that
        # bypass a clean stop (Finalize runs at finalization time).
        self._finalize = [
            Finalize(self, self._send_worker_shutdown, exitpriority=10),
        ]

    def setup_instance(self, queues=None, ready_callback=None, pidfile=None,
                       include=None, use_eventloop=None, exclude_queues=None,
                       **kwargs):
        """Configure this instance and apply the bootstep blueprint.

        Extra keyword arguments are stored in :attr:`options` and also
        passed to ``Blueprint.apply`` so that individual bootsteps can
        pick up their own options.
        """
        self.pidfile = pidfile
        self.setup_queues(queues, exclude_queues)
        self.setup_includes(str_to_list(include))

        # Set default concurrency
        if not self.concurrency:
            try:
                self.concurrency = cpu_count()
            except NotImplementedError:
                self.concurrency = 2

        # Options
        self.loglevel = mlevel(self.loglevel)
        self.ready_callback = ready_callback or self.on_consumer_ready
        # this connection is not established, only used for params
        self._conninfo = self.app.connection()
        self.use_eventloop = (
            self.should_use_eventloop() if use_eventloop is None
            else use_eventloop
        )
        self.options = kwargs

        signals.worker_init.send(sender=self)

        # Initialize bootsteps
        self.pool_cls = _concurrency.get_implementation(self.pool_cls)
        self.steps = []
        self.on_init_blueprint()
        self.blueprint = self.Blueprint(app=self.app,
                                        on_start=self.on_start,
                                        on_close=self.on_close,
                                        on_stopped=self.on_stopped)
        self.blueprint.apply(self, **kwargs)

    def on_init_blueprint(self):
        """Subclass hook called just before the blueprint is created."""
        pass

    def on_before_init(self, **kwargs):
        """Subclass hook called before ``setup_defaults``."""
        pass

    def on_after_init(self, **kwargs):
        """Subclass hook called after ``setup_defaults``."""
        pass

    def on_start(self):
        """Blueprint callback: acquire the pidfile lock if configured."""
        if self.pidfile:
            self.pidlock = platforms.create_pidlock(self.pidfile)

    def on_consumer_ready(self, consumer):
        """Default ``ready_callback``, invoked when the consumer is ready."""
        pass

    def on_close(self):
        """Blueprint callback: let the loader clean up worker state."""
        self.app.loader.shutdown_worker()

    def on_stopped(self):
        """Blueprint callback: final cleanup after all steps stopped."""
        # NOTE(review): ``timer`` and ``consumer`` are not assigned in
        # this module — presumably injected by their bootsteps.
        self.timer.stop()
        self.consumer.shutdown()

        if self.pidlock:
            self.pidlock.release()

    def setup_queues(self, include, exclude=None):
        """Select (and optionally deselect) the queues to consume from.

        :raises ImproperlyConfigured: if a named queue is not defined
            in the ``CELERY_QUEUES`` setting.
        """
        include = str_to_list(include)
        exclude = str_to_list(exclude)
        try:
            self.app.amqp.queues.select(include)
        except KeyError as exc:
            raise ImproperlyConfigured(
                SELECT_UNKNOWN_QUEUE.format(include, exc))
        try:
            self.app.amqp.queues.deselect(exclude)
        except KeyError as exc:
            raise ImproperlyConfigured(
                DESELECT_UNKNOWN_QUEUE.format(exclude, exc))
        if self.app.conf.CELERY_WORKER_DIRECT:
            self.app.amqp.queues.select_add(worker_direct(self.hostname))

    def setup_includes(self, includes):
        """Import the given task modules and record them in the config."""
        # Update celery_include to have all known task modules, so that we
        # ensure all task modules are imported in case an execv happens.
        prev = tuple(self.app.conf.CELERY_INCLUDE)
        if includes:
            prev += tuple(includes)
            [self.app.loader.import_task_module(m) for m in includes]
        self.include = includes
        task_modules = set(task.__class__.__module__
                           for task in values(self.app.tasks))
        self.app.conf.CELERY_INCLUDE = tuple(set(prev) | task_modules)

    def prepare_args(self, **kwargs):
        """Subclass hook to transform keyword arguments before
        they are passed to :meth:`setup_instance`."""
        return kwargs

    def _send_worker_shutdown(self):
        # Fired via Finalize (see ``__init__``).
        signals.worker_shutdown.send(sender=self)

    def start(self):
        """Starts the workers main loop."""
        try:
            self.blueprint.start(self)
        except SystemTerminate:
            self.terminate()
        except Exception as exc:
            logger.error('Unrecoverable error: %r', exc, exc_info=True)
            self.stop()
        except (KeyboardInterrupt, SystemExit):
            self.stop()

    def register_with_event_loop(self, hub):
        """Forward the event loop *hub* to all interested bootsteps."""
        self.blueprint.send_all(self, 'register_with_event_loop', args=(hub, ))

    def _process_task_sem(self, req):
        # Task handler used when the pool is guarded by a semaphore;
        # ``_quick_acquire`` is set elsewhere (not in this module).
        return self._quick_acquire(self._process_task, req)

    def _process_task(self, req):
        """Process task by sending it to the pool of workers."""
        try:
            req.execute_using_pool(self.pool)
        except TaskRevokedError:
            try:
                self._quick_release()   # Issue 877
            except AttributeError:
                pass
        except Exception as exc:
            logger.critical('Internal error: %r\n%s',
                            exc, traceback.format_exc(), exc_info=True)

    def signal_consumer_close(self):
        """Ask the consumer to close, ignoring it if not yet created."""
        try:
            self.consumer.close()
        except AttributeError:
            pass

    def should_use_eventloop(self):
        """Return True if an async event loop should be used.

        Requires the default (non-green) environment, an evented
        broker transport, and a non-Windows platform.
        """
        return (detect_environment() == 'default' and
                self._conninfo.is_evented and not self.app.IS_WINDOWS)

    def stop(self, in_sighandler=False):
        """Graceful shutdown of the worker server."""
        if self.blueprint.state == RUN:
            self.signal_consumer_close()
            if not in_sighandler or self.pool.signal_safe:
                self._shutdown(warm=True)

    def terminate(self, in_sighandler=False):
        """Not so graceful shutdown of the worker server."""
        if self.blueprint.state != TERMINATE:
            self.signal_consumer_close()
            if not in_sighandler or self.pool.signal_safe:
                self._shutdown(warm=False)

    def _shutdown(self, warm=True):
        # if blueprint does not exist it means that we had an
        # error before the bootsteps could be initialized.
        if self.blueprint is not None:
            self.blueprint.stop(self, terminate=not warm)
            self.blueprint.join()

    def reload(self, modules=None, reload=False, reloader=None):
        """(Re)import the given task modules, then restart the pool.

        :keyword modules: modules to operate on (defaults to the
            loader's known task modules).
        :keyword reload: also reload modules that are already imported.
        :keyword reloader: custom reloader callable forwarded to
            :func:`~celery.utils.imports.reload_from_cwd`.
        """
        modules = self.app.loader.task_modules if modules is None else modules
        imp = self.app.loader.import_from_cwd

        for module in set(modules or ()):
            if module not in sys.modules:
                logger.debug('importing module %s', module)
                imp(module)
            elif reload:
                logger.debug('reloading module %s', module)
                reload_from_cwd(sys.modules[module], reloader)
        self.pool.restart()

    def info(self):
        """Return a mapping of basic worker statistics."""
        return {'total': self.state.total_count,
                'pid': os.getpid(),
                'clock': str(self.app.clock)}

    def rusage(self):
        """Return resource usage statistics for this process.

        :raises NotImplementedError: when the :mod:`resource` module is
            not available on this platform.
        """
        if resource is None:
            raise NotImplementedError('rusage not supported by this platform')
        s = resource.getrusage(resource.RUSAGE_SELF)
        return {
            'utime': s.ru_utime,
            'stime': s.ru_stime,
            'maxrss': s.ru_maxrss,
            'ixrss': s.ru_ixrss,
            'idrss': s.ru_idrss,
            'isrss': s.ru_isrss,
            'minflt': s.ru_minflt,
            'majflt': s.ru_majflt,
            'nswap': s.ru_nswap,
            'inblock': s.ru_inblock,
            'oublock': s.ru_oublock,
            'msgsnd': s.ru_msgsnd,
            'msgrcv': s.ru_msgrcv,
            'nsignals': s.ru_nsignals,
            'nvcsw': s.ru_nvcsw,
            'nivcsw': s.ru_nivcsw,
        }

    def stats(self):
        """Return statistics from this worker and all its bootsteps."""
        info = self.info()
        info.update(self.blueprint.info(self))
        info.update(self.consumer.blueprint.info(self.consumer))
        try:
            info['rusage'] = self.rusage()
        except NotImplementedError:
            info['rusage'] = 'N/A'
        return info

    def __repr__(self):
        return '<Worker: {self.hostname} ({state})>'.format(
            self=self, state=self.blueprint.human_state(),
        )

    def __str__(self):
        return self.hostname

    @property
    def state(self):
        # The shared worker state module (``celery.worker.state``).
        return state

    def setup_defaults(self, concurrency=None, loglevel=None, logfile=None,
                       send_events=None, pool_cls=None, consumer_cls=None,
                       timer_cls=None, timer_precision=None,
                       autoscaler_cls=None, autoreloader_cls=None,
                       pool_putlocks=None, pool_restarts=None,
                       force_execv=None, state_db=None,
                       schedule_filename=None, scheduler_cls=None,
                       task_time_limit=None, task_soft_time_limit=None,
                       max_tasks_per_child=None, prefetch_multiplier=None,
                       disable_rate_limits=None, worker_lost_wait=None, **_kw):
        """Resolve worker options, falling back to app configuration.

        Each attribute takes the explicitly passed value when given,
        otherwise the value looked up in the app configuration via
        :meth:`_getopt`.
        """
        self.concurrency = self._getopt('concurrency', concurrency)
        self.loglevel = self._getopt('log_level', loglevel)
        self.logfile = self._getopt('log_file', logfile)
        self.send_events = self._getopt('send_events', send_events)
        self.pool_cls = self._getopt('pool', pool_cls)
        self.consumer_cls = self._getopt('consumer', consumer_cls)
        self.timer_cls = self._getopt('timer', timer_cls)
        self.timer_precision = self._getopt('timer_precision', timer_precision)
        self.autoscaler_cls = self._getopt('autoscaler', autoscaler_cls)
        self.autoreloader_cls = self._getopt('autoreloader', autoreloader_cls)
        self.pool_putlocks = self._getopt('pool_putlocks', pool_putlocks)
        self.pool_restarts = self._getopt('pool_restarts', pool_restarts)
        self.force_execv = self._getopt('force_execv', force_execv)
        self.state_db = self._getopt('state_db', state_db)
        self.schedule_filename = self._getopt(
            'schedule_filename', schedule_filename,
        )
        self.scheduler_cls = self._getopt(
            'celerybeat_scheduler', scheduler_cls,
        )
        self.task_time_limit = self._getopt(
            'task_time_limit', task_time_limit,
        )
        self.task_soft_time_limit = self._getopt(
            'task_soft_time_limit', task_soft_time_limit,
        )
        self.max_tasks_per_child = self._getopt(
            'max_tasks_per_child', max_tasks_per_child,
        )
        self.prefetch_multiplier = int(self._getopt(
            'prefetch_multiplier', prefetch_multiplier,
        ))
        self.disable_rate_limits = self._getopt(
            'disable_rate_limits', disable_rate_limits,
        )
        self.worker_lost_wait = self._getopt(
            'worker_lost_wait', worker_lost_wait,
        )

    def _getopt(self, key, value):
        # Return *value* when explicitly given, otherwise look the
        # option up in the app configuration ('celeryd' namespace).
        if value is not None:
            return value
        return self.app.conf.find_value_for_key(key, namespace='celeryd')