state.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import time
  4. import copy
  5. import logging
  6. import threading
  7. from pprint import pformat
  8. import celery
  9. from . import settings
  10. from .utils.broker import Broker
  11. logger = logging.getLogger(__name__)
  12. class State(threading.Thread):
  13. def __init__(self, celery_app, broker_api=None):
  14. threading.Thread.__init__(self)
  15. self.daemon = True
  16. self._celery_app = celery_app
  17. self._broker_api = broker_api
  18. self._update_lock = threading.Lock()
  19. self._inspect = threading.Event()
  20. self._inspect.set()
  21. self._last_access = time.time()
  22. self._stats = {}
  23. self._registered_tasks = {}
  24. self._scheduled_tasks = {}
  25. self._active_tasks = {}
  26. self._reserved_tasks = {}
  27. self._revoked_tasks = {}
  28. self._ping = {}
  29. self._active_queues = {}
  30. self._confs = {}
  31. self._broker_queues = []
  32. def run(self):
  33. try:
  34. transport = self._celery_app.connection().transport.driver_type
  35. except AttributeError:
  36. # Celery versions prior to 3 don't have driver_type
  37. transport = None
  38. if transport and transport not in ('amqp', 'redis', 'mongodb'):
  39. logger.error("Dashboard and worker management commands are "
  40. "not available for '%s' transport", transport)
  41. return
  42. if celery.__version__ < '3.0.0':
  43. logger.warning("Configuration viewer is not available for "
  44. "Celery versions prior to 3.0")
  45. timeout = settings.CELERY_INSPECT_TIMEOUT / 1000.0
  46. i = self._celery_app.control.inspect(timeout=timeout)
  47. burl = self._celery_app.connection().as_uri(include_password=True)
  48. broker = None
  49. try:
  50. if transport == 'amqp' and self._broker_api:
  51. broker = Broker(burl, self._broker_api)
  52. broker.queues([])
  53. elif transport == 'redis':
  54. broker = Broker(burl)
  55. except Exception as e:
  56. logger.error("Unable to get broker info: %s" % e)
  57. if transport == 'amqp' and not self._broker_api:
  58. logger.warning("Broker info is not available if --broker_api "
  59. "option is not configured. Also make sure "
  60. "RabbitMQ Management Plugin is enabled ("
  61. "rabbitmq-plugins enable rabbitmq_management)")
  62. try_interval = 1
  63. while True:
  64. try:
  65. try_interval *= 2
  66. logger.debug('Inspecting workers...')
  67. stats = i.stats()
  68. logger.debug('Stats: %s', pformat(stats))
  69. registered = i.registered()
  70. logger.debug('Registered: %s', pformat(registered))
  71. scheduled = i.scheduled()
  72. logger.debug('Scheduled: %s', pformat(scheduled))
  73. active = i.active()
  74. logger.debug('Active: %s', pformat(active))
  75. reserved = i.reserved()
  76. logger.debug('Reserved: %s', pformat(reserved))
  77. revoked = i.revoked()
  78. logger.debug('Revoked: %s', pformat(revoked))
  79. ping = i.ping()
  80. logger.debug('Ping: %s', pformat(ping))
  81. active_queues = i.active_queues()
  82. logger.debug('Active queues: %s', pformat(active_queues))
  83. # Inspect.conf was introduced in Celery 3.1
  84. conf = hasattr(i, 'conf') and i.conf()
  85. logger.debug('Conf: %s', pformat(conf))
  86. try:
  87. if broker:
  88. broker_queues = broker.queues(self.active_queue_names)
  89. else:
  90. broker_queues = None
  91. logger.debug('Broker queues: %s', pformat(broker_queues))
  92. except Exception as e:
  93. broker_queues = []
  94. logger.error("Failed to inspect the broker: %s", e)
  95. logger.debug(e, exc_info=True)
  96. with self._update_lock:
  97. self._stats.update(stats or {})
  98. self._registered_tasks = registered or {}
  99. self._scheduled_tasks = scheduled or {}
  100. self._active_tasks = active or {}
  101. self._reserved_tasks = reserved or {}
  102. self._revoked_tasks = revoked or {}
  103. self._ping = ping or {}
  104. self._active_queues = active_queues or {}
  105. self._conf = conf or {}
  106. self._broker_queues = broker_queues or []
  107. try_interval = 1
  108. if time.time() - self._last_access > 60 * timeout:
  109. self.pause()
  110. self._inspect.wait()
  111. except (KeyboardInterrupt, SystemExit):
  112. try:
  113. import _thread as thread
  114. except ImportError:
  115. import thread
  116. thread.interrupt_main()
  117. except Exception as e:
  118. logger.error("Failed to inspect workers: '%s', trying "
  119. "again in %s seconds", e, try_interval)
  120. logger.debug(e, exc_info=True)
  121. time.sleep(try_interval)
  122. def pause(self):
  123. "stop inspecting workers until resume is called"
  124. logger.debug('Stopping inspecting workers...')
  125. self._inspect.clear()
  126. def resume(self):
  127. "resume inspecting workers"
  128. logger.debug('Resuming inspecting workers...')
  129. self._inspect.set()
  130. self._last_access = time.time()
  131. def __getattr__(self, name):
  132. if name in ['stats', 'registered_tasks', 'scheduled_tasks',
  133. 'active_tasks', 'reserved_tasks', 'revoked_tasks',
  134. 'ping', 'active_queues', 'conf', 'broker_queues']:
  135. with self._update_lock:
  136. self._last_access = time.time()
  137. return copy.deepcopy(getattr(self, '_' + name))
  138. super(State, self).__getattr__(name)
  139. @property
  140. def active_queue_names(self):
  141. queues = set([])
  142. for q in self._active_queues.values():
  143. queues.update(map(lambda x: x['name'], q))
  144. return queues