test_consumer.py 20 KB


  1. import contextlib
  2. import datetime
  3. import threading
  4. import time
  5. from functools import wraps
  6. from huey import crontab
  7. from huey.consumer import Consumer
  8. from huey.consumer import Scheduler
  9. from huey.consumer import Worker
  10. from huey.exceptions import DataStoreTimeout
  11. from huey.exceptions import RetryTask
  12. from huey.exceptions import TaskException
  13. from huey.tests.base import b
  14. from huey.tests.base import BrokenHuey
  15. from huey.tests.base import CaptureLogs
  16. from huey.tests.base import HueyTestCase
  17. from huey.tests.base import test_huey
  18. # Store some global state.
  19. state = {}
  20. lock = threading.Lock()
  21. # Create some test tasks.
  22. @test_huey.task()
  23. def modify_state(k, v):
  24. with lock:
  25. state[k] = v
  26. return v
  27. @test_huey.task()
  28. def blow_up():
  29. raise Exception('blowed up')
  30. @test_huey.task(retries=3)
  31. def retry_task(k, always_fail=True):
  32. if k not in state:
  33. if not always_fail:
  34. state[k] = 'fixed'
  35. raise Exception('fappsk')
  36. return state[k]
  37. @test_huey.task(retries=3, retry_delay=10)
  38. def retry_task_delay(k, always_fail=True):
  39. if k not in state:
  40. if not always_fail:
  41. state[k] = 'fixed'
  42. raise Exception('fappsk')
  43. return state[k]
  44. @test_huey.task(retries=2)
  45. def explicit_retry(k):
  46. if k not in state:
  47. state[k] = 'fixed'
  48. raise RetryTask()
  49. return state[k]
  50. @test_huey.task(retries=1, include_task=True)
  51. def retry_with_task(a, b, task=None):
  52. assert task is not None
  53. if a + b < 0:
  54. raise RetryTask()
  55. return a + b
  56. @test_huey.periodic_task(crontab(minute='2'))
  57. def hourly_task():
  58. state['p'] = 'y'
  59. @test_huey.periodic_task(crontab(minute='3'), retries=3)
  60. def hourly_task2():
  61. try:
  62. state['p2'] += 1
  63. except KeyError:
  64. state['p2'] = 1
  65. raise
  66. @test_huey.task(retries=2)
  67. @test_huey.lock_task('test-lock')
  68. def locked_task(a, b):
  69. return a + b
  70. class CrashableWorker(Worker):
  71. def __init__(self, *args, **kwargs):
  72. super(CrashableWorker, self).__init__(*args, **kwargs)
  73. self._crash = threading.Event()
  74. self._crashed = threading.Event()
  75. def crash(self):
  76. self._crash.set()
  77. def crashed(self, blocking=True):
  78. if blocking:
  79. self._crashed.wait()
  80. return True
  81. else:
  82. return self._crashed.is_set()
  83. def loop(self, now=None):
  84. if self._crash.is_set() and not self._crashed.is_set():
  85. self._crashed.set()
  86. raise KeyboardInterrupt
  87. elif self._crashed.is_set():
  88. return
  89. super(CrashableWorker, self).loop(now=now)
  90. class CrashableConsumer(Consumer):
  91. def _create_worker(self):
  92. return CrashableWorker(
  93. huey=self.huey,
  94. default_delay=self.default_delay,
  95. max_delay=self.max_delay,
  96. backoff=self.backoff,
  97. utc=self.utc)
  98. def is_crashed(self, worker=1, blocking=True):
  99. worker, _ = self.worker_threads[worker - 1]
  100. return worker.crashed(blocking=blocking)
  101. def crash(self, worker=1):
  102. worker, process = self.worker_threads[worker - 1]
  103. worker.crash()
  104. class ConsumerTestCase(HueyTestCase):
  105. def setUp(self):
  106. super(ConsumerTestCase, self).setUp()
  107. global state
  108. state = {}
  109. def consumer_test(method):
  110. @wraps(method)
  111. def inner(self):
  112. consumer = self.create_consumer()
  113. with CaptureLogs() as capture:
  114. consumer.start()
  115. try:
  116. return method(self, consumer, capture)
  117. finally:
  118. consumer.stop()
  119. for _, worker in consumer.worker_threads:
  120. worker.join()
  121. return inner
  122. class TestExecution(ConsumerTestCase):
  123. def create_consumer(self, worker_type='thread'):
  124. consumer = CrashableConsumer(
  125. self.huey,
  126. max_delay=0.1,
  127. workers=2,
  128. worker_type=worker_type,
  129. health_check_interval=0.01)
  130. consumer._stop_flag_timeout = 0.01
  131. return consumer
  132. @consumer_test
  133. def test_health_check(self, consumer, capture):
  134. modify_state('ka', 'va').get(blocking=True)
  135. self.assertEqual(state, {'ka': 'va'})
  136. consumer.crash(1)
  137. self.assertTrue(consumer.is_crashed(1))
  138. # One worker still alive.
  139. modify_state('ka', 'vx').get(blocking=True)
  140. self.assertEqual(state, {'ka': 'vx'})
  141. consumer.crash(2)
  142. self.assertTrue(consumer.is_crashed(2))
  143. self.assertEqual(self.huey.pending_count(), 0)
  144. result = modify_state('ka', 'vz')
  145. wt1, wt2 = consumer.worker_threads
  146. w1, w2 = wt1[0], wt2[0]
  147. w1.loop()
  148. w2.loop()
  149. self.assertEqual(self.huey.pending_count(), 1)
  150. consumer.check_worker_health()
  151. result.get(blocking=True)
  152. self.assertEqual(state, {'ka': 'vz'})
  153. @consumer_test
  154. def test_threaded_execution(self, consumer, capture):
  155. r1 = modify_state('k1', 'v1')
  156. r2 = modify_state('k2', 'v2')
  157. r3 = modify_state('k3', 'v3')
  158. try:
  159. r2.get(blocking=True, timeout=5)
  160. r3.get(blocking=True, timeout=5)
  161. r1.get(blocking=True, timeout=5)
  162. except DataStoreTimeout:
  163. assert False, 'Timeout. Consumer/workers running correctly?'
  164. self.assertEqual(state, {'k1': 'v1', 'k2': 'v2', 'k3': 'v3'})
  165. class TestConsumerAPIs(ConsumerTestCase):
  166. def get_periodic_tasks(self):
  167. return [hourly_task.task_class, hourly_task2.task_class]
  168. def test_dequeue_errors(self):
  169. huey = BrokenHuey()
  170. consumer = Consumer(huey, max_delay=0.1, workers=2,
  171. worker_type='thread')
  172. worker = consumer._create_worker()
  173. state = {}
  174. @huey.task()
  175. def modify_broken(k, v):
  176. state[k] = v
  177. with CaptureLogs() as capture:
  178. res = modify_broken('k', 'v')
  179. worker.loop()
  180. self.assertEqual(capture.messages, ['Error reading from queue'])
  181. self.assertEqual(state, {})
  182. def test_scheduler_interval(self):
  183. consumer = self.get_consumer(scheduler_interval=0.1)
  184. self.assertEqual(consumer.scheduler_interval, 1)
  185. consumer = self.get_consumer(scheduler_interval=120)
  186. self.assertEqual(consumer.scheduler_interval, 60)
  187. consumer = self.get_consumer(scheduler_interval=10)
  188. self.assertEqual(consumer.scheduler_interval, 10)
  189. def test_message_processing(self):
  190. worker = self.consumer._create_worker()
  191. self.assertEqual(state, {})
  192. with CaptureLogs() as capture:
  193. res = modify_state('k', 'v')
  194. worker.loop()
  195. self.assertLogs(capture, ['Executing %s' % res.task,
  196. 'Executed %s in ' % res.task])
  197. self.assertEqual(state, {'k': 'v'})
  198. self.assertEqual(res.get(), 'v')
  199. self.assertTaskEvents(
  200. ('started', res.task),
  201. ('finished', res.task))
  202. def test_worker(self):
  203. modify_state('k', 'w')
  204. task = test_huey.dequeue()
  205. self.worker(task)
  206. self.assertEqual(state, {'k': 'w'})
  207. def test_worker_exception(self):
  208. with CaptureLogs() as capture:
  209. blow_up()
  210. task = test_huey.dequeue()
  211. # Nothing happens because the task is not executed.
  212. self.assertLogs(capture, [])
  213. with CaptureLogs() as capture:
  214. self.worker(task)
  215. self.assertLogs(capture, [
  216. 'Executing',
  217. 'Unhandled exception in worker'])
  218. self.assertTaskEvents(
  219. ('started', task),
  220. ('error-task', task))
  221. def test_task_exception(self):
  222. ret = blow_up()
  223. task = test_huey.dequeue()
  224. self.worker(task)
  225. # Calling ".get()" on a task result will raise an exception if the
  226. # task failed.
  227. self.assertRaises(TaskException, ret.get)
  228. try:
  229. ret.get()
  230. except Exception as exc:
  231. self.assertTrue('blowed up' in exc.metadata['error'])
  232. else:
  233. assert False, 'Should not reach this point.'
  234. def test_task_locking(self):
  235. ret = locked_task(1, 2)
  236. task = test_huey.dequeue()
  237. self.worker(task)
  238. self.assertEqual(ret.get(), 3)
  239. ret = locked_task(2, 3)
  240. task = test_huey.dequeue()
  241. with test_huey.lock_task('test-lock'):
  242. self.worker(task)
  243. self.assertRaises(TaskException, ret.get)
  244. def test_retries_and_logging(self):
  245. # This will continually fail.
  246. retry_task('blampf')
  247. for i in reversed(range(4)):
  248. task = test_huey.dequeue()
  249. self.assertEqual(task.retries, i)
  250. with CaptureLogs() as capture:
  251. self.worker(task)
  252. if i > 0:
  253. self.assertLogs(capture, [
  254. 'Executing',
  255. 'Unhandled',
  256. 'Re-enqueueing'])
  257. self.assertTaskEvents(
  258. ('started', task),
  259. ('error-task', task),
  260. ('retrying', task))
  261. else:
  262. self.assertLogs(capture, [
  263. 'Executing',
  264. 'Unhandled'])
  265. self.assertTaskEvents(
  266. ('started', task),
  267. ('error-task', task))
  268. self.assertEqual(len(test_huey), 0)
  269. def test_retries_with_success(self):
  270. # this will fail once, then succeed
  271. retry_task('blampf', False)
  272. self.assertFalse('blampf' in state)
  273. task = test_huey.dequeue()
  274. with CaptureLogs() as capture:
  275. self.worker(task)
  276. self.assertLogs(capture, [
  277. 'Executing',
  278. 'Unhandled',
  279. 'Re-enqueueing'])
  280. task = test_huey.dequeue()
  281. self.assertEqual(task.retries, 2)
  282. self.worker(task)
  283. self.assertEqual(state['blampf'], 'fixed')
  284. self.assertEqual(len(test_huey), 0)
  285. self.assertTaskEvents(
  286. ('started', task),
  287. ('error-task', task),
  288. ('retrying', task),
  289. ('started', task),
  290. ('finished', task))
  291. def test_explicit_retry(self):
  292. explicit_retry('foo')
  293. self.assertFalse('foo' in state)
  294. task = test_huey.dequeue()
  295. with CaptureLogs() as capture:
  296. self.worker(task)
  297. self.assertLogs(capture, ['Executing', 'Re-enqueueing'])
  298. task = test_huey.dequeue()
  299. self.assertEqual(task.retries, 1)
  300. self.worker(task)
  301. self.assertEqual(state['foo'], 'fixed')
  302. self.assertEqual(len(test_huey), 0)
  303. self.assertTaskEvents(
  304. ('started', task),
  305. ('retrying', task),
  306. ('started', task),
  307. ('finished', task))
  308. explicit_retry('bar')
  309. task = test_huey.dequeue()
  310. self.worker(task)
  311. del state['bar']
  312. task = test_huey.dequeue()
  313. self.worker(task)
  314. del state['bar']
  315. task = test_huey.dequeue()
  316. with CaptureLogs() as capture:
  317. self.worker(task)
  318. self.assertLogs(capture, ['Executing', 'Cannot retry task'])
  319. self.assertEqual(len(test_huey), 0)
  320. def test_retry_with_task(self):
  321. retry_with_task(1, -2)
  322. task = test_huey.dequeue()
  323. with CaptureLogs() as capture:
  324. self.worker(task)
  325. task = test_huey.dequeue()
  326. self.worker(task)
  327. self.assertEqual(len(test_huey), 0)
  328. ret = retry_with_task(1, 1)
  329. self.worker(test_huey.dequeue())
  330. self.assertEqual(ret.get(), 2)
  331. self.assertEqual(len(test_huey), 0)
  332. def test_scheduling(self):
  333. dt = datetime.datetime(2011, 1, 1, 0, 1)
  334. dt2 = datetime.datetime(2037, 1, 1, 0, 1)
  335. ad1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
  336. ad2 = modify_state.schedule(args=('k2', 'v2'), eta=dt2, convert_utc=False)
  337. # Dequeue the past-timestamped task and run it.
  338. worker = self.consumer._create_worker()
  339. worker.loop()
  340. self.assertTrue('k' in state)
  341. # Dequeue the future-timestamped task.
  342. worker.loop()
  343. # Verify the task got stored in the schedule instead of executing.
  344. self.assertFalse('k2' in state)
  345. self.assertTaskEvents(
  346. ('started', ad1.task),
  347. ('finished', ad1.task),
  348. ('scheduled', ad2.task))
  349. # run through an iteration of the scheduler
  350. self.scheduler(dt)
  351. # our command was not enqueued and no events were emitted.
  352. self.assertEqual(len(self.huey), 0)
  353. # run through an iteration of the scheduler
  354. self.scheduler(dt2)
  355. # our command was enqueued
  356. self.assertEqual(len(self.huey), 1)
  357. def test_retry_scheduling(self):
  358. # this will continually fail
  359. retry_task_delay('blampf')
  360. cur_time = datetime.datetime.utcnow()
  361. task = self.huey.dequeue()
  362. with CaptureLogs() as capture:
  363. self.worker(task, cur_time)
  364. self.assertLogs(capture, [
  365. 'Executing',
  366. 'Unhandled exception',
  367. 'Re-enqueueing task',
  368. 'Adding'])
  369. in_8 = cur_time + datetime.timedelta(seconds=8)
  370. tasks_from_sched = self.huey.read_schedule(in_8)
  371. self.assertEqual(tasks_from_sched, [])
  372. in_11 = cur_time + datetime.timedelta(seconds=11)
  373. tasks_from_sched = self.huey.read_schedule(in_11)
  374. self.assertEqual(tasks_from_sched, [task])
  375. task = tasks_from_sched[0]
  376. self.assertEqual(task.retries, 2)
  377. exec_time = task.execute_time
  378. self.assertEqual((exec_time - cur_time).seconds, 10)
  379. self.assertTaskEvents(
  380. ('started', task),
  381. ('error-task', task),
  382. ('retrying', task),
  383. ('scheduled', task))
  384. def test_revoking_normal(self):
  385. # enqueue 2 normal commands
  386. r1 = modify_state('k', 'v')
  387. r2 = modify_state('k2', 'v2')
  388. # revoke the first *before it has been checked*
  389. r1.revoke()
  390. self.assertTrue(test_huey.is_revoked(r1.task))
  391. self.assertFalse(test_huey.is_revoked(r2.task))
  392. # dequeue a *single* message (r1)
  393. task = test_huey.dequeue()
  394. self.worker(task)
  395. self.assertTaskEvents(('revoked', r1.task))
  396. # no changes and the task was not added to the schedule
  397. self.assertFalse('k' in state)
  398. # dequeue a *single* message
  399. task = test_huey.dequeue()
  400. self.worker(task)
  401. self.assertTrue('k2' in state)
  402. def test_revoking_schedule(self):
  403. global state
  404. dt = datetime.datetime(2011, 1, 1)
  405. dt2 = datetime.datetime(2037, 1, 1)
  406. r1 = modify_state.schedule(args=('k', 'v'), eta=dt, convert_utc=False)
  407. r2 = modify_state.schedule(args=('k2', 'v2'), eta=dt, convert_utc=False)
  408. r3 = modify_state.schedule(args=('k3', 'v3'), eta=dt2, convert_utc=False)
  409. r4 = modify_state.schedule(args=('k4', 'v4'), eta=dt2, convert_utc=False)
  410. # revoke r1 and r3
  411. r1.revoke()
  412. r3.revoke()
  413. self.assertTrue(test_huey.is_revoked(r1.task))
  414. self.assertFalse(test_huey.is_revoked(r2.task))
  415. self.assertTrue(test_huey.is_revoked(r3.task))
  416. self.assertFalse(test_huey.is_revoked(r4.task))
  417. expected = [
  418. #state, schedule
  419. ({}, 0),
  420. ({'k2': 'v2'}, 0),
  421. ({'k2': 'v2'}, 1),
  422. ({'k2': 'v2'}, 2),
  423. ]
  424. for i in range(4):
  425. curr_state, curr_sched = expected[i]
  426. # dequeue a *single* message
  427. task = test_huey.dequeue()
  428. self.worker(task)
  429. self.assertEqual(state, curr_state)
  430. self.assertEqual(test_huey.scheduled_count(), curr_sched)
  431. # lets pretend its 2037
  432. future = dt2 + datetime.timedelta(seconds=1)
  433. self.scheduler(future)
  434. self.assertEqual(test_huey.scheduled_count(), 0)
  435. # There are two tasks in the queue now (r3 and r4) -- process both.
  436. for i in range(2):
  437. task = test_huey.dequeue()
  438. self.worker(task, future)
  439. self.assertEqual(state, {'k2': 'v2', 'k4': 'v4'})
  440. def test_periodic_scheduler(self):
  441. dt = datetime.datetime(2011, 1, 3, 3, 7)
  442. sched = self.scheduler(dt, False)
  443. self.assertEqual(sched._counter, 1)
  444. self.assertEqual(sched._q, 6)
  445. self.assertEqual(len(self.huey), 0)
  446. dt = datetime.datetime(2011, 1, 1, 0, 2)
  447. sched = self.scheduler(dt, True)
  448. self.assertEqual(sched._counter, 1)
  449. self.assertEqual(sched._q, 6)
  450. self.assertEqual(state, {})
  451. for i in range(len(self.huey)):
  452. task = test_huey.dequeue()
  453. self.worker(task, dt)
  454. self.assertEqual(state, {'p': 'y'})
  455. def test_periodic_with_retry(self):
  456. dt = datetime.datetime(2011, 1, 1, 0, 3)
  457. sched = self.scheduler(dt, True)
  458. self.assertEqual(sched._counter, 1)
  459. self.assertEqual(sched._q, 6)
  460. self.assertEqual(state, {})
  461. self.assertEqual(len(self.huey), 1)
  462. task = test_huey.dequeue()
  463. self.assertEqual(task.retries, 3)
  464. self.worker(task, dt)
  465. # Exception occurred, so now we retry.
  466. self.assertEqual(len(self.huey), 1)
  467. task = test_huey.dequeue()
  468. self.assertEqual(task.retries, 2)
  469. self.worker(task, dt)
  470. self.assertEqual(state, {'p2': 2})
  471. def test_revoking_periodic(self):
  472. global state
  473. def loop_periodic(ts):
  474. self.scheduler(ts, True)
  475. for i in range(len(self.huey)):
  476. task = test_huey.dequeue()
  477. self.worker(task, ts)
  478. dt = datetime.datetime(2011, 1, 1, 0, 2)
  479. # revoke the command once
  480. hourly_task.revoke(revoke_once=True)
  481. self.assertTrue(hourly_task.is_revoked())
  482. # it will be skipped the first go-round
  483. loop_periodic(dt)
  484. # it has not been run
  485. self.assertEqual(state, {})
  486. # the next go-round it will be enqueued
  487. loop_periodic(dt)
  488. # our command was run
  489. self.assertEqual(state, {'p': 'y'})
  490. # reset state
  491. state = {}
  492. # revoke the command
  493. hourly_task.revoke()
  494. self.assertTrue(hourly_task.is_revoked())
  495. # it will no longer be enqueued
  496. loop_periodic(dt)
  497. loop_periodic(dt)
  498. self.assertEqual(state, {})
  499. # restore
  500. hourly_task.restore()
  501. self.assertFalse(hourly_task.is_revoked())
  502. # it will now be enqueued
  503. loop_periodic(dt)
  504. self.assertEqual(state, {'p': 'y'})
  505. # reset
  506. state = {}
  507. # revoke for an hour
  508. td = datetime.timedelta(seconds=3600)
  509. hourly_task.revoke(revoke_until=dt + td)
  510. loop_periodic(dt)
  511. self.assertEqual(state, {})
  512. self.assertEqual(test_huey.result_count(), 1)
  513. # after an hour it is back
  514. loop_periodic(dt + td)
  515. self.assertEqual(state, {'p': 'y'})
  516. # our data store should reflect the delay
  517. self.assertEqual(test_huey.result_count(), 0)
  518. def test_odd_scheduler_interval(self):
  519. self.consumer.stop()
  520. self.consumer = self.get_consumer(scheduler_interval=13)
  521. curr_time = datetime.datetime(2015, 12, 30, 21, 1, 7)
  522. scheduler = self.scheduler(curr_time)
  523. self.assertEqual(scheduler._counter, 1)
  524. self.assertEqual(scheduler._q, 4)
  525. scheduler.loop(curr_time.replace(second=20))
  526. self.assertEqual(scheduler._counter, 2)
  527. self.assertEqual(scheduler._q, 4)
  528. self.assertEqual(len(self.huey), 0)
  529. scheduler.loop(curr_time.replace(second=33))
  530. self.assertEqual(scheduler._counter, 3)
  531. self.assertEqual(scheduler._q, 4)
  532. self.assertEqual(len(self.huey), 0)
  533. scheduler.loop(curr_time.replace(second=46))
  534. self.assertEqual(scheduler._counter, 4)
  535. self.assertEqual(scheduler._q, 4)
  536. self.assertEqual(scheduler._r, 8)
  537. self.assertEqual(len(self.huey), 0)
  538. seconds = (59 + scheduler._r) % 60
  539. scheduler.loop(curr_time.replace(minute=2, second=seconds))
  540. self.assertEqual(scheduler._counter, 0)
  541. self.assertEqual(scheduler._q, 4)
  542. self.assertEqual(len(self.huey), 1)