worker.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.apps.worker
  4. ~~~~~~~~~~~~~~~~~~
  5. This module is the 'program-version' of :mod:`celery.worker`.
  6. It does everything necessary to run that module
  7. as an actual application, like installing signal handlers,
  8. platform tweaks, and so on.
  9. """
  10. from __future__ import absolute_import, print_function
  11. import logging
  12. import os
  13. import platform as _platform
  14. import sys
  15. import warnings
  16. from functools import partial
  17. from billiard import current_process
  18. from kombu.utils.encoding import safe_str
  19. from celery import VERSION_BANNER, platforms, signals
  20. from celery.exceptions import CDeprecationWarning, SystemTerminate
  21. from celery.five import string, string_t
  22. from celery.loaders.app import AppLoader
  23. from celery.app import trace
  24. from celery.utils import cry, isatty
  25. from celery.utils.imports import qualname
  26. from celery.utils.log import get_logger, in_sighandler, set_in_sighandler
  27. from celery.utils.text import pluralize
  28. from celery.worker import WorkController
# Names exported by ``from <this module> import *``.
__all__ = ['Worker']

# Module-level logger, named after this module.
logger = get_logger(__name__)

# Interpreter detection flags, consulted when installing signal handlers.
is_jython = sys.platform.startswith('java')
is_pypy = hasattr(sys, 'pypy_version_info')

# Raw value of the C_FORCE_ROOT environment variable (a string), or False
# when it is unset.  Only its truthiness is checked (see Worker.on_start),
# so any non-empty value -- including '0' -- counts as enabled.
C_FORCE_ROOT = os.environ.get('C_FORCE_ROOT', False)
# Message of the RuntimeError raised by Worker.on_start when the process
# runs as uid 0, pickle is an accepted content type and C_FORCE_ROOT is
# not set.
ROOT_DISALLOWED = """\
Running a worker with superuser privileges when the
worker accepts messages serialized with pickle is a very bad idea!
If you really want to continue then you have to set the C_FORCE_ROOT
environment variable (but please think about this before you do).
"""

# Text of the RuntimeWarning issued by Worker.on_start.
ROOT_DISCOURAGED = """\
You are running the worker with superuser privileges, which is
absolutely not recommended!
Please specify a different user using the -u option.
"""

# Text of the CDeprecationWarning issued by Worker.on_start when
# CELERY_ACCEPT_CONTENT has not been set explicitly in the configuration.
W_PICKLE_DEPRECATED = """
Starting from version 3.2 Celery will refuse to accept pickle by default.
The pickle serializer is a security concern as it may give attackers
the ability to execute any command. It's important to secure
your broker from unauthorized access when using pickle, so we think
that enabling pickle should require a deliberate action and not be
the default choice.
If you depend on pickle then you should set a setting to disable this
warning and to be sure that everything will continue working
when you upgrade to Celery 3.2::
CELERY_ACCEPT_CONTENT = ['pickle', 'json', 'msgpack', 'yaml']
You must only enable the serializers that you will actually use.
"""
  58. def active_thread_count():
  59. from threading import enumerate
  60. return sum(1 for t in enumerate()
  61. if not t.name.startswith('Dummy-'))
  62. def safe_say(msg):
  63. print('\n{0}'.format(msg), file=sys.__stderr__)
  64. ARTLINES = [
  65. ' --------------',
  66. '---- **** -----',
  67. '--- * *** * --',
  68. '-- * - **** ---',
  69. '- ** ----------',
  70. '- ** ----------',
  71. '- ** ----------',
  72. '- ** ----------',
  73. '- *** --- * ---',
  74. '-- ******* ----',
  75. '--- ***** -----',
  76. ' --------------',
  77. ]
# Template for the startup banner.  The placeholders are filled in by
# Worker.startup_info, which then splits the result into lines and
# prefixes each one with the matching ARTLINES row.
BANNER = """\
{hostname} v{version}
{platform}
[config]
.> broker: {conninfo}
.> app: {app}
.> concurrency: {concurrency}
.> events: {events}
[queues]
{queues}
"""

# Template for the task listing appended after the banner; ``tasks`` is
# supplied by Worker.extra_info.
EXTRA_INFO_FMT = """
[tasks]
{tasks}
"""
  93. class Worker(WorkController):
  94. def on_before_init(self, **kwargs):
  95. trace.setup_worker_optimizations(self.app)
  96. # this signal can be used to set up configuration for
  97. # workers by name.
  98. signals.celeryd_init.send(
  99. sender=self.hostname, instance=self,
  100. conf=self.app.conf, options=kwargs,
  101. )
  102. def on_after_init(self, purge=False, no_color=None,
  103. redirect_stdouts=None, redirect_stdouts_level=None,
  104. **kwargs):
  105. self.redirect_stdouts = self._getopt(
  106. 'redirect_stdouts', redirect_stdouts,
  107. )
  108. self.redirect_stdouts_level = self._getopt(
  109. 'redirect_stdouts_level', redirect_stdouts_level,
  110. )
  111. super(Worker, self).setup_defaults(**kwargs)
  112. self.purge = purge
  113. self.no_color = no_color
  114. self._isatty = isatty(sys.stdout)
  115. self.colored = self.app.log.colored(
  116. self.logfile,
  117. enabled=not no_color if no_color is not None else no_color
  118. )
  119. def on_init_blueprint(self):
  120. self._custom_logging = self.setup_logging()
  121. # apply task execution optimizations
  122. # -- This will finalize the app!
  123. trace.setup_worker_optimizations(self.app)
  124. def on_start(self):
  125. if not self._custom_logging and self.redirect_stdouts:
  126. self.app.log.redirect_stdouts(self.redirect_stdouts_level)
  127. WorkController.on_start(self)
  128. # this signal can be used to e.g. change queues after
  129. # the -Q option has been applied.
  130. signals.celeryd_after_setup.send(
  131. sender=self.hostname, instance=self, conf=self.app.conf,
  132. )
  133. if getattr(os, 'getuid', None) and os.getuid() == 0:
  134. accept_encoding = self.app.conf.CELERY_ACCEPT_CONTENT
  135. if ('pickle' in accept_encoding or
  136. 'application/x-python-serialize' in accept_encoding):
  137. if not C_FORCE_ROOT:
  138. raise RuntimeError(ROOT_DISALLOWED)
  139. warnings.warn(RuntimeWarning(ROOT_DISCOURAGED))
  140. if not self.app.conf.value_set_for('CELERY_ACCEPT_CONTENT'):
  141. warnings.warn(CDeprecationWarning(W_PICKLE_DEPRECATED))
  142. if self.purge:
  143. self.purge_messages()
  144. # Dump configuration to screen so we have some basic information
  145. # for when users sends bug reports.
  146. sys.__stdout__.write(
  147. str(self.colored.cyan(' \n', self.startup_info())) +
  148. str(self.colored.reset(self.extra_info() or '')) + '\n'
  149. )
  150. self.set_process_status('-active-')
  151. self.install_platform_tweaks(self)
  152. def on_consumer_ready(self, consumer):
  153. signals.worker_ready.send(sender=consumer)
  154. print('{0} ready.'.format(safe_str(self.hostname), ))
  155. def setup_logging(self, colorize=None):
  156. if colorize is None and self.no_color is not None:
  157. colorize = not self.no_color
  158. return self.app.log.setup(
  159. self.loglevel, self.logfile,
  160. redirect_stdouts=False, colorize=colorize,
  161. )
  162. def purge_messages(self):
  163. count = self.app.control.purge()
  164. if count:
  165. print('purge: Erased {0} {1} from the queue.\n'.format(
  166. count, pluralize(count, 'message')))
  167. def tasklist(self, include_builtins=True, sep='\n', int_='celery.'):
  168. return sep.join(
  169. ' . {0}'.format(task) for task in sorted(self.app.tasks)
  170. if (not task.startswith(int_) if not include_builtins else task)
  171. )
  172. def extra_info(self):
  173. if self.loglevel <= logging.INFO:
  174. include_builtins = self.loglevel <= logging.DEBUG
  175. tasklist = self.tasklist(include_builtins=include_builtins)
  176. return EXTRA_INFO_FMT.format(tasks=tasklist)
  177. def startup_info(self):
  178. app = self.app
  179. concurrency = string(self.concurrency)
  180. appr = '{0}:0x{1:x}'.format(app.main or '__main__', id(app))
  181. if not isinstance(app.loader, AppLoader):
  182. loader = qualname(app.loader)
  183. if loader.startswith('celery.loaders'):
  184. loader = loader[14:]
  185. appr += ' ({0})'.format(loader)
  186. if self.autoscale:
  187. max, min = self.autoscale
  188. concurrency = '{{min={0}, max={1}}}'.format(min, max)
  189. pool = self.pool_cls
  190. if not isinstance(pool, string_t):
  191. pool = pool.__module__
  192. concurrency += ' ({0})'.format(pool.split('.')[-1])
  193. events = 'ON'
  194. if not self.send_events:
  195. events = 'OFF (enable -E to monitor this worker)'
  196. banner = BANNER.format(
  197. app=appr,
  198. hostname=safe_str(self.hostname),
  199. version=VERSION_BANNER,
  200. conninfo=self.app.connection().as_uri(),
  201. concurrency=concurrency,
  202. platform=safe_str(_platform.platform()),
  203. events=events,
  204. queues=app.amqp.queues.format(indent=0, indent_first=False),
  205. ).splitlines()
  206. # integrate the ASCII art.
  207. for i, x in enumerate(banner):
  208. try:
  209. banner[i] = ' '.join([ARTLINES[i], banner[i]])
  210. except IndexError:
  211. banner[i] = ' ' * 16 + banner[i]
  212. return '\n'.join(banner) + '\n'
  213. def install_platform_tweaks(self, worker):
  214. """Install platform specific tweaks and workarounds."""
  215. if self.app.IS_OSX:
  216. self.osx_proxy_detection_workaround()
  217. # Install signal handler so SIGHUP restarts the worker.
  218. if not self._isatty:
  219. # only install HUP handler if detached from terminal,
  220. # so closing the terminal window doesn't restart the worker
  221. # into the background.
  222. if self.app.IS_OSX:
  223. # OS X can't exec from a process using threads.
  224. # See http://github.com/celery/celery/issues#issue/152
  225. install_HUP_not_supported_handler(worker)
  226. else:
  227. install_worker_restart_handler(worker)
  228. install_worker_term_handler(worker)
  229. install_worker_term_hard_handler(worker)
  230. install_worker_int_handler(worker)
  231. install_cry_handler()
  232. install_rdb_handler()
  233. def osx_proxy_detection_workaround(self):
  234. """See http://github.com/celery/celery/issues#issue/161"""
  235. os.environ.setdefault('celery_dummy_proxy', 'set_by_celeryd')
  236. def set_process_status(self, info):
  237. return platforms.set_mp_process_title(
  238. 'celeryd',
  239. info='{0} ({1})'.format(info, platforms.strargv(sys.argv)),
  240. hostname=self.hostname,
  241. )
  242. def _shutdown_handler(worker, sig='TERM', how='Warm',
  243. exc=SystemExit, callback=None):
  244. def _handle_request(*args):
  245. with in_sighandler():
  246. from celery.worker import state
  247. if current_process()._name == 'MainProcess':
  248. if callback:
  249. callback(worker)
  250. safe_say('worker: {0} shutdown (MainProcess)'.format(how))
  251. if active_thread_count() > 1:
  252. setattr(state, {'Warm': 'should_stop',
  253. 'Cold': 'should_terminate'}[how], True)
  254. else:
  255. raise exc()
  256. _handle_request.__name__ = 'worker_' + how
  257. platforms.signals[sig] = _handle_request
# SIGTERM: warm shutdown, raising SystemExit when no other threads run.
install_worker_term_handler = partial(
    _shutdown_handler, sig='SIGTERM', how='Warm', exc=SystemExit,
)

if not is_jython:  # pragma: no cover
    # SIGQUIT: cold shutdown, raising SystemTerminate.
    install_worker_term_hard_handler = partial(
        _shutdown_handler, sig='SIGQUIT', how='Cold', exc=SystemTerminate,
    )
else:  # pragma: no cover
    # On Jython both installers (including the SIGTERM one defined just
    # above) are replaced by no-ops.
    install_worker_term_handler = \
        install_worker_term_hard_handler = lambda *a, **kw: None
  268. def on_SIGINT(worker):
  269. safe_say('worker: Hitting Ctrl+C again will terminate all running tasks!')
  270. install_worker_term_hard_handler(worker, sig='SIGINT')
if not is_jython:  # pragma: no cover
    # SIGINT: shutdown with _shutdown_handler's defaults, after on_SIGINT
    # has been called as the callback.
    install_worker_int_handler = partial(
        _shutdown_handler, sig='SIGINT', callback=on_SIGINT
    )
else:  # pragma: no cover
    # No-op installer on Jython.
    install_worker_int_handler = lambda *a, **kw: None
  277. def _reload_current_worker():
  278. os.execv(sys.executable, [sys.executable] + sys.argv)
  279. def install_worker_restart_handler(worker, sig='SIGHUP'):
  280. def restart_worker_sig_handler(*args):
  281. """Signal handler restarting the current python program."""
  282. set_in_sighandler(True)
  283. safe_say('Restarting celery worker ({0})'.format(' '.join(sys.argv)))
  284. import atexit
  285. atexit.register(_reload_current_worker)
  286. from celery.worker import state
  287. state.should_stop = True
  288. platforms.signals[sig] = restart_worker_sig_handler
  289. def install_cry_handler(sig='SIGUSR1'):
  290. # Jython/PyPy does not have sys._current_frames
  291. if is_jython or is_pypy: # pragma: no cover
  292. return
  293. def cry_handler(*args):
  294. """Signal handler logging the stacktrace of all active threads."""
  295. with in_sighandler():
  296. safe_say(cry())
  297. platforms.signals[sig] = cry_handler
  298. def install_rdb_handler(envvar='CELERY_RDBSIG',
  299. sig='SIGUSR2'): # pragma: no cover
  300. def rdb_handler(*args):
  301. """Signal handler setting a rdb breakpoint at the current frame."""
  302. with in_sighandler():
  303. from celery.contrib.rdb import set_trace, _frame
  304. # gevent does not pass standard signal handler args
  305. frame = args[1] if args else _frame().f_back
  306. set_trace(frame)
  307. if os.environ.get(envvar):
  308. platforms.signals[sig] = rdb_handler
  309. def install_HUP_not_supported_handler(worker, sig='SIGHUP'):
  310. def warn_on_HUP_handler(signum, frame):
  311. with in_sighandler():
  312. safe_say('{sig} not supported: Restarting with {sig} is '
  313. 'unstable on this platform!'.format(sig=sig))
  314. platforms.signals[sig] = warn_on_HUP_handler