123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- from __future__ import absolute_import
- from collections import defaultdict
- from tornado import web
- from celery import states
- from ..views import BaseHandler
- class Monitor(BaseHandler):
- @web.authenticated
- def get(self):
- self.render("monitor.html")
- class SucceededTaskMonitor(BaseHandler):
- @web.authenticated
- def get(self):
- timestamp = self.get_argument('lastquery', type=float)
- state = self.application.events.state
- data = defaultdict(int)
- for _, task in state.itertasks():
- utcoffset = getattr(task, 'utcoffset', 0)
- if (timestamp < (task.timestamp + (utcoffset * 3600))
- and task.state == states.SUCCESS):
- data[task.worker.hostname] += 1
- for worker in state.workers:
- if worker not in data:
- data[worker] = 0
- self.write(data)
- class TimeToCompletionMonitor(BaseHandler):
- @web.authenticated
- def get(self):
- timestamp = self.get_argument('lastquery', type=float)
- state = self.application.events.state
- execute_time = 0
- queue_time = 0
- num_tasks = 0
- for _, task in state.itertasks():
- utcoffset = getattr(task, 'utcoffset', 0)
- if (timestamp < (task.timestamp + (utcoffset * 3600))
- and task.state == states.SUCCESS):
- # eta can make "time in queue" look really scary.
- if task.eta is not None:
- continue
- if task.started is None or task.received is None or\
- task.succeeded is None:
- continue
- queue_time += task.started - task.received
- execute_time += task.succeeded - task.started
- num_tasks += 1
- avg_queue_time = (queue_time / num_tasks) if num_tasks > 0 else 0
- avg_execution_time = (execute_time / num_tasks) if num_tasks > 0 else 0
- result = {
- "Time in a queue": avg_queue_time,
- "Execution time": avg_execution_time,
- }
- self.write(result)
- class FailedTaskMonitor(BaseHandler):
- @web.authenticated
- def get(self):
- timestamp = self.get_argument('lastquery', type=float)
- state = self.application.events.state
- data = defaultdict(int)
- for _, task in state.itertasks():
- utcoffset = getattr(task, 'utcoffset', 0)
- if (timestamp < (task.timestamp + (utcoffset * 3600))
- and task.state == states.FAILURE):
- data[task.worker.hostname] += 1
- for worker in state.workers:
- if worker not in data:
- data[worker] = 0
- self.write(data)
- class BrokerMonitor(BaseHandler):
- @web.authenticated
- def get(self):
- state = self.application.state
- data = defaultdict(int)
- for queue in state.broker_queues:
- data[queue['name']] = queue['messages']
- self.write(data)
|