worker.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258
  1. # -*- coding: utf-8 -*-
  2. """
  3. The :program:`celery worker` command (previously known as ``celeryd``)
  4. .. program:: celery worker
  5. .. seealso::
  6. See :ref:`preload-options`.
  7. .. cmdoption:: -c, --concurrency
  8. Number of child processes processing the queue. The default
  9. is the number of CPUs available on your system.
  10. .. cmdoption:: -P, --pool
  11. Pool implementation:
  12. prefork (default), eventlet, gevent, solo or threads.
  13. .. cmdoption:: -f, --logfile
  14. Path to log file. If no logfile is specified, `stderr` is used.
  15. .. cmdoption:: -l, --loglevel
  16. Logging level, choose between `DEBUG`, `INFO`, `WARNING`,
  17. `ERROR`, `CRITICAL`, or `FATAL`.
  18. .. cmdoption:: -n, --hostname
  19. Set custom hostname, e.g. 'w1.%h'. Expands: %h (hostname),
  20. %n (name) and %d (domain).
  21. .. cmdoption:: -B, --beat
  22. Also run the `celery beat` periodic task scheduler. Please note that
  23. there must only be one instance of this service.
  24. .. cmdoption:: -Q, --queues
  25. List of queues to enable for this worker, separated by comma.
  26. By default all configured queues are enabled.
  27. Example: `-Q video,image`
  28. .. cmdoption:: -I, --include
  29. Comma separated list of additional modules to import.
  30. Example: -I foo.tasks,bar.tasks
  31. .. cmdoption:: -s, --schedule
  32. Path to the schedule database if running with the `-B` option.
  33. Defaults to `celerybeat-schedule`. The extension ".db" may be
  34. appended to the filename.
  35. .. cmdoption:: -O
  36. Apply optimization profile. Supported: default, fair
  37. .. cmdoption:: --scheduler
  38. Scheduler class to use. Default is celery.beat.PersistentScheduler
  39. .. cmdoption:: -S, --statedb
  40. Path to the state database. The extension '.db' may
  41. be appended to the filename. Default: {default}
  42. .. cmdoption:: -E, --events
  43. Send events that can be captured by monitors like :program:`celery events`,
  44. `celerymon`, and others.
  45. .. cmdoption:: --without-gossip
  46. Do not subscribe to other workers' events.
  47. .. cmdoption:: --without-mingle
  48. Do not synchronize with other workers at startup.
  49. .. cmdoption:: --without-heartbeat
  50. Do not send event heartbeats.
  51. .. cmdoption:: --purge
  52. Purges all waiting tasks before the daemon is started.
  53. **WARNING**: This is unrecoverable, and the tasks will be
  54. deleted from the messaging server.
  55. .. cmdoption:: --time-limit
  56. Enables a hard time limit (in seconds int/float) for tasks.
  57. .. cmdoption:: --soft-time-limit
  58. Enables a soft time limit (in seconds int/float) for tasks.
  59. .. cmdoption:: --maxtasksperchild
  60. Maximum number of tasks a pool worker can execute before it's
  61. terminated and replaced by a new worker.
  62. .. cmdoption:: --pidfile
  63. Optional file used to store the worker's pid.
  64. The worker will not start if this file already exists
  65. and the pid is still alive.
  66. .. cmdoption:: --autoscale
  67. Enable autoscaling by providing
  68. max_concurrency, min_concurrency. Example::
  69. --autoscale=10,3
  70. (always keep 3 processes, but grow to 10 if necessary)
  71. .. cmdoption:: --autoreload
  72. Enable autoreloading.
  73. .. cmdoption:: --no-execv
  74. Don't do execv after multiprocessing child fork.
  75. """
  76. from __future__ import absolute_import
  77. import sys
  78. from celery import concurrency
  79. from celery.bin.base import Command, Option, daemon_options
  80. from celery.bin.celeryd_detach import detached_celeryd
  81. from celery.five import string_t
  82. from celery.utils.log import LOG_LEVELS, mlevel
# Names exported by ``from <this module> import *``.
__all__ = ['worker', 'main']

# Alias for the module docstring.  Inside a class body ``__doc__`` refers to
# the class's own docstring, so the module-level help text is kept under a
# separate name for the ``worker`` class to pick up as its ``doc`` attribute.
__MODULE_DOC__ = __doc__
  85. class worker(Command):
  86. """Start worker instance.
  87. Examples::
  88. celery worker --app=proj -l info
  89. celery worker -A proj -l info -Q hipri,lopri
  90. celery worker -A proj --concurrency=4
  91. celery worker -A proj --concurrency=1000 -P eventlet
  92. celery worker --autoscale=10,0
  93. """
  94. doc = __MODULE_DOC__ # parse help from this too
  95. namespace = 'celeryd'
  96. enable_config_from_cmdline = True
  97. supports_args = False
  98. def run_from_argv(self, prog_name, argv=None, command=None):
  99. command = sys.argv[0] if command is None else command
  100. argv = sys.argv[1:] if argv is None else argv
  101. # parse options before detaching so errors can be handled.
  102. options, args = self.prepare_args(
  103. *self.parse_options(prog_name, argv, command))
  104. self.maybe_detach([command] + argv)
  105. return self(*args, **options)
  106. def maybe_detach(self, argv, dopts=['-D', '--detach']):
  107. if any(arg in argv for arg in dopts):
  108. argv = [v for v in argv if v not in dopts]
  109. # will never return
  110. detached_celeryd(self.app).execute_from_commandline(argv)
  111. raise SystemExit(0)
  112. def run(self, hostname=None, pool_cls=None, loglevel=None,
  113. app=None, **kwargs):
  114. # Pools like eventlet/gevent needs to patch libs as early
  115. # as possible.
  116. pool_cls = (concurrency.get_implementation(pool_cls) or
  117. self.app.conf.CELERYD_POOL)
  118. if self.app.IS_WINDOWS and kwargs.get('beat'):
  119. self.die('-B option does not work on Windows. '
  120. 'Please run celery beat as a separate service.')
  121. hostname = self.simple_format(hostname)
  122. if loglevel:
  123. try:
  124. loglevel = mlevel(loglevel)
  125. except KeyError: # pragma: no cover
  126. self.die('Unknown level {0!r}. Please use one of {1}.'.format(
  127. loglevel, '|'.join(
  128. l for l in LOG_LEVELS if isinstance(l, string_t))))
  129. return self.app.Worker(
  130. hostname=hostname, pool_cls=pool_cls, loglevel=loglevel, **kwargs
  131. ).start()
  132. def with_pool_option(self, argv):
  133. # this command support custom pools
  134. # that may have to be loaded as early as possible.
  135. return (['-P'], ['--pool'])
  136. def get_options(self):
  137. conf = self.app.conf
  138. return (
  139. Option('-c', '--concurrency',
  140. default=conf.CELERYD_CONCURRENCY, type='int'),
  141. Option('-P', '--pool', default=conf.CELERYD_POOL, dest='pool_cls'),
  142. Option('--purge', '--discard', default=False, action='store_true'),
  143. Option('-l', '--loglevel', default=conf.CELERYD_LOG_LEVEL),
  144. Option('-n', '--hostname'),
  145. Option('-B', '--beat', action='store_true'),
  146. Option('-s', '--schedule', dest='schedule_filename',
  147. default=conf.CELERYBEAT_SCHEDULE_FILENAME),
  148. Option('--scheduler', dest='scheduler_cls'),
  149. Option('-S', '--statedb',
  150. default=conf.CELERYD_STATE_DB, dest='state_db'),
  151. Option('-E', '--events', default=conf.CELERY_SEND_EVENTS,
  152. action='store_true', dest='send_events'),
  153. Option('--time-limit', type='float', dest='task_time_limit',
  154. default=conf.CELERYD_TASK_TIME_LIMIT),
  155. Option('--soft-time-limit', dest='task_soft_time_limit',
  156. default=conf.CELERYD_TASK_SOFT_TIME_LIMIT, type='float'),
  157. Option('--maxtasksperchild', dest='max_tasks_per_child',
  158. default=conf.CELERYD_MAX_TASKS_PER_CHILD, type='int'),
  159. Option('--queues', '-Q', default=[]),
  160. Option('--exclude-queues', '-X', default=[]),
  161. Option('--include', '-I', default=[]),
  162. Option('--autoscale'),
  163. Option('--autoreload', action='store_true'),
  164. Option('--no-execv', action='store_true', default=False),
  165. Option('--without-gossip', action='store_true', default=False),
  166. Option('--without-mingle', action='store_true', default=False),
  167. Option('--without-heartbeat', action='store_true', default=False),
  168. Option('-O', dest='optimization'),
  169. Option('-D', '--detach', action='store_true'),
  170. ) + daemon_options() + tuple(self.app.user_options['worker'])
  171. def main(app=None):
  172. # Fix for setuptools generated scripts, so that it will
  173. # work with multiprocessing fork emulation.
  174. # (see multiprocessing.forking.get_preparation_data())
  175. if __name__ != '__main__': # pragma: no cover
  176. sys.modules['__main__'] = sys.modules[__name__]
  177. from billiard import freeze_support
  178. freeze_support()
  179. worker(app=app).execute_from_commandline()
# Run the worker command when this file is executed directly as a script.
if __name__ == '__main__':  # pragma: no cover
    main()