test_snapshot.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132
  1. from __future__ import absolute_import
  2. from mock import patch
  3. from celery.events import Events
  4. from celery.events.snapshot import Polaroid, evcam
  5. from celery.tests.case import AppCase, restore_logging
  6. class TRef(object):
  7. active = True
  8. called = False
  9. def __call__(self):
  10. self.called = True
  11. def cancel(self):
  12. self.active = False
  13. class MockTimer(object):
  14. installed = []
  15. def call_repeatedly(self, secs, fun, *args, **kwargs):
  16. self.installed.append(fun)
  17. return TRef()
  18. timer = MockTimer()
  19. class test_Polaroid(AppCase):
  20. def setup(self):
  21. self.state = self.app.events.State()
  22. def test_constructor(self):
  23. x = Polaroid(self.state, app=self.app)
  24. self.assertIs(x.app, self.app)
  25. self.assertIs(x.state, self.state)
  26. self.assertTrue(x.freq)
  27. self.assertTrue(x.cleanup_freq)
  28. self.assertTrue(x.logger)
  29. self.assertFalse(x.maxrate)
  30. def test_install_timers(self):
  31. x = Polaroid(self.state, app=self.app)
  32. x.timer = timer
  33. x.__exit__()
  34. x.__enter__()
  35. self.assertIn(x.capture, MockTimer.installed)
  36. self.assertIn(x.cleanup, MockTimer.installed)
  37. self.assertTrue(x._tref.active)
  38. self.assertTrue(x._ctref.active)
  39. x.__exit__()
  40. self.assertFalse(x._tref.active)
  41. self.assertFalse(x._ctref.active)
  42. self.assertTrue(x._tref.called)
  43. self.assertFalse(x._ctref.called)
  44. def test_cleanup(self):
  45. x = Polaroid(self.state, app=self.app)
  46. cleanup_signal_sent = [False]
  47. def handler(**kwargs):
  48. cleanup_signal_sent[0] = True
  49. x.cleanup_signal.connect(handler)
  50. x.cleanup()
  51. self.assertTrue(cleanup_signal_sent[0])
  52. def test_shutter__capture(self):
  53. x = Polaroid(self.state, app=self.app)
  54. shutter_signal_sent = [False]
  55. def handler(**kwargs):
  56. shutter_signal_sent[0] = True
  57. x.shutter_signal.connect(handler)
  58. x.shutter()
  59. self.assertTrue(shutter_signal_sent[0])
  60. shutter_signal_sent[0] = False
  61. x.capture()
  62. self.assertTrue(shutter_signal_sent[0])
  63. def test_shutter_maxrate(self):
  64. x = Polaroid(self.state, app=self.app, maxrate='1/h')
  65. shutter_signal_sent = [0]
  66. def handler(**kwargs):
  67. shutter_signal_sent[0] += 1
  68. x.shutter_signal.connect(handler)
  69. for i in range(30):
  70. x.shutter()
  71. x.shutter()
  72. x.shutter()
  73. self.assertEqual(shutter_signal_sent[0], 1)
  74. class test_evcam(AppCase):
  75. class MockReceiver(object):
  76. raise_keyboard_interrupt = False
  77. def capture(self, **kwargs):
  78. if self.__class__.raise_keyboard_interrupt:
  79. raise KeyboardInterrupt()
  80. class MockEvents(Events):
  81. def Receiver(self, *args, **kwargs):
  82. return test_evcam.MockReceiver()
  83. def setup(self):
  84. self.app.events = self.MockEvents()
  85. self.app.events.app = self.app
  86. def test_evcam(self):
  87. with restore_logging():
  88. evcam(Polaroid, timer=timer, app=self.app)
  89. evcam(Polaroid, timer=timer, loglevel='CRITICAL', app=self.app)
  90. self.MockReceiver.raise_keyboard_interrupt = True
  91. try:
  92. with self.assertRaises(SystemExit):
  93. evcam(Polaroid, timer=timer, app=self.app)
  94. finally:
  95. self.MockReceiver.raise_keyboard_interrupt = False
  96. @patch('celery.platforms.create_pidlock')
  97. def test_evcam_pidfile(self, create_pidlock):
  98. evcam(Polaroid, timer=timer, pidfile='/var/pid', app=self.app)
  99. create_pidlock.assert_called_with('/var/pid')