prefork.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.prefork
  4. ~~~~~~~~~~~~~~~~~~~~~~~~~~
  5. Pool implementation using :mod:`multiprocessing`.
  6. """
  7. from __future__ import absolute_import
  8. import os
  9. from billiard import forking_enable
  10. from billiard.pool import RUN, CLOSE, Pool as BlockingPool
  11. from celery import platforms
  12. from celery import signals
  13. from celery._state import set_default_app
  14. from celery.app import trace
  15. from celery.concurrency.base import BasePool
  16. from celery.five import items
  17. from celery.utils.log import get_logger
  18. from .asynpool import AsynPool
  19. __all__ = ['TaskPool', 'process_initializer', 'process_destructor']
  20. #: List of signals to reset when a child process starts.
  21. WORKER_SIGRESET = frozenset(['SIGTERM',
  22. 'SIGHUP',
  23. 'SIGTTIN',
  24. 'SIGTTOU',
  25. 'SIGUSR1'])
  26. #: List of signals to ignore when a child process starts.
  27. WORKER_SIGIGNORE = frozenset(['SIGINT'])
  28. MAXTASKS_NO_BILLIARD = """\
  29. maxtasksperchild enabled but billiard C extension not installed!
  30. This may lead to a deadlock, so please install the billiard C extension.
  31. """
  32. logger = get_logger(__name__)
  33. warning, debug = logger.warning, logger.debug
  34. def process_initializer(app, hostname):
  35. """Pool child process initializer.
  36. This will initialize a child pool process to ensure the correct
  37. app instance is used and things like
  38. logging works.
  39. """
  40. platforms.signals.reset(*WORKER_SIGRESET)
  41. platforms.signals.ignore(*WORKER_SIGIGNORE)
  42. platforms.set_mp_process_title('celeryd', hostname=hostname)
  43. # This is for Windows and other platforms not supporting
  44. # fork(). Note that init_worker makes sure it's only
  45. # run once per process.
  46. app.loader.init_worker()
  47. app.loader.init_worker_process()
  48. app.log.setup(int(os.environ.get('CELERY_LOG_LEVEL', 0) or 0),
  49. os.environ.get('CELERY_LOG_FILE') or None,
  50. bool(os.environ.get('CELERY_LOG_REDIRECT', False)),
  51. str(os.environ.get('CELERY_LOG_REDIRECT_LEVEL')))
  52. if os.environ.get('FORKED_BY_MULTIPROCESSING'):
  53. # pool did execv after fork
  54. trace.setup_worker_optimizations(app)
  55. else:
  56. app.set_current()
  57. set_default_app(app)
  58. app.finalize()
  59. trace._tasks = app._tasks # enables fast_trace_task optimization.
  60. # rebuild execution handler for all tasks.
  61. from celery.app.trace import build_tracer
  62. for name, task in items(app.tasks):
  63. task.__trace__ = build_tracer(name, task, app.loader, hostname,
  64. app=app)
  65. signals.worker_process_init.send(sender=None)
  66. def process_destructor(pid, exitcode):
  67. """Pool child process destructor
  68. Dispatch the :signal:`worker_process_shutdown` signal.
  69. """
  70. signals.worker_process_shutdown.send(
  71. sender=None, pid=pid, exitcode=exitcode,
  72. )
  73. class TaskPool(BasePool):
  74. """Multiprocessing Pool implementation."""
  75. Pool = AsynPool
  76. BlockingPool = BlockingPool
  77. uses_semaphore = True
  78. write_stats = None
  79. def on_start(self):
  80. """Run the task pool.
  81. Will pre-fork all workers so they're ready to accept tasks.
  82. """
  83. if self.options.get('maxtasksperchild'):
  84. try:
  85. from billiard.connection import Connection
  86. Connection.send_offset
  87. except (ImportError, AttributeError):
  88. # billiard C extension not installed
  89. warning(MAXTASKS_NO_BILLIARD)
  90. forking_enable(self.forking_enable)
  91. Pool = (self.BlockingPool if self.options.get('threads', True)
  92. else self.Pool)
  93. P = self._pool = Pool(processes=self.limit,
  94. initializer=process_initializer,
  95. on_process_exit=process_destructor,
  96. synack=False,
  97. **self.options)
  98. # Create proxy methods
  99. self.on_apply = P.apply_async
  100. self.maintain_pool = P.maintain_pool
  101. self.terminate_job = P.terminate_job
  102. self.grow = P.grow
  103. self.shrink = P.shrink
  104. self.flush = getattr(P, 'flush', None) # FIXME add to billiard
  105. self.restart = P.restart
  106. def did_start_ok(self):
  107. return self._pool.did_start_ok()
  108. def register_with_event_loop(self, loop):
  109. try:
  110. reg = self._pool.register_with_event_loop
  111. except AttributeError:
  112. return
  113. return reg(loop)
  114. def on_stop(self):
  115. """Gracefully stop the pool."""
  116. if self._pool is not None and self._pool._state in (RUN, CLOSE):
  117. self._pool.close()
  118. self._pool.join()
  119. self._pool = None
  120. def on_terminate(self):
  121. """Force terminate the pool."""
  122. if self._pool is not None:
  123. self._pool.terminate()
  124. self._pool = None
  125. def on_close(self):
  126. if self._pool is not None and self._pool._state == RUN:
  127. self._pool.close()
  128. def _get_info(self):
  129. return {
  130. 'max-concurrency': self.limit,
  131. 'processes': [p.pid for p in self._pool._pool],
  132. 'max-tasks-per-child': self._pool._maxtasksperchild or 'N/A',
  133. 'put-guarded-by-semaphore': self.putlocks,
  134. 'timeouts': (self._pool.soft_timeout or 0,
  135. self._pool.timeout or 0),
  136. 'writes': self._pool.human_write_stats(),
  137. }
  138. @property
  139. def num_processes(self):
  140. return self._pool._processes