from __future__ import absolute_import import logging from tornado import web from ..views import BaseHandler from ..models import WorkersModel logger = logging.getLogger(__name__) class ControlHandler(BaseHandler): def is_worker(self, name): return WorkersModel.is_worker(self.application, name) def error_reason(self, workername, response): "extracts error message from response" for r in response: try: return r[workername].get('error', 'Unknown error') except KeyError: pass class WorkerShutDown(ControlHandler): @web.authenticated def post(self, workername): """ Shut down a worker **Example request**: .. sourcecode:: http POST /api/worker/shutdown/celery@worker2 HTTP/1.1 Content-Length: 0 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 29 Content-Type: application/json; charset=UTF-8 { "message": "Shutting down!" } :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app logger.info("Shutting down '%s' worker", workername) celery.control.broadcast('shutdown', destination=[workername]) self.write(dict(message="Shutting down!")) class WorkerPoolRestart(ControlHandler): @web.authenticated def post(self, workername): """ Restart worker's pool **Example request**: .. sourcecode:: http POST /api/worker/pool/restart/celery@worker2 HTTP/1.1 Content-Length: 0 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 56 Content-Type: application/json; charset=UTF-8 { "message": "Restarting 'celery@worker2' worker's pool" } :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: pool restart is not enabled (see CELERYD_POOL_RESTARTS) :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app logger.info("Restarting '%s' worker's pool", workername) response = celery.control.broadcast('pool_restart', arguments={'reload': False}, destination=[workername], reply=True) if response and 'ok' in response[0][workername]: self.write(dict( message="Restarting '%s' worker's pool" % workername)) else: logger.error(response) self.set_status(403) self.write("Failed to restart the '%s' pool: %s" % ( workername, self.error_reason(workername, response) )) class WorkerPoolGrow(ControlHandler): @web.authenticated def post(self, workername): """ Grow worker's pool **Example request**: .. sourcecode:: http POST /api/worker/pool/grow/celery@worker2?n=3 HTTP/1.1 Content-Length: 0 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 58 Content-Type: application/json; charset=UTF-8 { "message": "Growing 'celery@worker2' worker's pool by 3" } :query n: number of pool processes to grow, default is 1 :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: failed to grow :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app n = self.get_argument('n', default=1, type=int) logger.info("Growing '%s' worker's pool by '%s'", workername, n) response = celery.control.pool_grow(n=n, reply=True, destination=[workername]) if response and 'ok' in response[0][workername]: self.write(dict( message="Growing '%s' worker's pool by %s" % (workername, n))) else: logger.error(response) self.set_status(403) self.write("Failed to grow '%s' worker's pool" % ( workername, self.error_reason(workername, response))) class WorkerPoolShrink(ControlHandler): @web.authenticated def post(self, workername): """ Shrink worker's pool **Example request**: .. sourcecode:: http POST /api/worker/pool/shrink/celery@worker2 HTTP/1.1 Content-Length: 0 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 60 Content-Type: application/json; charset=UTF-8 { "message": "Shrinking 'celery@worker2' worker's pool by 1" } :query n: number of pool processes to shrink, default is 1 :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: failed to shrink :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app n = self.get_argument('n', default=1, type=int) logger.info("Shrinking '%s' worker's pool by '%s'", workername, n) response = celery.control.pool_shrink(n=n, reply=True, destination=[workername]) if response and 'ok' in response[0][workername]: self.write(dict(message="Shrinking '%s' worker's pool by %s" % ( workername, n))) else: logger.error(response) self.set_status(403) self.write("Failed to shrink '%s' worker's pool: %s" % ( workername, self.error_reason(workername, response) )) class WorkerPoolAutoscale(ControlHandler): @web.authenticated def post(self, workername): """ Autoscale worker pool **Example request**: .. sourcecode:: http POST /api/worker/pool/autoscale/celery@worker2?min=3&max=10 HTTP/1.1 Content-Length: 0 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 66 Content-Type: application/json; charset=UTF-8 { "message": "Autoscaling 'celery@worker2' worker (min=3, max=10)" } :query min: minimum number of pool processes :query max: maximum number of pool processes :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: autoscaling is not enabled (see CELERYD_AUTOSCALER) :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app min = self.get_argument('min', type=int) max = self.get_argument('max', type=int) logger.info("Autoscaling '%s' worker by '%s'", workername, (min, max)) response = celery.control.broadcast('autoscale', arguments={'min': min, 'max': max}, destination=[workername], reply=True) if response and 'ok' in response[0][workername]: self.write(dict(message="Autoscaling '%s' worker " "(min=%s, max=%s)" % ( workername, min, max))) else: logger.error(response) self.set_status(403) self.write("Failed to autoscale '%s' worker: %s" % ( workername, self.error_reason(workername, response) )) class WorkerQueueAddConsumer(ControlHandler): @web.authenticated def post(self, workername): """ Start consuming from a queue **Example request**: .. sourcecode:: http POST /api/worker/queue/add-consumer/celery@worker2?queue=sample-queue Content-Length: 0 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 40 Content-Type: application/json; charset=UTF-8 { "message": "add consumer sample-queue" } :query queue: the name of a new queue :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: failed to add consumer :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app queue = self.get_argument('queue') logger.info("Adding consumer '%s' to worker '%s'", queue, workername) response = celery.control.broadcast('add_consumer', arguments={'queue': queue}, destination=[workername], reply=True) if response and 'ok' in response[0][workername]: self.write(dict(message=response[0][workername]['ok'])) else: logger.error(response) self.set_status(403) self.write("Failed to add '%s' consumer to '%s' worker: %s" % ( workername, self.error_reason(workername, response) )) class WorkerQueueCancelConsumer(ControlHandler): @web.authenticated def post(self, workername): """ Stop consuming from a queue **Example request**: .. sourcecode:: http POST /api/worker/queue/cancel-consumer/celery@worker2?queue=sample-queue Content-Length: 0 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 52 Content-Type: application/json; charset=UTF-8 { "message": "no longer consuming from sample-queue" } :query queue: the name of queue :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 403: failed to cancel consumer :statuscode 404: unknown worker """ if not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) celery = self.application.celery_app queue = self.get_argument('queue') logger.info("Canceling consumer '%s' from worker '%s'", queue, workername) response = celery.control.broadcast('cancel_consumer', arguments={'queue': queue}, destination=[workername], reply=True) if response and 'ok' in response[0][workername]: self.write(dict(message=response[0][workername]['ok'])) else: logger.error(response) self.set_status(403) self.write( "Failed to cancel '%s' consumer from '%s' worker: %s" % ( workername, self.error_reason(workername, response) )) class TaskRevoke(BaseHandler): @web.authenticated def post(self, taskid): """ Revoke a task **Example request**: .. sourcecode:: http POST /api/task/revoke/1480b55c-b8b2-462c-985e-24af3e9158f9?terminate=true Content-Length: 0 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 61 Content-Type: application/json; charset=UTF-8 { "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'" } :query terminate: terminate the task if it is running :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request """ logger.info("Revoking task '%s'", taskid) celery = self.application.celery_app terminate = self.get_argument('terminate', default=False, type=bool) celery.control.revoke(taskid, terminate=terminate) self.write(dict(message="Revoked '%s'" % taskid)) class TaskTimout(ControlHandler): @web.authenticated def post(self, taskname): """ Change soft and hard time limits for a task **Example request**: .. sourcecode:: http POST /api/task/timeout/tasks.sleep HTTP/1.1 Content-Length: 44 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 soft=30&hard=100&workername=celery%40worker1 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 46 Content-Type: application/json; charset=UTF-8 { "message": "new rate limit set successfully" } :query workername: worker name :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 404: unknown task/worker """ celery = self.application.celery_app workername = self.get_argument('workername') hard = self.get_argument('hard', default=None, type=float) soft = self.get_argument('soft', default=None, type=float) if taskname not in celery.tasks: raise web.HTTPError(404, "Unknown task '%s'" % taskname) if workername is not None and not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) logger.info("Setting timeouts for '%s' task (%s, %s)", taskname, soft, hard) destination = [workername] if workername is not None else None response = celery.control.time_limit(taskname, reply=True, hard=hard, soft=soft, destination=destination) if response and 'ok' in response[0][workername]: self.write(dict(message=response[0][workername]['ok'])) else: logger.error(response) self.set_status(403) self.write("Failed to set timeouts: '%s'" % self.error_reason(taskname, response)) class TaskRateLimit(ControlHandler): @web.authenticated def post(self, taskname): """ Change rate limit for a task **Example request**: .. sourcecode:: http POST /api/task/rate-limit/tasks.sleep HTTP/1.1 Content-Length: 41 Content-Type: application/x-www-form-urlencoded; charset=utf-8 Host: localhost:5555 ratelimit=200&workername=celery%40worker1 **Example response**: .. sourcecode:: http HTTP/1.1 200 OK Content-Length: 61 Content-Type: application/json; charset=UTF-8 { "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'" } :query terminate: terminate the task if it is running :reqheader Authorization: optional OAuth token to authenticate :statuscode 200: no error :statuscode 401: unauthorized request :statuscode 404: unknown task/worker """ celery = self.application.celery_app workername = self.get_argument('workername') ratelimit = self.get_argument('ratelimit') if taskname not in celery.tasks: raise web.HTTPError(404, "Unknown task '%s'" % taskname) if workername is not None and not self.is_worker(workername): raise web.HTTPError(404, "Unknown worker '%s'" % workername) logger.info("Setting '%s' rate limit for '%s' task", ratelimit, taskname) destination = [workername] if workername is not None else None response = celery.control.rate_limit(taskname, ratelimit, reply=True, destination=destination) if response and 'ok' in response[0][workername]: self.write(dict(message=response[0][workername]['ok'])) else: logger.error(response) self.set_status(403) self.write("Failed to set rate limit: '%s'" % self.error_reason(taskname, response))