# test_result.py (23 KB)
  1. from __future__ import absolute_import
  2. from contextlib import contextmanager
  3. from mock import Mock, patch
  4. from celery import states
  5. from celery.exceptions import IncompleteStream, TimeoutError
  6. from celery.five import range
  7. from celery.result import (
  8. AsyncResult,
  9. EagerResult,
  10. TaskSetResult,
  11. result_from_tuple,
  12. )
  13. from celery.utils import uuid
  14. from celery.utils.serialization import pickle
  15. from celery.tests.case import AppCase, depends_on_current_app
  16. def mock_task(name, state, result):
  17. return dict(id=uuid(), name=name, state=state, result=result)
  18. def save_result(app, task):
  19. traceback = 'Some traceback'
  20. if task['state'] == states.SUCCESS:
  21. app.backend.mark_as_done(task['id'], task['result'])
  22. elif task['state'] == states.RETRY:
  23. app.backend.mark_as_retry(
  24. task['id'], task['result'], traceback=traceback,
  25. )
  26. else:
  27. app.backend.mark_as_failure(
  28. task['id'], task['result'], traceback=traceback,
  29. )
  30. def make_mock_group(app, size=10):
  31. tasks = [mock_task('ts%d' % i, states.SUCCESS, i) for i in range(size)]
  32. [save_result(app, task) for task in tasks]
  33. return [app.AsyncResult(task['id']) for task in tasks]
  34. class test_AsyncResult(AppCase):
  35. def setup(self):
  36. self.task1 = mock_task('task1', states.SUCCESS, 'the')
  37. self.task2 = mock_task('task2', states.SUCCESS, 'quick')
  38. self.task3 = mock_task('task3', states.FAILURE, KeyError('brown'))
  39. self.task4 = mock_task('task3', states.RETRY, KeyError('red'))
  40. for task in (self.task1, self.task2, self.task3, self.task4):
  41. save_result(self.app, task)
  42. @self.app.task(shared=False)
  43. def mytask():
  44. pass
  45. self.mytask = mytask
  46. def test_compat_properties(self):
  47. x = self.app.AsyncResult('1')
  48. self.assertEqual(x.task_id, x.id)
  49. x.task_id = '2'
  50. self.assertEqual(x.id, '2')
  51. def test_children(self):
  52. x = self.app.AsyncResult('1')
  53. children = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  54. x.backend = Mock()
  55. x.backend.get_children.return_value = children
  56. x.backend.READY_STATES = states.READY_STATES
  57. self.assertTrue(x.children)
  58. self.assertEqual(len(x.children), 3)
  59. def test_propagates_for_parent(self):
  60. x = self.app.AsyncResult(uuid())
  61. x.backend = Mock()
  62. x.parent = EagerResult(uuid(), KeyError('foo'), states.FAILURE)
  63. with self.assertRaises(KeyError):
  64. x.get(propagate=True)
  65. self.assertFalse(x.backend.wait_for.called)
  66. x.parent = EagerResult(uuid(), 42, states.SUCCESS)
  67. x.get(propagate=True)
  68. self.assertTrue(x.backend.wait_for.called)
  69. def test_get_children(self):
  70. tid = uuid()
  71. x = self.app.AsyncResult(tid)
  72. child = [self.app.AsyncResult(uuid()).as_tuple()
  73. for i in range(10)]
  74. x.backend._cache[tid] = {'children': child}
  75. self.assertTrue(x.children)
  76. self.assertEqual(len(x.children), 10)
  77. x.backend._cache[tid] = {'result': None}
  78. self.assertIsNone(x.children)
  79. def test_build_graph_get_leaf_collect(self):
  80. x = self.app.AsyncResult('1')
  81. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  82. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  83. x.iterdeps = Mock()
  84. x.iterdeps.return_value = (
  85. (None, x),
  86. (x, c[0]),
  87. (c[0], c[1]),
  88. (c[1], c[2])
  89. )
  90. x.backend.READY_STATES = states.READY_STATES
  91. self.assertTrue(x.graph)
  92. self.assertIs(x.get_leaf(), 2)
  93. it = x.collect()
  94. self.assertListEqual(list(it), [
  95. (x, None),
  96. (c[0], 0),
  97. (c[1], 1),
  98. (c[2], 2),
  99. ])
  100. def test_iterdeps(self):
  101. x = self.app.AsyncResult('1')
  102. x.backend._cache['1'] = {'status': states.SUCCESS, 'result': None}
  103. c = [EagerResult(str(i), i, states.SUCCESS) for i in range(3)]
  104. for child in c:
  105. child.backend = Mock()
  106. child.backend.get_children.return_value = []
  107. x.backend.get_children = Mock()
  108. x.backend.get_children.return_value = c
  109. it = x.iterdeps()
  110. self.assertListEqual(list(it), [
  111. (None, x),
  112. (x, c[0]),
  113. (x, c[1]),
  114. (x, c[2]),
  115. ])
  116. x.backend._cache.pop('1')
  117. x.ready = Mock()
  118. x.ready.return_value = False
  119. with self.assertRaises(IncompleteStream):
  120. list(x.iterdeps())
  121. list(x.iterdeps(intermediate=True))
  122. def test_eq_not_implemented(self):
  123. self.assertFalse(self.app.AsyncResult('1') == object())
  124. @depends_on_current_app
  125. def test_reduce(self):
  126. a1 = self.app.AsyncResult('uuid', task_name=self.mytask.name)
  127. restored = pickle.loads(pickle.dumps(a1))
  128. self.assertEqual(restored.id, 'uuid')
  129. self.assertEqual(restored.task_name, self.mytask.name)
  130. a2 = self.app.AsyncResult('uuid')
  131. self.assertEqual(pickle.loads(pickle.dumps(a2)).id, 'uuid')
  132. def test_successful(self):
  133. ok_res = self.app.AsyncResult(self.task1['id'])
  134. nok_res = self.app.AsyncResult(self.task3['id'])
  135. nok_res2 = self.app.AsyncResult(self.task4['id'])
  136. self.assertTrue(ok_res.successful())
  137. self.assertFalse(nok_res.successful())
  138. self.assertFalse(nok_res2.successful())
  139. pending_res = self.app.AsyncResult(uuid())
  140. self.assertFalse(pending_res.successful())
  141. def test_str(self):
  142. ok_res = self.app.AsyncResult(self.task1['id'])
  143. ok2_res = self.app.AsyncResult(self.task2['id'])
  144. nok_res = self.app.AsyncResult(self.task3['id'])
  145. self.assertEqual(str(ok_res), self.task1['id'])
  146. self.assertEqual(str(ok2_res), self.task2['id'])
  147. self.assertEqual(str(nok_res), self.task3['id'])
  148. pending_id = uuid()
  149. pending_res = self.app.AsyncResult(pending_id)
  150. self.assertEqual(str(pending_res), pending_id)
  151. def test_repr(self):
  152. ok_res = self.app.AsyncResult(self.task1['id'])
  153. ok2_res = self.app.AsyncResult(self.task2['id'])
  154. nok_res = self.app.AsyncResult(self.task3['id'])
  155. self.assertEqual(repr(ok_res), '<AsyncResult: %s>' % (
  156. self.task1['id']))
  157. self.assertEqual(repr(ok2_res), '<AsyncResult: %s>' % (
  158. self.task2['id']))
  159. self.assertEqual(repr(nok_res), '<AsyncResult: %s>' % (
  160. self.task3['id']))
  161. pending_id = uuid()
  162. pending_res = self.app.AsyncResult(pending_id)
  163. self.assertEqual(repr(pending_res), '<AsyncResult: %s>' % (
  164. pending_id))
  165. def test_hash(self):
  166. self.assertEqual(hash(self.app.AsyncResult('x0w991')),
  167. hash(self.app.AsyncResult('x0w991')))
  168. self.assertNotEqual(hash(self.app.AsyncResult('x0w991')),
  169. hash(self.app.AsyncResult('x1w991')))
  170. def test_get_traceback(self):
  171. ok_res = self.app.AsyncResult(self.task1['id'])
  172. nok_res = self.app.AsyncResult(self.task3['id'])
  173. nok_res2 = self.app.AsyncResult(self.task4['id'])
  174. self.assertFalse(ok_res.traceback)
  175. self.assertTrue(nok_res.traceback)
  176. self.assertTrue(nok_res2.traceback)
  177. pending_res = self.app.AsyncResult(uuid())
  178. self.assertFalse(pending_res.traceback)
  179. def test_get(self):
  180. ok_res = self.app.AsyncResult(self.task1['id'])
  181. ok2_res = self.app.AsyncResult(self.task2['id'])
  182. nok_res = self.app.AsyncResult(self.task3['id'])
  183. nok2_res = self.app.AsyncResult(self.task4['id'])
  184. self.assertEqual(ok_res.get(), 'the')
  185. self.assertEqual(ok2_res.get(), 'quick')
  186. with self.assertRaises(KeyError):
  187. nok_res.get()
  188. self.assertTrue(nok_res.get(propagate=False))
  189. self.assertIsInstance(nok2_res.result, KeyError)
  190. self.assertEqual(ok_res.info, 'the')
  191. def test_get_timeout(self):
  192. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  193. with self.assertRaises(TimeoutError):
  194. res.get(timeout=0.001)
  195. pending_res = self.app.AsyncResult(uuid())
  196. with patch('celery.result.time') as _time:
  197. with self.assertRaises(TimeoutError):
  198. pending_res.get(timeout=0.001, interval=0.001)
  199. _time.sleep.assert_called_with(0.001)
  200. def test_get_timeout_longer(self):
  201. res = self.app.AsyncResult(self.task4['id']) # has RETRY state
  202. with patch('celery.result.time') as _time:
  203. with self.assertRaises(TimeoutError):
  204. res.get(timeout=1, interval=1)
  205. _time.sleep.assert_called_with(1)
  206. def test_ready(self):
  207. oks = (self.app.AsyncResult(self.task1['id']),
  208. self.app.AsyncResult(self.task2['id']),
  209. self.app.AsyncResult(self.task3['id']))
  210. self.assertTrue(all(result.ready() for result in oks))
  211. self.assertFalse(self.app.AsyncResult(self.task4['id']).ready())
  212. self.assertFalse(self.app.AsyncResult(uuid()).ready())
  213. class test_ResultSet(AppCase):
  214. def test_resultset_repr(self):
  215. self.assertTrue(repr(self.app.ResultSet(
  216. [self.app.AsyncResult(t) for t in ['1', '2', '3']])))
  217. def test_eq_other(self):
  218. self.assertFalse(self.app.ResultSet([1, 3, 3]) == 1)
  219. self.assertTrue(self.app.ResultSet([1]) == self.app.ResultSet([1]))
  220. def test_get(self):
  221. x = self.app.ResultSet([self.app.AsyncResult(t) for t in [1, 2, 3]])
  222. b = x.results[0].backend = Mock()
  223. b.supports_native_join = False
  224. x.join_native = Mock()
  225. x.join = Mock()
  226. x.get()
  227. self.assertTrue(x.join.called)
  228. b.supports_native_join = True
  229. x.get()
  230. self.assertTrue(x.join_native.called)
  231. def test_add(self):
  232. x = self.app.ResultSet([1])
  233. x.add(2)
  234. self.assertEqual(len(x), 2)
  235. x.add(2)
  236. self.assertEqual(len(x), 2)
  237. @contextmanager
  238. def dummy_copy(self):
  239. with patch('celery.result.copy') as copy:
  240. def passt(arg):
  241. return arg
  242. copy.side_effect = passt
  243. yield
  244. def test_iterate_respects_subpolling_interval(self):
  245. r1 = self.app.AsyncResult(uuid())
  246. r2 = self.app.AsyncResult(uuid())
  247. backend = r1.backend = r2.backend = Mock()
  248. backend.subpolling_interval = 10
  249. ready = r1.ready = r2.ready = Mock()
  250. def se(*args, **kwargs):
  251. ready.side_effect = KeyError()
  252. return False
  253. ready.return_value = False
  254. ready.side_effect = se
  255. x = self.app.ResultSet([r1, r2])
  256. with self.dummy_copy():
  257. with patch('celery.result.time') as _time:
  258. with self.assertRaises(KeyError):
  259. list(x.iterate())
  260. _time.sleep.assert_called_with(10)
  261. backend.subpolling_interval = 0
  262. with patch('celery.result.time') as _time:
  263. with self.assertRaises(KeyError):
  264. ready.return_value = False
  265. ready.side_effect = se
  266. list(x.iterate())
  267. self.assertFalse(_time.sleep.called)
  268. def test_times_out(self):
  269. r1 = self.app.AsyncResult(uuid)
  270. r1.ready = Mock()
  271. r1.ready.return_value = False
  272. x = self.app.ResultSet([r1])
  273. with self.dummy_copy():
  274. with patch('celery.result.time'):
  275. with self.assertRaises(TimeoutError):
  276. list(x.iterate(timeout=1))
  277. def test_add_discard(self):
  278. x = self.app.ResultSet([])
  279. x.add(self.app.AsyncResult('1'))
  280. self.assertIn(self.app.AsyncResult('1'), x.results)
  281. x.discard(self.app.AsyncResult('1'))
  282. x.discard(self.app.AsyncResult('1'))
  283. x.discard('1')
  284. self.assertNotIn(self.app.AsyncResult('1'), x.results)
  285. x.update([self.app.AsyncResult('2')])
  286. def test_clear(self):
  287. x = self.app.ResultSet([])
  288. r = x.results
  289. x.clear()
  290. self.assertIs(x.results, r)
  291. class MockAsyncResultFailure(AsyncResult):
  292. @property
  293. def result(self):
  294. return KeyError('baz')
  295. @property
  296. def state(self):
  297. return states.FAILURE
  298. def get(self, propagate=True, **kwargs):
  299. if propagate:
  300. raise self.result
  301. return self.result
  302. class MockAsyncResultSuccess(AsyncResult):
  303. forgotten = False
  304. def forget(self):
  305. self.forgotten = True
  306. @property
  307. def result(self):
  308. return 42
  309. @property
  310. def state(self):
  311. return states.SUCCESS
  312. def get(self, **kwargs):
  313. return self.result
  314. class SimpleBackend(object):
  315. ids = []
  316. def __init__(self, ids=[]):
  317. self.ids = ids
  318. def get_many(self, *args, **kwargs):
  319. return ((id, {'result': i, 'status': states.SUCCESS})
  320. for i, id in enumerate(self.ids))
  321. class test_TaskSetResult(AppCase):
  322. def setup(self):
  323. self.size = 10
  324. self.ts = TaskSetResult(uuid(), make_mock_group(self.app, self.size))
  325. def test_total(self):
  326. self.assertEqual(self.ts.total, self.size)
  327. def test_compat_properties(self):
  328. self.assertEqual(self.ts.taskset_id, self.ts.id)
  329. self.ts.taskset_id = 'foo'
  330. self.assertEqual(self.ts.taskset_id, 'foo')
  331. def test_compat_subtasks_kwarg(self):
  332. x = TaskSetResult(uuid(), subtasks=[1, 2, 3])
  333. self.assertEqual(x.results, [1, 2, 3])
  334. def test_itersubtasks(self):
  335. it = self.ts.itersubtasks()
  336. for i, t in enumerate(it):
  337. self.assertEqual(t.get(), i)
  338. class test_GroupResult(AppCase):
  339. def setup(self):
  340. self.size = 10
  341. self.ts = self.app.GroupResult(
  342. uuid(), make_mock_group(self.app, self.size),
  343. )
  344. @depends_on_current_app
  345. def test_is_pickleable(self):
  346. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  347. self.assertEqual(pickle.loads(pickle.dumps(ts)), ts)
  348. ts2 = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  349. self.assertEqual(pickle.loads(pickle.dumps(ts2)), ts2)
  350. def test_len(self):
  351. self.assertEqual(len(self.ts), self.size)
  352. def test_eq_other(self):
  353. self.assertFalse(self.ts == 1)
  354. @depends_on_current_app
  355. def test_reduce(self):
  356. self.assertTrue(pickle.loads(pickle.dumps(self.ts)))
  357. def test_iterate_raises(self):
  358. ar = MockAsyncResultFailure(uuid(), app=self.app)
  359. ts = self.app.GroupResult(uuid(), [ar])
  360. it = ts.iterate()
  361. with self.assertRaises(KeyError):
  362. next(it)
  363. def test_forget(self):
  364. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  365. MockAsyncResultSuccess(uuid(), app=self.app)]
  366. ts = self.app.GroupResult(uuid(), subs)
  367. ts.forget()
  368. for sub in subs:
  369. self.assertTrue(sub.forgotten)
  370. def test_getitem(self):
  371. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  372. MockAsyncResultSuccess(uuid(), app=self.app)]
  373. ts = self.app.GroupResult(uuid(), subs)
  374. self.assertIs(ts[0], subs[0])
  375. def test_save_restore(self):
  376. subs = [MockAsyncResultSuccess(uuid(), app=self.app),
  377. MockAsyncResultSuccess(uuid(), app=self.app)]
  378. ts = self.app.GroupResult(uuid(), subs)
  379. ts.save()
  380. with self.assertRaises(AttributeError):
  381. ts.save(backend=object())
  382. self.assertEqual(self.app.GroupResult.restore(ts.id).subtasks,
  383. ts.subtasks)
  384. ts.delete()
  385. self.assertIsNone(self.app.GroupResult.restore(ts.id))
  386. with self.assertRaises(AttributeError):
  387. self.app.GroupResult.restore(ts.id, backend=object())
  388. def test_join_native(self):
  389. backend = SimpleBackend()
  390. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  391. for i in range(10)]
  392. ts = self.app.GroupResult(uuid(), subtasks)
  393. backend.ids = [subtask.id for subtask in subtasks]
  394. res = ts.join_native()
  395. self.assertEqual(res, list(range(10)))
  396. def test_join_native_raises(self):
  397. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  398. ts.iter_native = Mock()
  399. ts.iter_native.return_value = iter([
  400. (uuid(), {'status': states.FAILURE, 'result': KeyError()})
  401. ])
  402. with self.assertRaises(KeyError):
  403. ts.join_native(propagate=True)
  404. def test_failed_join_report(self):
  405. res = Mock()
  406. ts = self.app.GroupResult(uuid(), [res])
  407. res.state = states.FAILURE
  408. res.backend.is_cached.return_value = True
  409. self.assertIs(next(ts._failed_join_report()), res)
  410. res.backend.is_cached.return_value = False
  411. with self.assertRaises(StopIteration):
  412. next(ts._failed_join_report())
  413. def test_repr(self):
  414. self.assertTrue(repr(
  415. self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  416. ))
  417. def test_children_is_results(self):
  418. ts = self.app.GroupResult(uuid(), [self.app.AsyncResult(uuid())])
  419. self.assertIs(ts.children, ts.results)
  420. def test_iter_native(self):
  421. backend = SimpleBackend()
  422. subtasks = [self.app.AsyncResult(uuid(), backend=backend)
  423. for i in range(10)]
  424. ts = self.app.GroupResult(uuid(), subtasks)
  425. backend.ids = [subtask.id for subtask in subtasks]
  426. self.assertEqual(len(list(ts.iter_native())), 10)
  427. def test_iterate_yields(self):
  428. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  429. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  430. ts = self.app.GroupResult(uuid(), [ar, ar2])
  431. it = ts.iterate()
  432. self.assertEqual(next(it), 42)
  433. self.assertEqual(next(it), 42)
  434. def test_iterate_eager(self):
  435. ar1 = EagerResult(uuid(), 42, states.SUCCESS)
  436. ar2 = EagerResult(uuid(), 42, states.SUCCESS)
  437. ts = self.app.GroupResult(uuid(), [ar1, ar2])
  438. it = ts.iterate()
  439. self.assertEqual(next(it), 42)
  440. self.assertEqual(next(it), 42)
  441. def test_join_timeout(self):
  442. ar = MockAsyncResultSuccess(uuid(), app=self.app)
  443. ar2 = MockAsyncResultSuccess(uuid(), app=self.app)
  444. ar3 = self.app.AsyncResult(uuid())
  445. ts = self.app.GroupResult(uuid(), [ar, ar2, ar3])
  446. with self.assertRaises(TimeoutError):
  447. ts.join(timeout=0.0000001)
  448. ar4 = self.app.AsyncResult(uuid())
  449. ar4.get = Mock()
  450. ts2 = self.app.GroupResult(uuid(), [ar4])
  451. self.assertTrue(ts2.join(timeout=0.1))
  452. def test_iter_native_when_empty_group(self):
  453. ts = self.app.GroupResult(uuid(), [])
  454. self.assertListEqual(list(ts.iter_native()), [])
  455. def test_iterate_simple(self):
  456. it = self.ts.iterate()
  457. results = sorted(list(it))
  458. self.assertListEqual(results, list(range(self.size)))
  459. def test___iter__(self):
  460. self.assertListEqual(list(iter(self.ts)), self.ts.results)
  461. def test_join(self):
  462. joined = self.ts.join()
  463. self.assertListEqual(joined, list(range(self.size)))
  464. def test_successful(self):
  465. self.assertTrue(self.ts.successful())
  466. def test_failed(self):
  467. self.assertFalse(self.ts.failed())
  468. def test_waiting(self):
  469. self.assertFalse(self.ts.waiting())
  470. def test_ready(self):
  471. self.assertTrue(self.ts.ready())
  472. def test_completed_count(self):
  473. self.assertEqual(self.ts.completed_count(), len(self.ts))
  474. class test_pending_AsyncResult(AppCase):
  475. def setup(self):
  476. self.task = self.app.AsyncResult(uuid())
  477. def test_result(self):
  478. self.assertIsNone(self.task.result)
  479. class test_failed_AsyncResult(test_GroupResult):
  480. def setup(self):
  481. self.size = 11
  482. subtasks = make_mock_group(self.app, 10)
  483. failed = mock_task('ts11', states.FAILURE, KeyError('Baz'))
  484. save_result(self.app, failed)
  485. failed_res = self.app.AsyncResult(failed['id'])
  486. self.ts = self.app.GroupResult(uuid(), subtasks + [failed_res])
  487. def test_completed_count(self):
  488. self.assertEqual(self.ts.completed_count(), len(self.ts) - 1)
  489. def test_iterate_simple(self):
  490. it = self.ts.iterate()
  491. def consume():
  492. return list(it)
  493. with self.assertRaises(KeyError):
  494. consume()
  495. def test_join(self):
  496. with self.assertRaises(KeyError):
  497. self.ts.join()
  498. def test_successful(self):
  499. self.assertFalse(self.ts.successful())
  500. def test_failed(self):
  501. self.assertTrue(self.ts.failed())
  502. class test_pending_Group(AppCase):
  503. def setup(self):
  504. self.ts = self.app.GroupResult(
  505. uuid(), [self.app.AsyncResult(uuid()),
  506. self.app.AsyncResult(uuid())])
  507. def test_completed_count(self):
  508. self.assertEqual(self.ts.completed_count(), 0)
  509. def test_ready(self):
  510. self.assertFalse(self.ts.ready())
  511. def test_waiting(self):
  512. self.assertTrue(self.ts.waiting())
  513. def x_join(self):
  514. with self.assertRaises(TimeoutError):
  515. self.ts.join(timeout=0.001)
  516. def x_join_longer(self):
  517. with self.assertRaises(TimeoutError):
  518. self.ts.join(timeout=1)
  519. class test_EagerResult(AppCase):
  520. def setup(self):
  521. @self.app.task(shared=False)
  522. def raising(x, y):
  523. raise KeyError(x, y)
  524. self.raising = raising
  525. def test_wait_raises(self):
  526. res = self.raising.apply(args=[3, 3])
  527. with self.assertRaises(KeyError):
  528. res.wait()
  529. self.assertTrue(res.wait(propagate=False))
  530. def test_wait(self):
  531. res = EagerResult('x', 'x', states.RETRY)
  532. res.wait()
  533. self.assertEqual(res.state, states.RETRY)
  534. self.assertEqual(res.status, states.RETRY)
  535. def test_forget(self):
  536. res = EagerResult('x', 'x', states.RETRY)
  537. res.forget()
  538. def test_revoke(self):
  539. res = self.raising.apply(args=[3, 3])
  540. self.assertFalse(res.revoke())
  541. class test_tuples(AppCase):
  542. def test_AsyncResult(self):
  543. x = self.app.AsyncResult(uuid())
  544. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  545. self.assertEqual(x, result_from_tuple(x, self.app))
  546. def test_with_parent(self):
  547. x = self.app.AsyncResult(uuid())
  548. x.parent = self.app.AsyncResult(uuid())
  549. y = result_from_tuple(x.as_tuple(), self.app)
  550. self.assertEqual(y, x)
  551. self.assertEqual(y.parent, x.parent)
  552. self.assertIsInstance(y.parent, AsyncResult)
  553. def test_compat(self):
  554. uid = uuid()
  555. x = result_from_tuple([uid, []], app=self.app)
  556. self.assertEqual(x.id, uid)
  557. def test_GroupResult(self):
  558. x = self.app.GroupResult(
  559. uuid(), [self.app.AsyncResult(uuid()) for _ in range(10)],
  560. )
  561. self.assertEqual(x, result_from_tuple(x.as_tuple(), self.app))
  562. self.assertEqual(x, result_from_tuple(x, self.app))