monitor.py 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. from __future__ import absolute_import
  2. from collections import defaultdict
  3. from tornado import web
  4. from celery import states
  5. from ..views import BaseHandler
  6. class Monitor(BaseHandler):
  7. @web.authenticated
  8. def get(self):
  9. self.render("monitor.html")
  10. class SucceededTaskMonitor(BaseHandler):
  11. @web.authenticated
  12. def get(self):
  13. timestamp = self.get_argument('lastquery', type=float)
  14. state = self.application.events.state
  15. data = defaultdict(int)
  16. for _, task in state.itertasks():
  17. utcoffset = getattr(task, 'utcoffset', 0)
  18. if (timestamp < (task.timestamp + (utcoffset * 3600))
  19. and task.state == states.SUCCESS):
  20. data[task.worker.hostname] += 1
  21. for worker in state.workers:
  22. if worker not in data:
  23. data[worker] = 0
  24. self.write(data)
  25. class TimeToCompletionMonitor(BaseHandler):
  26. @web.authenticated
  27. def get(self):
  28. timestamp = self.get_argument('lastquery', type=float)
  29. state = self.application.events.state
  30. execute_time = 0
  31. queue_time = 0
  32. num_tasks = 0
  33. for _, task in state.itertasks():
  34. utcoffset = getattr(task, 'utcoffset', 0)
  35. if (timestamp < (task.timestamp + (utcoffset * 3600))
  36. and task.state == states.SUCCESS):
  37. # eta can make "time in queue" look really scary.
  38. if task.eta is not None:
  39. continue
  40. if task.started is None or task.received is None or\
  41. task.succeeded is None:
  42. continue
  43. queue_time += task.started - task.received
  44. execute_time += task.succeeded - task.started
  45. num_tasks += 1
  46. avg_queue_time = (queue_time / num_tasks) if num_tasks > 0 else 0
  47. avg_execution_time = (execute_time / num_tasks) if num_tasks > 0 else 0
  48. result = {
  49. "Time in a queue": avg_queue_time,
  50. "Execution time": avg_execution_time,
  51. }
  52. self.write(result)
  53. class FailedTaskMonitor(BaseHandler):
  54. @web.authenticated
  55. def get(self):
  56. timestamp = self.get_argument('lastquery', type=float)
  57. state = self.application.events.state
  58. data = defaultdict(int)
  59. for _, task in state.itertasks():
  60. utcoffset = getattr(task, 'utcoffset', 0)
  61. if (timestamp < (task.timestamp + (utcoffset * 3600))
  62. and task.state == states.FAILURE):
  63. data[task.worker.hostname] += 1
  64. for worker in state.workers:
  65. if worker not in data:
  66. data[worker] = 0
  67. self.write(data)
  68. class BrokerMonitor(BaseHandler):
  69. @web.authenticated
  70. def get(self):
  71. state = self.application.state
  72. data = defaultdict(int)
  73. for queue in state.broker_queues:
  74. data[queue['name']] = queue['messages']
  75. self.write(data)