control.py 16 KB


  1. from __future__ import absolute_import
  2. import logging
  3. from tornado import web
  4. from ..views import BaseHandler
  5. from ..models import WorkersModel
  6. logger = logging.getLogger(__name__)
  7. class ControlHandler(BaseHandler):
  8. def is_worker(self, name):
  9. return WorkersModel.is_worker(self.application, name)
  10. def error_reason(self, workername, response):
  11. "extracts error message from response"
  12. for r in response:
  13. try:
  14. return r[workername].get('error', 'Unknown error')
  15. except KeyError:
  16. pass
  17. class WorkerShutDown(ControlHandler):
  18. @web.authenticated
  19. def post(self, workername):
  20. """
  21. Shut down a worker
  22. **Example request**:
  23. .. sourcecode:: http
  24. POST /api/worker/shutdown/celery@worker2 HTTP/1.1
  25. Content-Length: 0
  26. Host: localhost:5555
  27. **Example response**:
  28. .. sourcecode:: http
  29. HTTP/1.1 200 OK
  30. Content-Length: 29
  31. Content-Type: application/json; charset=UTF-8
  32. {
  33. "message": "Shutting down!"
  34. }
  35. :reqheader Authorization: optional OAuth token to authenticate
  36. :statuscode 200: no error
  37. :statuscode 401: unauthorized request
  38. :statuscode 404: unknown worker
  39. """
  40. if not self.is_worker(workername):
  41. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  42. celery = self.application.celery_app
  43. logger.info("Shutting down '%s' worker", workername)
  44. celery.control.broadcast('shutdown', destination=[workername])
  45. self.write(dict(message="Shutting down!"))
  46. class WorkerPoolRestart(ControlHandler):
  47. @web.authenticated
  48. def post(self, workername):
  49. """
  50. Restart worker's pool
  51. **Example request**:
  52. .. sourcecode:: http
  53. POST /api/worker/pool/restart/celery@worker2 HTTP/1.1
  54. Content-Length: 0
  55. Host: localhost:5555
  56. **Example response**:
  57. .. sourcecode:: http
  58. HTTP/1.1 200 OK
  59. Content-Length: 56
  60. Content-Type: application/json; charset=UTF-8
  61. {
  62. "message": "Restarting 'celery@worker2' worker's pool"
  63. }
  64. :reqheader Authorization: optional OAuth token to authenticate
  65. :statuscode 200: no error
  66. :statuscode 401: unauthorized request
  67. :statuscode 403: pool restart is not enabled (see CELERYD_POOL_RESTARTS)
  68. :statuscode 404: unknown worker
  69. """
  70. if not self.is_worker(workername):
  71. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  72. celery = self.application.celery_app
  73. logger.info("Restarting '%s' worker's pool", workername)
  74. response = celery.control.broadcast('pool_restart',
  75. arguments={'reload': False},
  76. destination=[workername],
  77. reply=True)
  78. if response and 'ok' in response[0][workername]:
  79. self.write(dict(
  80. message="Restarting '%s' worker's pool" % workername))
  81. else:
  82. logger.error(response)
  83. self.set_status(403)
  84. self.write("Failed to restart the '%s' pool: %s" % (
  85. workername, self.error_reason(workername, response)
  86. ))
  87. class WorkerPoolGrow(ControlHandler):
  88. @web.authenticated
  89. def post(self, workername):
  90. """
  91. Grow worker's pool
  92. **Example request**:
  93. .. sourcecode:: http
  94. POST /api/worker/pool/grow/celery@worker2?n=3 HTTP/1.1
  95. Content-Length: 0
  96. Host: localhost:5555
  97. **Example response**:
  98. .. sourcecode:: http
  99. HTTP/1.1 200 OK
  100. Content-Length: 58
  101. Content-Type: application/json; charset=UTF-8
  102. {
  103. "message": "Growing 'celery@worker2' worker's pool by 3"
  104. }
  105. :query n: number of pool processes to grow, default is 1
  106. :reqheader Authorization: optional OAuth token to authenticate
  107. :statuscode 200: no error
  108. :statuscode 401: unauthorized request
  109. :statuscode 403: failed to grow
  110. :statuscode 404: unknown worker
  111. """
  112. if not self.is_worker(workername):
  113. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  114. celery = self.application.celery_app
  115. n = self.get_argument('n', default=1, type=int)
  116. logger.info("Growing '%s' worker's pool by '%s'", workername, n)
  117. response = celery.control.pool_grow(n=n, reply=True,
  118. destination=[workername])
  119. if response and 'ok' in response[0][workername]:
  120. self.write(dict(
  121. message="Growing '%s' worker's pool by %s" % (workername, n)))
  122. else:
  123. logger.error(response)
  124. self.set_status(403)
  125. self.write("Failed to grow '%s' worker's pool" % (
  126. workername, self.error_reason(workername, response)))
  127. class WorkerPoolShrink(ControlHandler):
  128. @web.authenticated
  129. def post(self, workername):
  130. """
  131. Shrink worker's pool
  132. **Example request**:
  133. .. sourcecode:: http
  134. POST /api/worker/pool/shrink/celery@worker2 HTTP/1.1
  135. Content-Length: 0
  136. Host: localhost:5555
  137. **Example response**:
  138. .. sourcecode:: http
  139. HTTP/1.1 200 OK
  140. Content-Length: 60
  141. Content-Type: application/json; charset=UTF-8
  142. {
  143. "message": "Shrinking 'celery@worker2' worker's pool by 1"
  144. }
  145. :query n: number of pool processes to shrink, default is 1
  146. :reqheader Authorization: optional OAuth token to authenticate
  147. :statuscode 200: no error
  148. :statuscode 401: unauthorized request
  149. :statuscode 403: failed to shrink
  150. :statuscode 404: unknown worker
  151. """
  152. if not self.is_worker(workername):
  153. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  154. celery = self.application.celery_app
  155. n = self.get_argument('n', default=1, type=int)
  156. logger.info("Shrinking '%s' worker's pool by '%s'", workername, n)
  157. response = celery.control.pool_shrink(n=n, reply=True,
  158. destination=[workername])
  159. if response and 'ok' in response[0][workername]:
  160. self.write(dict(message="Shrinking '%s' worker's pool by %s" % (
  161. workername, n)))
  162. else:
  163. logger.error(response)
  164. self.set_status(403)
  165. self.write("Failed to shrink '%s' worker's pool: %s" % (
  166. workername, self.error_reason(workername, response)
  167. ))
  168. class WorkerPoolAutoscale(ControlHandler):
  169. @web.authenticated
  170. def post(self, workername):
  171. """
  172. Autoscale worker pool
  173. **Example request**:
  174. .. sourcecode:: http
  175. POST /api/worker/pool/autoscale/celery@worker2?min=3&max=10 HTTP/1.1
  176. Content-Length: 0
  177. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  178. Host: localhost:5555
  179. **Example response**:
  180. .. sourcecode:: http
  181. HTTP/1.1 200 OK
  182. Content-Length: 66
  183. Content-Type: application/json; charset=UTF-8
  184. {
  185. "message": "Autoscaling 'celery@worker2' worker (min=3, max=10)"
  186. }
  187. :query min: minimum number of pool processes
  188. :query max: maximum number of pool processes
  189. :reqheader Authorization: optional OAuth token to authenticate
  190. :statuscode 200: no error
  191. :statuscode 401: unauthorized request
  192. :statuscode 403: autoscaling is not enabled (see CELERYD_AUTOSCALER)
  193. :statuscode 404: unknown worker
  194. """
  195. if not self.is_worker(workername):
  196. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  197. celery = self.application.celery_app
  198. min = self.get_argument('min', type=int)
  199. max = self.get_argument('max', type=int)
  200. logger.info("Autoscaling '%s' worker by '%s'",
  201. workername, (min, max))
  202. response = celery.control.broadcast('autoscale',
  203. arguments={'min': min, 'max': max},
  204. destination=[workername],
  205. reply=True)
  206. if response and 'ok' in response[0][workername]:
  207. self.write(dict(message="Autoscaling '%s' worker "
  208. "(min=%s, max=%s)" % (
  209. workername, min, max)))
  210. else:
  211. logger.error(response)
  212. self.set_status(403)
  213. self.write("Failed to autoscale '%s' worker: %s" % (
  214. workername, self.error_reason(workername, response)
  215. ))
  216. class WorkerQueueAddConsumer(ControlHandler):
  217. @web.authenticated
  218. def post(self, workername):
  219. """
  220. Start consuming from a queue
  221. **Example request**:
  222. .. sourcecode:: http
  223. POST /api/worker/queue/add-consumer/celery@worker2?queue=sample-queue
  224. Content-Length: 0
  225. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  226. Host: localhost:5555
  227. **Example response**:
  228. .. sourcecode:: http
  229. HTTP/1.1 200 OK
  230. Content-Length: 40
  231. Content-Type: application/json; charset=UTF-8
  232. {
  233. "message": "add consumer sample-queue"
  234. }
  235. :query queue: the name of a new queue
  236. :reqheader Authorization: optional OAuth token to authenticate
  237. :statuscode 200: no error
  238. :statuscode 401: unauthorized request
  239. :statuscode 403: failed to add consumer
  240. :statuscode 404: unknown worker
  241. """
  242. if not self.is_worker(workername):
  243. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  244. celery = self.application.celery_app
  245. queue = self.get_argument('queue')
  246. logger.info("Adding consumer '%s' to worker '%s'",
  247. queue, workername)
  248. response = celery.control.broadcast('add_consumer',
  249. arguments={'queue': queue},
  250. destination=[workername],
  251. reply=True)
  252. if response and 'ok' in response[0][workername]:
  253. self.write(dict(message=response[0][workername]['ok']))
  254. else:
  255. logger.error(response)
  256. self.set_status(403)
  257. self.write("Failed to add '%s' consumer to '%s' worker: %s" % (
  258. workername, self.error_reason(workername, response)
  259. ))
  260. class WorkerQueueCancelConsumer(ControlHandler):
  261. @web.authenticated
  262. def post(self, workername):
  263. """
  264. Stop consuming from a queue
  265. **Example request**:
  266. .. sourcecode:: http
  267. POST /api/worker/queue/cancel-consumer/celery@worker2?queue=sample-queue
  268. Content-Length: 0
  269. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  270. Host: localhost:5555
  271. **Example response**:
  272. .. sourcecode:: http
  273. HTTP/1.1 200 OK
  274. Content-Length: 52
  275. Content-Type: application/json; charset=UTF-8
  276. {
  277. "message": "no longer consuming from sample-queue"
  278. }
  279. :query queue: the name of queue
  280. :reqheader Authorization: optional OAuth token to authenticate
  281. :statuscode 200: no error
  282. :statuscode 401: unauthorized request
  283. :statuscode 403: failed to cancel consumer
  284. :statuscode 404: unknown worker
  285. """
  286. if not self.is_worker(workername):
  287. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  288. celery = self.application.celery_app
  289. queue = self.get_argument('queue')
  290. logger.info("Canceling consumer '%s' from worker '%s'",
  291. queue, workername)
  292. response = celery.control.broadcast('cancel_consumer',
  293. arguments={'queue': queue},
  294. destination=[workername],
  295. reply=True)
  296. if response and 'ok' in response[0][workername]:
  297. self.write(dict(message=response[0][workername]['ok']))
  298. else:
  299. logger.error(response)
  300. self.set_status(403)
  301. self.write(
  302. "Failed to cancel '%s' consumer from '%s' worker: %s" % (
  303. workername, self.error_reason(workername, response)
  304. ))
  305. class TaskRevoke(BaseHandler):
  306. @web.authenticated
  307. def post(self, taskid):
  308. """
  309. Revoke a task
  310. **Example request**:
  311. .. sourcecode:: http
  312. POST /api/task/revoke/1480b55c-b8b2-462c-985e-24af3e9158f9?terminate=true
  313. Content-Length: 0
  314. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  315. Host: localhost:5555
  316. **Example response**:
  317. .. sourcecode:: http
  318. HTTP/1.1 200 OK
  319. Content-Length: 61
  320. Content-Type: application/json; charset=UTF-8
  321. {
  322. "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  323. }
  324. :query terminate: terminate the task if it is running
  325. :reqheader Authorization: optional OAuth token to authenticate
  326. :statuscode 200: no error
  327. :statuscode 401: unauthorized request
  328. """
  329. logger.info("Revoking task '%s'", taskid)
  330. celery = self.application.celery_app
  331. terminate = self.get_argument('terminate', default=False, type=bool)
  332. celery.control.revoke(taskid, terminate=terminate)
  333. self.write(dict(message="Revoked '%s'" % taskid))
  334. class TaskTimout(ControlHandler):
  335. @web.authenticated
  336. def post(self, taskname):
  337. """
  338. Change soft and hard time limits for a task
  339. **Example request**:
  340. .. sourcecode:: http
  341. POST /api/task/timeout/tasks.sleep HTTP/1.1
  342. Content-Length: 44
  343. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  344. Host: localhost:5555
  345. soft=30&hard=100&workername=celery%40worker1
  346. **Example response**:
  347. .. sourcecode:: http
  348. HTTP/1.1 200 OK
  349. Content-Length: 46
  350. Content-Type: application/json; charset=UTF-8
  351. {
  352. "message": "new rate limit set successfully"
  353. }
  354. :query workername: worker name
  355. :reqheader Authorization: optional OAuth token to authenticate
  356. :statuscode 200: no error
  357. :statuscode 401: unauthorized request
  358. :statuscode 404: unknown task/worker
  359. """
  360. celery = self.application.celery_app
  361. workername = self.get_argument('workername')
  362. hard = self.get_argument('hard', default=None, type=float)
  363. soft = self.get_argument('soft', default=None, type=float)
  364. if taskname not in celery.tasks:
  365. raise web.HTTPError(404, "Unknown task '%s'" % taskname)
  366. if workername is not None and not self.is_worker(workername):
  367. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  368. logger.info("Setting timeouts for '%s' task (%s, %s)",
  369. taskname, soft, hard)
  370. destination = [workername] if workername is not None else None
  371. response = celery.control.time_limit(taskname, reply=True,
  372. hard=hard, soft=soft,
  373. destination=destination)
  374. if response and 'ok' in response[0][workername]:
  375. self.write(dict(message=response[0][workername]['ok']))
  376. else:
  377. logger.error(response)
  378. self.set_status(403)
  379. self.write("Failed to set timeouts: '%s'" %
  380. self.error_reason(taskname, response))
  381. class TaskRateLimit(ControlHandler):
  382. @web.authenticated
  383. def post(self, taskname):
  384. """
  385. Change rate limit for a task
  386. **Example request**:
  387. .. sourcecode:: http
  388. POST /api/task/rate-limit/tasks.sleep HTTP/1.1
  389. Content-Length: 41
  390. Content-Type: application/x-www-form-urlencoded; charset=utf-8
  391. Host: localhost:5555
  392. ratelimit=200&workername=celery%40worker1
  393. **Example response**:
  394. .. sourcecode:: http
  395. HTTP/1.1 200 OK
  396. Content-Length: 61
  397. Content-Type: application/json; charset=UTF-8
  398. {
  399. "message": "Revoked '1480b55c-b8b2-462c-985e-24af3e9158f9'"
  400. }
  401. :query terminate: terminate the task if it is running
  402. :reqheader Authorization: optional OAuth token to authenticate
  403. :statuscode 200: no error
  404. :statuscode 401: unauthorized request
  405. :statuscode 404: unknown task/worker
  406. """
  407. celery = self.application.celery_app
  408. workername = self.get_argument('workername')
  409. ratelimit = self.get_argument('ratelimit')
  410. if taskname not in celery.tasks:
  411. raise web.HTTPError(404, "Unknown task '%s'" % taskname)
  412. if workername is not None and not self.is_worker(workername):
  413. raise web.HTTPError(404, "Unknown worker '%s'" % workername)
  414. logger.info("Setting '%s' rate limit for '%s' task",
  415. ratelimit, taskname)
  416. destination = [workername] if workername is not None else None
  417. response = celery.control.rate_limit(taskname,
  418. ratelimit,
  419. reply=True,
  420. destination=destination)
  421. if response and 'ok' in response[0][workername]:
  422. self.write(dict(message=response[0][workername]['ok']))
  423. else:
  424. logger.error(response)
  425. self.set_status(403)
  426. self.write("Failed to set rate limit: '%s'" %
  427. self.error_reason(taskname, response))