models.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. from celery.events.state import Task as _Task
  4. try:
  5. from collections import OrderedDict
  6. except ImportError:
  7. # celery <3.2 provides this
  8. from celery.utils.compat import OrderedDict
  9. class BaseModel(object):
  10. def __init__(self, app):
  11. self.app = app
  12. def __eq__(self, other):
  13. raise NotImplementedError
  14. def __ne__(self, other):
  15. return not self.__eq__(other)
  16. class WorkersModel(BaseModel):
  17. def __init__(self, app):
  18. super(WorkersModel, self).__init__(app)
  19. self.workers = OrderedDict()
  20. state = self.app.state
  21. for workername, stat in sorted(state.stats.items()):
  22. pool = stat.get('pool') or {}
  23. self.workers[workername] = dict(
  24. status=(workername in state.ping),
  25. concurrency=pool.get('max-concurrency'),
  26. completed_tasks=sum(stat.get('total', {}).values()),
  27. running_tasks=len(state.active_tasks.get(workername, [])),
  28. queues=[x['name'] for x in
  29. state.active_queues.get(workername, []) if x]
  30. )
  31. @classmethod
  32. def get_latest(cls, app):
  33. return WorkersModel(app)
  34. @classmethod
  35. def get_workers(cls, app):
  36. return list(app.state.stats.keys())
  37. @classmethod
  38. def is_worker(cls, app, workername):
  39. return WorkerModel.get_worker(app, workername) is not None
  40. def __eq__(self, other):
  41. return other is not None and self.workers == other.workers
  42. class WorkerModel(BaseModel):
  43. def __init__(self, app, name):
  44. super(WorkerModel, self).__init__(app)
  45. state = self.app.state
  46. self.name = name
  47. self.stats = state.stats[name]
  48. self.active_tasks = state.active_tasks.get(name, {})
  49. self.scheduled_tasks = state.scheduled_tasks.get(name, {})
  50. self.active_queues = state.active_queues.get(name, {})
  51. self.revoked_tasks = state.revoked_tasks.get(name, [])
  52. self.registered_tasks = [x for x in state.registered_tasks.get(
  53. name, {}) if not x.startswith('celery.')]
  54. self.reserved_tasks = state.reserved_tasks.get(name, {})
  55. self.conf = state.conf.get(name, {})
  56. @classmethod
  57. def get_worker(self, app, name):
  58. if name not in app.state.stats:
  59. return None
  60. return WorkerModel(app, name)
  61. def __eq__(self, other):
  62. return self.name == other.name and self.stats == other.stats and\
  63. self.active_tasks == other.active_tasks and\
  64. self.active_queues == other.active_queues and\
  65. self.revoked_tasks == other.revoked_tasks and\
  66. self.registered_tasks == other.registered_tasks and\
  67. self.scheduled_tasks == other.scheduled_tasks and\
  68. self.reserved_tasks == other.reserved_tasks and\
  69. self.conf == other.conf
  70. class TaskModel(BaseModel):
  71. def __init__(self, app, task_id):
  72. self.uuid = task_id
  73. if hasattr(_Task, '_fields'): # Old version
  74. @classmethod
  75. def get_task_by_id(cls, app, task_id):
  76. return app.events.state.tasks.get(task_id)
  77. else:
  78. _fields = _Task._defaults.keys()
  79. @classmethod
  80. def get_task_by_id(cls, app, task_id):
  81. task = app.events.state.tasks.get(task_id)
  82. if task is not None:
  83. task._fields = cls._fields
  84. return task
  85. @classmethod
  86. def iter_tasks(cls, app, limit=None, type=None, worker=None, state=None):
  87. i = 0
  88. events_state = app.events.state
  89. for uuid, task in events_state.tasks_by_timestamp():
  90. if type and task.name != type:
  91. continue
  92. if worker and task.worker and task.worker.hostname != worker:
  93. continue
  94. if state and task.state != state:
  95. continue
  96. yield uuid, task
  97. i += 1
  98. if i == limit:
  99. break
  100. @classmethod
  101. def seen_task_types(cls, app):
  102. return app.events.state.task_types()
  103. def __dir__(self):
  104. return self._fields
  105. class BrokerModel(BaseModel):
  106. def __init__(self, app):
  107. super(BrokerModel, self).__init__(app)
  108. @property
  109. def url(self):
  110. return self.app.celery_app.connection().as_uri()
  111. @property
  112. def queues(self):
  113. return self.app.state.broker_queues
  114. @property
  115. def info_available(self):
  116. if self.app.celery_app.connection().transport == 'amqp' and\
  117. not self.app.options.broker_api:
  118. return False
  119. return True