test_beat.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198
  1. from __future__ import absolute_import
  2. import logging
  3. import sys
  4. from collections import defaultdict
  5. from mock import patch
  6. from celery import beat
  7. from celery import platforms
  8. from celery.bin import beat as beat_bin
  9. from celery.apps import beat as beatapp
  10. from celery.tests.case import AppCase, Mock, restore_logging
  11. from kombu.tests.case import redirect_stdouts
  12. class MockedShelveModule(object):
  13. shelves = defaultdict(lambda: {})
  14. def open(self, filename, *args, **kwargs):
  15. return self.shelves[filename]
  16. mocked_shelve = MockedShelveModule()
  17. class MockService(beat.Service):
  18. started = False
  19. in_sync = False
  20. persistence = mocked_shelve
  21. def start(self):
  22. self.__class__.started = True
  23. def sync(self):
  24. self.__class__.in_sync = True
  25. class MockBeat(beatapp.Beat):
  26. running = False
  27. def run(self):
  28. MockBeat.running = True
  29. class MockBeat2(beatapp.Beat):
  30. Service = MockService
  31. def install_sync_handler(self, b):
  32. pass
  33. class MockBeat3(beatapp.Beat):
  34. Service = MockService
  35. def install_sync_handler(self, b):
  36. raise TypeError('xxx')
  37. class test_Beat(AppCase):
  38. def test_loglevel_string(self):
  39. b = beatapp.Beat(app=self.app, loglevel='DEBUG',
  40. redirect_stdouts=False)
  41. self.assertEqual(b.loglevel, logging.DEBUG)
  42. b2 = beatapp.Beat(app=self.app, loglevel=logging.DEBUG,
  43. redirect_stdouts=False)
  44. self.assertEqual(b2.loglevel, logging.DEBUG)
  45. def test_colorize(self):
  46. self.app.log.setup = Mock()
  47. b = beatapp.Beat(app=self.app, no_color=True,
  48. redirect_stdouts=False)
  49. b.setup_logging()
  50. self.assertTrue(self.app.log.setup.called)
  51. self.assertEqual(self.app.log.setup.call_args[1]['colorize'], False)
  52. def test_init_loader(self):
  53. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  54. b.init_loader()
  55. def test_process_title(self):
  56. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  57. b.set_process_title()
  58. def test_run(self):
  59. b = MockBeat2(app=self.app, redirect_stdouts=False)
  60. MockService.started = False
  61. b.run()
  62. self.assertTrue(MockService.started)
  63. def psig(self, fun, *args, **kwargs):
  64. handlers = {}
  65. class Signals(platforms.Signals):
  66. def __setitem__(self, sig, handler):
  67. handlers[sig] = handler
  68. p, platforms.signals = platforms.signals, Signals()
  69. try:
  70. fun(*args, **kwargs)
  71. return handlers
  72. finally:
  73. platforms.signals = p
  74. def test_install_sync_handler(self):
  75. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  76. clock = MockService(app=self.app)
  77. MockService.in_sync = False
  78. handlers = self.psig(b.install_sync_handler, clock)
  79. with self.assertRaises(SystemExit):
  80. handlers['SIGINT']('SIGINT', object())
  81. self.assertTrue(MockService.in_sync)
  82. MockService.in_sync = False
  83. def test_setup_logging(self):
  84. with restore_logging():
  85. try:
  86. # py3k
  87. delattr(sys.stdout, 'logger')
  88. except AttributeError:
  89. pass
  90. b = beatapp.Beat(app=self.app, redirect_stdouts=False)
  91. b.redirect_stdouts = False
  92. b.app.log.already_setup = False
  93. b.setup_logging()
  94. with self.assertRaises(AttributeError):
  95. sys.stdout.logger
  96. @redirect_stdouts
  97. @patch('celery.apps.beat.logger')
  98. def test_logs_errors(self, logger, stdout, stderr):
  99. with restore_logging():
  100. b = MockBeat3(
  101. app=self.app, redirect_stdouts=False, socket_timeout=None,
  102. )
  103. b.start_scheduler()
  104. self.assertTrue(logger.critical.called)
  105. @redirect_stdouts
  106. @patch('celery.platforms.create_pidlock')
  107. def test_use_pidfile(self, create_pidlock, stdout, stderr):
  108. b = MockBeat2(app=self.app, pidfile='pidfilelockfilepid',
  109. socket_timeout=None, redirect_stdouts=False)
  110. b.start_scheduler()
  111. self.assertTrue(create_pidlock.called)
  112. class MockDaemonContext(object):
  113. opened = False
  114. closed = False
  115. def __init__(self, *args, **kwargs):
  116. pass
  117. def open(self):
  118. self.__class__.opened = True
  119. return self
  120. __enter__ = open
  121. def close(self, *args):
  122. self.__class__.closed = True
  123. __exit__ = close
  124. class test_div(AppCase):
  125. def setup(self):
  126. self.prev, beatapp.Beat = beatapp.Beat, MockBeat
  127. self.ctx, beat_bin.detached = (
  128. beat_bin.detached, MockDaemonContext,
  129. )
  130. def teardown(self):
  131. beatapp.Beat = self.prev
  132. def test_main(self):
  133. sys.argv = [sys.argv[0], '-s', 'foo']
  134. try:
  135. beat_bin.main(app=self.app)
  136. self.assertTrue(MockBeat.running)
  137. finally:
  138. MockBeat.running = False
  139. def test_detach(self):
  140. cmd = beat_bin.beat()
  141. cmd.app = self.app
  142. cmd.run(detach=True)
  143. self.assertTrue(MockDaemonContext.opened)
  144. self.assertTrue(MockDaemonContext.closed)
  145. def test_parse_options(self):
  146. cmd = beat_bin.beat()
  147. cmd.app = self.app
  148. options, args = cmd.parse_options('celery beat', ['-s', 'foo'])
  149. self.assertEqual(options.schedule, 'foo')