test_heartbeat.py 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273
  1. from __future__ import absolute_import
  2. from celery.worker.heartbeat import Heart
  3. from celery.tests.case import AppCase
  4. class MockDispatcher(object):
  5. heart = None
  6. next_iter = 0
  7. def __init__(self):
  8. self.sent = []
  9. self.on_enabled = set()
  10. self.on_disabled = set()
  11. self.enabled = True
  12. def send(self, msg, **_fields):
  13. self.sent.append(msg)
  14. if self.heart:
  15. if self.next_iter > 10:
  16. self.heart._shutdown.set()
  17. self.next_iter += 1
  18. class MockDispatcherRaising(object):
  19. def send(self, msg):
  20. if msg == 'worker-offline':
  21. raise Exception('foo')
  22. class MockTimer(object):
  23. def call_repeatedly(self, secs, fun, args=(), kwargs={}):
  24. class entry(tuple):
  25. cancelled = False
  26. def cancel(self):
  27. self.cancelled = True
  28. return entry((secs, fun, args, kwargs))
  29. def cancel(self, entry):
  30. entry.cancel()
  31. class test_Heart(AppCase):
  32. def test_start_stop(self):
  33. timer = MockTimer()
  34. eventer = MockDispatcher()
  35. h = Heart(timer, eventer, interval=1)
  36. h.start()
  37. self.assertTrue(h.tref)
  38. h.stop()
  39. self.assertIsNone(h.tref)
  40. h.stop()
  41. def test_start_when_disabled(self):
  42. timer = MockTimer()
  43. eventer = MockDispatcher()
  44. eventer.enabled = False
  45. h = Heart(timer, eventer)
  46. h.start()
  47. self.assertFalse(h.tref)
  48. def test_stop_when_disabled(self):
  49. timer = MockTimer()
  50. eventer = MockDispatcher()
  51. eventer.enabled = False
  52. h = Heart(timer, eventer)
  53. h.stop()