base.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.concurrency.base
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. TaskPool interface.
  6. """
  7. from __future__ import absolute_import
  8. import logging
  9. import os
  10. import sys
  11. from billiard.einfo import ExceptionInfo
  12. from billiard.exceptions import WorkerLostError
  13. from kombu.utils.encoding import safe_repr
  14. from celery.five import monotonic, reraise
  15. from celery.utils import timer2
  16. from celery.utils.log import get_logger
  17. __all__ = ['BasePool', 'apply_target']
  18. logger = get_logger('celery.pool')
  19. def apply_target(target, args=(), kwargs={}, callback=None,
  20. accept_callback=None, pid=None, getpid=os.getpid,
  21. propagate=(), monotonic=monotonic, **_):
  22. if accept_callback:
  23. accept_callback(pid or getpid(), monotonic())
  24. try:
  25. ret = target(*args, **kwargs)
  26. except propagate:
  27. raise
  28. except Exception:
  29. raise
  30. except BaseException as exc:
  31. try:
  32. reraise(WorkerLostError, WorkerLostError(repr(exc)),
  33. sys.exc_info()[2])
  34. except WorkerLostError:
  35. callback(ExceptionInfo())
  36. else:
  37. callback(ret)
  38. class BasePool(object):
  39. RUN = 0x1
  40. CLOSE = 0x2
  41. TERMINATE = 0x3
  42. Timer = timer2.Timer
  43. #: set to true if the pool can be shutdown from within
  44. #: a signal handler.
  45. signal_safe = True
  46. #: set to true if pool uses greenlets.
  47. is_green = False
  48. _state = None
  49. _pool = None
  50. #: only used by multiprocessing pool
  51. uses_semaphore = False
  52. def __init__(self, limit=None, putlocks=True,
  53. forking_enable=True, callbacks_propagate=(), **options):
  54. self.limit = limit
  55. self.putlocks = putlocks
  56. self.options = options
  57. self.forking_enable = forking_enable
  58. self.callbacks_propagate = callbacks_propagate
  59. self._does_debug = logger.isEnabledFor(logging.DEBUG)
  60. def on_start(self):
  61. pass
  62. def did_start_ok(self):
  63. return True
  64. def flush(self):
  65. pass
  66. def on_stop(self):
  67. pass
  68. def register_with_event_loop(self, loop):
  69. pass
  70. def on_apply(self, *args, **kwargs):
  71. pass
  72. def on_terminate(self):
  73. pass
  74. def on_soft_timeout(self, job):
  75. pass
  76. def on_hard_timeout(self, job):
  77. pass
  78. def maintain_pool(self, *args, **kwargs):
  79. pass
  80. def terminate_job(self, pid):
  81. raise NotImplementedError(
  82. '{0} does not implement kill_job'.format(type(self)))
  83. def restart(self):
  84. raise NotImplementedError(
  85. '{0} does not implement restart'.format(type(self)))
  86. def stop(self):
  87. self.on_stop()
  88. self._state = self.TERMINATE
  89. def terminate(self):
  90. self._state = self.TERMINATE
  91. self.on_terminate()
  92. def start(self):
  93. self.on_start()
  94. self._state = self.RUN
  95. def close(self):
  96. self._state = self.CLOSE
  97. self.on_close()
  98. def on_close(self):
  99. pass
  100. def apply_async(self, target, args=[], kwargs={}, **options):
  101. """Equivalent of the :func:`apply` built-in function.
  102. Callbacks should optimally return as soon as possible since
  103. otherwise the thread which handles the result will get blocked.
  104. """
  105. if self._does_debug:
  106. logger.debug('TaskPool: Apply %s (args:%s kwargs:%s)',
  107. target, safe_repr(args), safe_repr(kwargs))
  108. return self.on_apply(target, args, kwargs,
  109. waitforslot=self.putlocks,
  110. callbacks_propagate=self.callbacks_propagate,
  111. **options)
  112. def _get_info(self):
  113. return {}
  114. @property
  115. def info(self):
  116. return self._get_info()
  117. @property
  118. def active(self):
  119. return self._state == self.RUN
  120. @property
  121. def num_processes(self):
  122. return self.limit