tasks.py 10.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. from __future__ import absolute_import
  2. import json
  3. import logging
  4. from tornado import web
  5. from tornado.escape import json_decode
  6. from tornado.web import HTTPError
  7. from celery import states
  8. from celery.result import AsyncResult
  9. from celery.backends.base import DisabledBackend
  10. from ..models import TaskModel
  11. from ..views import BaseHandler
  12. logger = logging.getLogger(__name__)
  13. class BaseTaskHandler(BaseHandler):
  14. def get_task_args(self):
  15. try:
  16. body = self.request.body
  17. options = json_decode(body) if body else {}
  18. except ValueError as e:
  19. raise HTTPError(400, str(e))
  20. args = options.pop('args', [])
  21. kwargs = options.pop('kwargs', {})
  22. if not isinstance(args, (list, tuple)):
  23. raise HTTPError(400, 'args must be an array')
  24. return args, kwargs, options
  25. @staticmethod
  26. def backend_configured(result):
  27. return not isinstance(result.backend, DisabledBackend)
  28. def write_error(self, status_code, **kwargs):
  29. self.set_status(status_code)
  30. def safe_result(self, result):
  31. "returns json encodable result"
  32. try:
  33. json.dumps(result)
  34. except TypeError:
  35. return repr(result)
  36. else:
  37. return result
  38. class TaskAsyncApply(BaseTaskHandler):
  39. @web.authenticated
  40. def post(self, taskname):
  41. """
  42. Execute a task
  43. **Example request**:
  44. .. sourcecode:: http
  45. POST /api/task/async-apply/tasks.add HTTP/1.1
  46. Accept: application/json
  47. Accept-Encoding: gzip, deflate, compress
  48. Content-Length: 16
  49. Content-Type: application/json; charset=utf-8
  50. Host: localhost:5555
  51. {
  52. "args": [1, 2]
  53. }
  54. **Example response**:
  55. .. sourcecode:: http
  56. HTTP/1.1 200 OK
  57. Content-Length: 71
  58. Content-Type: application/json; charset=UTF-8
  59. Date: Sun, 13 Apr 2014 15:55:00 GMT
  60. {
  61. "state": "PENDING",
  62. "task-id": "abc300c7-2922-4069-97b6-a635cc2ac47c"
  63. }
  64. :query args: a list of arguments
  65. :query kwargs: a dictionary of arguments
  66. :reqheader Authorization: optional OAuth token to authenticate
  67. :statuscode 200: no error
  68. :statuscode 401: unauthorized request
  69. :statuscode 404: unknown task
  70. """
  71. celery = self.application.celery_app
  72. args, kwargs, options = self.get_task_args()
  73. logger.info("Invoking a task '%s' with '%s' and '%s'",
  74. taskname, args, kwargs)
  75. try:
  76. task = celery.tasks[taskname]
  77. except KeyError:
  78. raise HTTPError(404, "Unknown task '%s'" % taskname)
  79. result = task.apply_async(args=args, kwargs=kwargs, **options)
  80. response = {'task-id': result.task_id}
  81. if self.backend_configured(result):
  82. response.update(state=result.state)
  83. self.write(response)
  84. class TaskSend(BaseTaskHandler):
  85. @web.authenticated
  86. def post(self, taskname):
  87. """
  88. Execute a task by name (doesn't require task sources)
  89. **Example request**:
  90. .. sourcecode:: http
  91. POST /api/task/send-task/tasks.add HTTP/1.1
  92. Accept: application/json
  93. Accept-Encoding: gzip, deflate, compress
  94. Content-Length: 16
  95. Content-Type: application/json; charset=utf-8
  96. Host: localhost:5555
  97. {
  98. "args": [1, 2]
  99. }
  100. **Example response**:
  101. .. sourcecode:: http
  102. HTTP/1.1 200 OK
  103. Content-Length: 71
  104. Content-Type: application/json; charset=UTF-8
  105. {
  106. "state": "SUCCESS",
  107. "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  108. }
  109. :query args: a list of arguments
  110. :query kwargs: a dictionary of arguments
  111. :reqheader Authorization: optional OAuth token to authenticate
  112. :statuscode 200: no error
  113. :statuscode 401: unauthorized request
  114. :statuscode 404: unknown task
  115. """
  116. celery = self.application.celery_app
  117. args, kwargs, options = self.get_task_args()
  118. logger.debug("Invoking task '%s' with '%s' and '%s'",
  119. taskname, args, kwargs)
  120. result = celery.send_task(taskname, args=args,
  121. kwargs=kwargs, **options)
  122. response = {'task-id': result.task_id}
  123. if self.backend_configured(result):
  124. response.update(state=result.state)
  125. self.write(response)
  126. class TaskResult(BaseTaskHandler):
  127. @web.authenticated
  128. def get(self, taskid):
  129. """
  130. Get a task result
  131. **Example request**:
  132. .. sourcecode:: http
  133. GET /api/task/result/c60be250-fe52-48df-befb-ac66174076e6 HTTP/1.1
  134. Host: localhost:5555
  135. **Example response**:
  136. .. sourcecode:: http
  137. HTTP/1.1 200 OK
  138. Content-Length: 84
  139. Content-Type: application/json; charset=UTF-8
  140. {
  141. "result": 3,
  142. "state": "SUCCESS",
  143. "task-id": "c60be250-fe52-48df-befb-ac66174076e6"
  144. }
  145. :reqheader Authorization: optional OAuth token to authenticate
  146. :statuscode 200: no error
  147. :statuscode 401: unauthorized request
  148. :statuscode 503: result backend is not configured
  149. """
  150. result = AsyncResult(taskid)
  151. if not self.backend_configured(result):
  152. raise HTTPError(503)
  153. response = {'task-id': taskid, 'state': result.state}
  154. if result.ready():
  155. if result.state == states.FAILURE:
  156. response.update({'result': self.safe_result(result.result),
  157. 'traceback': result.traceback})
  158. else:
  159. response.update({'result': self.safe_result(result.result)})
  160. self.write(response)
  161. class ListTasks(BaseTaskHandler):
  162. @web.authenticated
  163. def get(self):
  164. """
  165. List tasks
  166. **Example request**:
  167. .. sourcecode:: http
  168. GET /api/tasks HTTP/1.1
  169. Host: localhost:5555
  170. User-Agent: HTTPie/0.8.0
  171. **Example response**:
  172. .. sourcecode:: http
  173. HTTP/1.1 200 OK
  174. Content-Length: 1109
  175. Content-Type: application/json; charset=UTF-8
  176. Etag: "b2478118015c8b825f7b88ce6b660e5449746c37"
  177. Server: TornadoServer/3.1.1
  178. {
  179. "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9": {
  180. "args": "[3, 4]",
  181. "client": null,
  182. "clock": 1079,
  183. "eta": null,
  184. "exception": null,
  185. "exchange": null,
  186. "expires": null,
  187. "failed": null,
  188. "kwargs": "{}",
  189. "name": "tasks.add",
  190. "received": 1398505411.107885,
  191. "result": "'7'",
  192. "retried": null,
  193. "retries": 0,
  194. "revoked": null,
  195. "routing_key": null,
  196. "runtime": 0.01610181899741292,
  197. "sent": null,
  198. "started": 1398505411.108985,
  199. "state": "SUCCESS",
  200. "succeeded": 1398505411.124802,
  201. "timestamp": 1398505411.124802,
  202. "traceback": null,
  203. "uuid": "e42ceb2d-8730-47b5-8b4d-8e0d2a1ef7c9"
  204. },
  205. "f67ea225-ae9e-42a8-90b0-5de0b24507e0": {
  206. "args": "[1, 2]",
  207. "client": null,
  208. "clock": 1042,
  209. "eta": null,
  210. "exception": null,
  211. "exchange": null,
  212. "expires": null,
  213. "failed": null,
  214. "kwargs": "{}",
  215. "name": "tasks.add",
  216. "received": 1398505395.327208,
  217. "result": "'3'",
  218. "retried": null,
  219. "retries": 0,
  220. "revoked": null,
  221. "routing_key": null,
  222. "runtime": 0.012884548006695695,
  223. "sent": null,
  224. "started": 1398505395.3289,
  225. "state": "SUCCESS",
  226. "succeeded": 1398505395.341089,
  227. "timestamp": 1398505395.341089,
  228. "traceback": null,
  229. "uuid": "f67ea225-ae9e-42a8-90b0-5de0b24507e0"
  230. }
  231. }
  232. :query limit: maximum number of tasks
  233. :query workername: filter task by workername
  234. :query taskname: filter tasks by taskname
  235. :query state: filter tasks by state
  236. :reqheader Authorization: optional OAuth token to authenticate
  237. :statuscode 200: no error
  238. :statuscode 401: unauthorized request
  239. """
  240. app = self.application
  241. limit = self.get_argument('limit', None)
  242. worker = self.get_argument('workername', None)
  243. type = self.get_argument('taskname', None)
  244. state = self.get_argument('state', None)
  245. limit = limit and int(limit)
  246. worker = worker if worker != 'All' else None
  247. type = type if type != 'All' else None
  248. state = state if state != 'All' else None
  249. tasks = []
  250. for task_id, task in TaskModel.iter_tasks(
  251. app, limit=limit, type=type,
  252. worker=worker, state=state):
  253. task = task.as_dict()
  254. task.pop('worker')
  255. tasks.append((task_id, task))
  256. self.write(dict(tasks))
  257. class TaskInfo(BaseTaskHandler):
  258. def get(self, taskid):
  259. """
  260. Get a task info
  261. **Example request**:
  262. .. sourcecode:: http
  263. GET /api/task/info/91396550-c228-4111-9da4-9d88cfd5ddc6 HTTP/1.1
  264. Accept: */*
  265. Accept-Encoding: gzip, deflate, compress
  266. Host: localhost:5555
  267. **Example response**:
  268. .. sourcecode:: http
  269. HTTP/1.1 200 OK
  270. Content-Length: 575
  271. Content-Type: application/json; charset=UTF-8
  272. {
  273. "args": "[2, 2]",
  274. "client": null,
  275. "clock": 25,
  276. "eta": null,
  277. "exception": null,
  278. "exchange": null,
  279. "expires": null,
  280. "failed": null,
  281. "kwargs": "{}",
  282. "name": "tasks.add",
  283. "received": 1400806241.970742,
  284. "result": "'{\"result\": 4}'",
  285. "retried": null,
  286. "retries": null,
  287. "revoked": null,
  288. "routing_key": null,
  289. "runtime": 2.0037889280356467,
  290. "sent": null,
  291. "started": 1400806241.972624,
  292. "state": "SUCCESS",
  293. "succeeded": 1400806243.975336,
  294. "task-id": "91396550-c228-4111-9da4-9d88cfd5ddc6",
  295. "timestamp": 1400806243.975336,
  296. "traceback": null,
  297. "worker": "celery@worker1"
  298. }
  299. :reqheader Authorization: optional OAuth token to authenticate
  300. :statuscode 200: no error
  301. :statuscode 401: unauthorized request
  302. :statuscode 404: unknown task
  303. """
  304. task = TaskModel.get_task_by_id(self.application, taskid)
  305. if not task:
  306. raise HTTPError(404, "Unknown task '%s'" % taskid)
  307. response = {}
  308. for name in task._fields:
  309. if name not in ['uuid', 'worker']:
  310. response[name] = getattr(task, name, None)
  311. response['task-id'] = task.uuid
  312. response['worker'] = task.worker.hostname
  313. self.write(response)