# test_queue.py
  1. import datetime
  2. from huey import crontab
  3. from huey import exceptions as huey_exceptions
  4. from huey import RedisHuey
  5. from huey.api import Huey
  6. from huey.api import QueueTask
  7. from huey.api import TaskWrapper
  8. from huey.constants import EmptyData
  9. from huey.exceptions import TaskException
  10. from huey.registry import registry
  11. from huey.storage import RedisStorage
  12. from huey.tests.base import b
  13. from huey.tests.base import BaseTestCase
  14. from huey.utils import local_to_utc
  15. huey = RedisHuey(result_store=False, events=False, blocking=False)
  16. huey_results = RedisHuey(blocking=False, max_errors=10)
  17. huey_store_none = RedisHuey(store_none=True, blocking=False)
  18. # Global state.
  19. state = {}
  20. @huey.task()
  21. def put_data(key, value):
  22. state[key] = value
  23. @huey.task(include_task=True)
  24. def put_data_ctx(key, value, task=None):
  25. state['last_task_class'] = type(task).__name__
  26. @huey_results.task(include_task=True)
  27. def error_testing_task_with_ctx(key, value, task=None):
  28. bad = 1/0
  29. state['last_task_class'] = type(task).__name__
  30. class PutTask(QueueTask):
  31. def execute(self):
  32. k, v = self.data
  33. state[k] = v
  34. registry.register(PutTask)
  35. class TestException(Exception):
  36. pass
  37. def _throw_error_task(message=None):
  38. raise TestException(message or 'bampf')
  39. throw_error_task = huey.task()(_throw_error_task)
  40. throw_error_task_res = huey_results.task()(_throw_error_task)
  41. @huey_results.task()
  42. def add_values(a, b):
  43. return a + b
  44. @huey_results.task()
  45. def add_values2(a, b):
  46. return a + b
  47. @huey_results.periodic_task(crontab(minute='0'))
  48. def hourly_task2():
  49. state['periodic'] = 2
  50. @huey_results.task()
  51. def returns_none():
  52. return None
  53. @huey_store_none.task()
  54. def returns_none2():
  55. return None
  56. class BaseQueueTestCase(BaseTestCase):
  57. def setUp(self):
  58. global state
  59. state = {}
  60. huey.flush()
  61. huey_results.flush()
  62. huey_store_none.flush()
  63. self.assertEqual(len(huey), 0)
  64. def tearDown(self):
  65. huey.flush()
  66. huey_results.flush()
  67. huey_store_none.flush()
  68. class TestHueyQueueMetadataAPIs(BaseQueueTestCase):
  69. def test_queue_metadata(self):
  70. put_data('k1', 'v1')
  71. put_data('k2', 'v2')
  72. cmd2, cmd1 = huey.pending()
  73. self.assertEqual(cmd2.data, (('k2', 'v2'), {}))
  74. self.assertEqual(cmd1.data, (('k1', 'v1'), {}))
  75. huey.dequeue()
  76. cmd1, = huey.pending()
  77. self.assertEqual(cmd1.data, (('k2', 'v2'), {}))
  78. def test_schedule_metadata(self):
  79. add_values.schedule((1, 2), delay=10)
  80. add_values.schedule((3, 4), delay=5)
  81. self.assertEqual(len(huey_results), 2)
  82. huey_results.add_schedule(huey.dequeue())
  83. huey_results.add_schedule(huey.dequeue())
  84. cmd2, cmd1 = huey_results.scheduled()
  85. self.assertEqual(cmd1.data, ((1, 2), {}))
  86. self.assertEqual(cmd2.data, ((3, 4), {}))
  87. def test_results_metadata(self):
  88. add_values(1, 2)
  89. add_values(3, 4)
  90. t1 = huey_results.dequeue()
  91. t2 = huey_results.dequeue()
  92. self.assertEqual(huey_results.all_results(), {})
  93. huey_results.execute(t1)
  94. self.assertEqual(list(huey_results.all_results()), [b(t1.task_id)])
  95. huey_results.execute(t2)
  96. self.assertEqual(sorted(huey_results.all_results().keys()),
  97. sorted([b(t1.task_id), b(t2.task_id)]))
  98. class TestHueyQueueAPIs(BaseQueueTestCase):
  99. def test_enqueue(self):
  100. # initializing the command does not enqueue it
  101. task = PutTask(('k', 'v'))
  102. self.assertEqual(len(huey), 0)
  103. # ok, enqueue it, then check that it was enqueued
  104. huey.enqueue(task)
  105. self.assertEqual(len(huey), 1)
  106. self.assertEqual(state, {})
  107. # it can be enqueued multiple times
  108. huey.enqueue(task)
  109. self.assertEqual(len(huey), 2)
  110. # no changes to state
  111. self.assertEqual(state, {})
  112. def test_enqueue_decorator(self):
  113. put_data('k', 'v')
  114. self.assertEqual(len(huey), 1)
  115. put_data('k', 'v')
  116. self.assertEqual(len(huey), 2)
  117. # no changes to state
  118. self.assertEqual(state, {})
  119. def test_scheduled_time(self):
  120. put_data('k', 'v')
  121. task = huey.dequeue()
  122. self.assertEqual(len(huey), 0)
  123. self.assertEqual(task.execute_time, None)
  124. dt = datetime.datetime(2011, 1, 1, 0, 1)
  125. put_data.schedule(args=('k2', 'v2'), eta=dt)
  126. self.assertEqual(len(huey), 1)
  127. task = huey.dequeue()
  128. self.assertEqual(task.execute_time, local_to_utc(dt))
  129. put_data.schedule(args=('k3', 'v3'), eta=dt, convert_utc=False)
  130. self.assertEqual(len(huey), 1)
  131. task = huey.dequeue()
  132. self.assertEqual(task.execute_time, dt)
  133. def test_error_raised(self):
  134. throw_error_task()
  135. task = huey.dequeue()
  136. self.assertRaises(TestException, huey.execute, task)
  137. def test_error_logging(self):
  138. def call_task():
  139. throw_error_task_res('nuggie')
  140. task = huey_results.dequeue()
  141. self.assertRaises(TestException, huey_results.execute, task)
  142. return task
  143. hr = huey_results
  144. self.assertEqual(len(hr.errors()), 0)
  145. task = call_task()
  146. errors = hr.errors()
  147. self.assertEqual(len(errors), 1)
  148. error = errors[0]
  149. self.assertTrue(error['error'].startswith('TestException(\'nuggie\''))
  150. self.assertEqual(error['id'], task.task_id)
  151. for i in range(9):
  152. call_task()
  153. self.assertEqual(len(hr.errors()), i + 2)
  154. self.assertEqual(len(hr.errors()), 10) # Just to be clear.
  155. # When we run the task again, the queue will have been trimmed.
  156. task = call_task()
  157. self.assertEqual(len(hr.errors()), 10)
  158. # The first item in the queue is the most recently executed task.
  159. most_recent_error = hr.errors()[0]
  160. self.assertEqual(most_recent_error['id'], task.task_id)
  161. def test_internal_error(self):
  162. """
  163. Verify that exceptions are wrapped with the special "huey"
  164. exception classes.
  165. """
  166. class SpecialException(Exception):
  167. pass
  168. class BrokenStorage(RedisStorage):
  169. def enqueue(self):
  170. raise SpecialException('read error')
  171. def dequeue(self, data):
  172. raise SpecialException('write error')
  173. def pop_data(self, key):
  174. raise SpecialException('get error')
  175. def peek_data(self, key):
  176. raise SpecialException('get error')
  177. def put_data(self, key, value):
  178. raise SpecialException('put error')
  179. def add_to_schedule(self, data, ts):
  180. raise SpecialException('add error')
  181. def read_schedule(self, ts):
  182. raise SpecialException('read error')
  183. class BrokenHuey(RedisHuey):
  184. def get_storage(self, **kwargs):
  185. return BrokenStorage(self.name)
  186. task = PutTask(('foo', 'bar'))
  187. huey = BrokenHuey()
  188. self.assertRaises(
  189. huey_exceptions.QueueWriteException,
  190. huey.enqueue,
  191. task)
  192. self.assertRaises(
  193. huey_exceptions.QueueReadException,
  194. huey.dequeue)
  195. self.assertRaises(
  196. huey_exceptions.DataStorePutException,
  197. huey.revoke,
  198. task)
  199. self.assertRaises(
  200. huey_exceptions.DataStoreGetException,
  201. huey.restore,
  202. task)
  203. self.assertRaises(
  204. huey_exceptions.ScheduleAddException,
  205. huey.add_schedule,
  206. task)
  207. self.assertRaises(
  208. huey_exceptions.ScheduleReadException,
  209. huey.read_schedule,
  210. 1)
  211. def test_dequeueing(self):
  212. res = huey.dequeue() # no error raised if queue is empty
  213. self.assertEqual(res, None)
  214. put_data('k', 'v')
  215. task = huey.dequeue()
  216. self.assertTrue(isinstance(task, QueueTask))
  217. self.assertEqual(task.get_data(), (('k', 'v'), {}))
  218. def test_execution(self):
  219. self.assertEqual(state, {})
  220. put_data('k', 'v')
  221. task = huey.dequeue()
  222. self.assertFalse('k' in state)
  223. huey.execute(task)
  224. self.assertEqual(state, {'k': 'v'})
  225. put_data('k', 'X')
  226. self.assertEqual(state, {'k': 'v'})
  227. huey.execute(huey.dequeue())
  228. self.assertEqual(state, {'k': 'X'})
  229. self.assertRaises(TypeError, huey.execute, huey.dequeue())
  230. def test_self_awareness(self):
  231. put_data_ctx('k', 'v')
  232. task = huey.dequeue()
  233. huey.execute(task)
  234. self.assertEqual(state['last_task_class'], 'queue_task_put_data_ctx')
  235. del state['last_task_class']
  236. put_data('k', 'x')
  237. huey.execute(huey.dequeue())
  238. self.assertFalse('last_task_class' in state)
  239. def test_self_aware_error_handler(self):
  240. error_testing_task_with_ctx('k', 'v')
  241. task = huey_results.dequeue()
  242. self.assertRaises(ZeroDivisionError, huey_results.execute, task)
  243. def test_call_local(self):
  244. self.assertEqual(len(huey), 0)
  245. self.assertEqual(state, {})
  246. put_data.call_local('nugget', 'green')
  247. self.assertEqual(len(huey), 0)
  248. self.assertEqual(state, {'nugget': 'green'})
  249. def test_reschedule(self):
  250. eta = datetime.datetime.utcnow() + datetime.timedelta(seconds=60)
  251. trw = add_values.schedule((1, 2), eta=eta, convert_utc=False)
  252. self.assertEqual(trw.task.execute_time, eta)
  253. # Pull pending task off queue. Quick sanity check that the task result
  254. # wrapper has the same task_id as the task we just pulled down.
  255. task = huey_results.dequeue()
  256. self.assertEqual(trw.task.task_id, task.task_id)
  257. self.assertEqual(trw.task.execute_time, task.execute_time)
  258. # Verify the task is not ready to run and add to schedule.
  259. self.assertFalse(huey_results.ready_to_run(task))
  260. huey_results.add_schedule(task)
  261. # Reschedule the task using the result wrapper.
  262. new_eta = eta - datetime.timedelta(seconds=30)
  263. trw_r = trw.reschedule(eta=new_eta, convert_utc=False)
  264. self.assertEqual(trw_r.task.execute_time, new_eta)
  265. task_r = huey_results.dequeue()
  266. self.assertEqual(task_r.execute_time, new_eta)
  267. self.assertFalse(huey_results.ready_to_run(task_r))
  268. huey_results.add_schedule(task_r)
  269. self.assertTrue(huey_results.is_revoked(task))
  270. self.assertFalse(huey_results.is_revoked(task_r))
  271. # Reschedule without an ETA.
  272. trw_r2 = trw_r.reschedule()
  273. task_r2 = huey_results.dequeue()
  274. self.assertTrue(task_r2.execute_time is None)
  275. self.assertTrue(huey_results.ready_to_run(task_r2))
  276. self.assertTrue(huey_results.is_revoked(task_r))
  277. def test_revoke(self):
  278. ac = PutTask(('k', 'v'))
  279. ac2 = PutTask(('k2', 'v2'))
  280. ac3 = PutTask(('k3', 'v3'))
  281. huey_results.enqueue(ac)
  282. huey_results.enqueue(ac2)
  283. huey_results.enqueue(ac3)
  284. huey_results.enqueue(ac2)
  285. huey_results.enqueue(ac)
  286. self.assertEqual(len(huey_results), 5)
  287. huey_results.revoke(ac2)
  288. while huey_results:
  289. task = huey_results.dequeue()
  290. if not huey_results.is_revoked(task):
  291. huey_results.execute(task)
  292. self.assertEqual(state, {'k': 'v', 'k3': 'v3'})
  293. def test_revoke_all(self):
  294. r1 = add_values(1, 2)
  295. r2 = add_values(2, 3)
  296. r3 = add_values(3, 4)
  297. r4 = add_values2(4, 5)
  298. add_values.revoke()
  299. self.assertFalse(r2.restore()) # No effect, task itself is revoked.
  300. self.assertTrue(add_values.is_revoked())
  301. for task_result in (r1, r2, r3):
  302. self.assertTrue(task_result.is_revoked())
  303. self.assertFalse(r4.is_revoked())
  304. self.assertEqual(len(huey_results), 4)
  305. results = []
  306. while huey_results:
  307. task = huey_results.dequeue()
  308. if not huey_results.is_revoked(task):
  309. results.append(task.execute())
  310. self.assertEqual(results, [9])
  311. add_values.restore()
  312. rr_1 = add_values(5, 6)
  313. rr_2 = add_values(6, 7)
  314. for task_result in (rr_1, rr_2):
  315. self.assertFalse(task_result.is_revoked())
  316. while huey_results:
  317. task = huey_results.dequeue()
  318. if not huey_results.is_revoked(task):
  319. results.append(task.execute())
  320. self.assertEqual(results, [9, 11, 13])
  321. def test_revoke_restore_by_id(self):
  322. t1 = PutTask(('k1', 'v1'))
  323. t2 = PutTask(('k2', 'v2'))
  324. t3 = PutTask(('k3', 'v3'))
  325. for task in (t1, t2, t3):
  326. huey_results.enqueue(task)
  327. huey_results.revoke_by_id(t3.task_id)
  328. huey_results.revoke_by_id(t2.task_id)
  329. huey_results.restore_by_id(t3.task_id)
  330. self.assertFalse(huey_results.is_revoked(huey_results.dequeue()))
  331. self.assertTrue(huey_results.is_revoked(huey_results.dequeue()))
  332. self.assertFalse(huey_results.is_revoked(huey_results.dequeue()))
  333. def test_revoke_periodic(self):
  334. hourly_task2.revoke()
  335. self.assertTrue(hourly_task2.is_revoked())
  336. # it is still revoked
  337. self.assertTrue(hourly_task2.is_revoked())
  338. self.assertTrue(hourly_task2.restore())
  339. self.assertFalse(hourly_task2.is_revoked())
  340. self.assertFalse(hourly_task2.restore()) # It is not revoked.
  341. hourly_task2.revoke(revoke_once=True)
  342. self.assertTrue(hourly_task2.is_revoked()) # it is revoked once, but we are preserving that state
  343. self.assertTrue(hourly_task2.is_revoked(peek=False)) # is revoked once, but clear state
  344. self.assertFalse(hourly_task2.is_revoked()) # no longer revoked
  345. d = datetime.datetime
  346. hourly_task2.revoke(revoke_until=d(2011, 1, 1, 11, 0))
  347. self.assertTrue(hourly_task2.is_revoked(dt=d(2011, 1, 1, 10, 0)))
  348. self.assertTrue(hourly_task2.is_revoked(dt=d(2011, 1, 1, 10, 59)))
  349. self.assertFalse(hourly_task2.is_revoked(dt=d(2011, 1, 1, 11, 0)))
  350. hourly_task2.restore()
  351. self.assertFalse(hourly_task2.is_revoked())
  352. def test_result_store(self):
  353. res = add_values(1, 2)
  354. res2 = add_values(4, 5)
  355. res3 = add_values(0, 0)
  356. # none have been executed as yet
  357. self.assertEqual(res.get(), None)
  358. self.assertEqual(res2.get(), None)
  359. self.assertEqual(res3.get(), None)
  360. # execute the first task
  361. huey_results.execute(huey_results.dequeue())
  362. self.assertEqual(res.get(), 3)
  363. self.assertEqual(res2.get(), None)
  364. self.assertEqual(res3.get(), None)
  365. # We can also call the result object.
  366. self.assertEqual(res(), 3)
  367. self.assertEqual(res2(), None)
  368. # execute the second task
  369. huey_results.execute(huey_results.dequeue())
  370. self.assertEqual(res.get(), 3)
  371. self.assertEqual(res2.get(), 9)
  372. self.assertEqual(res3.get(), None)
  373. # execute the 3rd, which returns a zero value
  374. huey_results.execute(huey_results.dequeue())
  375. self.assertEqual(res.get(), 3)
  376. self.assertEqual(res2.get(), 9)
  377. self.assertEqual(res3.get(), 0)
  378. # check that it returns None when nothing is present
  379. res = returns_none()
  380. self.assertEqual(res.get(), None)
  381. # execute, it will still return None, but underneath it is an EmptyResult
  382. # indicating its actual result was not persisted
  383. huey_results.execute(huey_results.dequeue())
  384. self.assertEqual(res.get(), None)
  385. self.assertEqual(res._result, EmptyData)
  386. # execute again, this time note that we're pointing at the invoker
  387. # that *does* accept None as a store-able result
  388. res = returns_none2()
  389. self.assertEqual(res.get(), None)
  390. # it stores None
  391. huey_store_none.execute(huey_store_none.dequeue())
  392. self.assertEqual(res.get(), None)
  393. self.assertEqual(res._result, None)
  394. def test_huey_result_method(self):
  395. res = add_values(1, 2)
  396. tid = res.task.task_id
  397. res2 = add_values(0, 0)
  398. tid2 = res2.task.task_id
  399. self.assertTrue(huey_results.result(tid) is None)
  400. self.assertTrue(huey_results.result(tid2) is None)
  401. # Execute the first task
  402. huey_results.execute(huey_results.dequeue())
  403. self.assertEqual(huey_results.result(tid), 3)
  404. self.assertTrue(huey_results.result(tid2) is None)
  405. # Execute the second task, which returns a zero value.
  406. huey_results.execute(huey_results.dequeue())
  407. self.assertEqual(huey_results.result(tid2), 0)
  408. def test_huey_result_error_propagation(self):
  409. # Execute a task that raises a TestException error.
  410. res = throw_error_task_res()
  411. task = huey_results.dequeue()
  412. self.assertRaises(TestException, huey_results.execute, task)
  413. # Verify TaskException raised when resolving TaskResultWrapper.
  414. self.assertRaises(TaskException, res.get)
  415. # Execute task again to verify the huey.result() API behavior.
  416. res = throw_error_task_res()
  417. tid = res.task.task_id
  418. task = huey_results.dequeue()
  419. self.assertRaises(TestException, huey_results.execute, task)
  420. # Verify error raised when calling .result() with task ID.
  421. self.assertRaises(TaskException, huey.result, tid)
  422. def test_result_preserve(self):
  423. res = add_values(1, 2)
  424. tid = res.task.task_id
  425. huey_results.execute(huey_results.dequeue())
  426. self.assertEqual(res.get(preserve=True), 3)
  427. self.assertEqual(huey_results.result(tid, preserve=True), 3)
  428. self.assertEqual(huey_results.result(tid, preserve=False), 3)
  429. self.assertEqual(huey_results.result(tid), None)
  430. def test_task_store(self):
  431. dt1 = datetime.datetime(2011, 1, 1, 0, 0)
  432. dt2 = datetime.datetime(2035, 1, 1, 0, 0)
  433. add_values.schedule(args=('k', 'v'), eta=dt1, convert_utc=False)
  434. task1 = huey_results.dequeue()
  435. add_values.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
  436. task2 = huey_results.dequeue()
  437. add_values('k3', 'v3')
  438. task3 = huey_results.dequeue()
  439. # add the command to the schedule
  440. huey_results.add_schedule(task1)
  441. self.assertEqual(huey_results.scheduled_count(), 1)
  442. # add a future-dated command
  443. huey_results.add_schedule(task2)
  444. self.assertEqual(huey_results.scheduled_count(), 2)
  445. huey_results.add_schedule(task3)
  446. tasks = huey_results.read_schedule(dt1)
  447. self.assertEqual(tasks, [task3, task1])
  448. tasks = huey_results.read_schedule(dt1)
  449. self.assertEqual(tasks, [])
  450. tasks = huey_results.read_schedule(dt2)
  451. self.assertEqual(tasks, [task2])
  452. def test_ready_to_run_method(self):
  453. dt1 = datetime.datetime(2011, 1, 1, 0, 0)
  454. dt2 = datetime.datetime(2035, 1, 1, 0, 0)
  455. add_values.schedule(args=('k', 'v'), eta=dt1)
  456. task1 = huey_results.dequeue()
  457. add_values.schedule(args=('k2', 'v2'), eta=dt2)
  458. task2 = huey_results.dequeue()
  459. add_values('k3', 'v3')
  460. task3 = huey_results.dequeue()
  461. add_values.schedule(args=('k4', 'v4'), task_id='test_task_id')
  462. task4 = huey_results.dequeue()
  463. # sanity check what should be run
  464. self.assertTrue(huey_results.ready_to_run(task1))
  465. self.assertFalse(huey_results.ready_to_run(task2))
  466. self.assertTrue(huey_results.ready_to_run(task3))
  467. self.assertTrue(huey_results.ready_to_run(task4))
  468. self.assertEqual('test_task_id', task4.task_id)
  469. def test_task_delay(self):
  470. curr = datetime.datetime.utcnow()
  471. curr50 = curr + datetime.timedelta(seconds=50)
  472. curr70 = curr + datetime.timedelta(seconds=70)
  473. add_values.schedule(args=('k', 'v'), delay=60)
  474. task1 = huey_results.dequeue()
  475. add_values.schedule(args=('k2', 'v2'), delay=600)
  476. task2 = huey_results.dequeue()
  477. add_values('k3', 'v3')
  478. task3 = huey_results.dequeue()
  479. # add the command to the schedule
  480. huey_results.add_schedule(task1)
  481. huey_results.add_schedule(task2)
  482. huey_results.add_schedule(task3)
  483. # sanity check what should be run
  484. self.assertFalse(huey_results.ready_to_run(task1))
  485. self.assertFalse(huey_results.ready_to_run(task2))
  486. self.assertTrue(huey_results.ready_to_run(task3))
  487. self.assertFalse(huey_results.ready_to_run(task1, curr50))
  488. self.assertFalse(huey_results.ready_to_run(task2, curr50))
  489. self.assertTrue(huey_results.ready_to_run(task3, curr50))
  490. self.assertTrue(huey_results.ready_to_run(task1, curr70))
  491. self.assertFalse(huey_results.ready_to_run(task2, curr70))
  492. self.assertTrue(huey_results.ready_to_run(task3, curr70))
  493. def test_task_decorators(self):
  494. huey = RedisHuey()
  495. def test_fn():
  496. pass
  497. test_fn_task = huey.task()(test_fn)
  498. test_fn_cron = huey.periodic_task(crontab(minute='0'))(test_fn)
  499. self.assertTrue(isinstance(test_fn_task, TaskWrapper))
  500. self.assertTrue(test_fn_task.func is test_fn)
  501. self.assertTrue(isinstance(test_fn_cron, TaskWrapper))
  502. self.assertTrue(test_fn_cron.func is test_fn)
  503. test_cron_task = huey.periodic_task(crontab(minute='0'))(test_fn_task)
  504. self.assertTrue(isinstance(test_cron_task, TaskWrapper))
  505. self.assertTrue(test_cron_task.func is test_fn)
  506. def test_flush_locks(self):
  507. with huey.lock_task('lock1'):
  508. with huey.lock_task('lock2'):
  509. flushed = huey.flush_locks()
  510. self.assertEqual(flushed, set(['lock1', 'lock2']))
  511. self.assertEqual(huey.flush_locks(), set())
  512. eager_huey = RedisHuey(blocking=False, always_eager=True)
  513. @eager_huey.task()
  514. def add(a, b):
  515. return a + b
  516. class TestAlwaysEager(BaseQueueTestCase):
  517. def test_always_eager(self):
  518. self.assertEqual(add(1, 3), 4)
  519. # Test pipelining.
  520. pipe = add.s(1, 2).then(add, 3).then(add, 4).then(add, 5)
  521. result = eager_huey.enqueue(pipe)
  522. self.assertEqual(result, [3, 6, 10, 15])
  523. def test_always_eager_failure(self):
  524. self.assertRaises(TypeError, add, 1, None)
  525. pipe = add.s(1, 2).then(add, None).then(add, 4)
  526. self.assertRaises(TypeError, eager_huey.enqueue, pipe)