test_timer2.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. from __future__ import absolute_import
  2. import sys
  3. import time
  4. import celery.utils.timer2 as timer2
  5. from celery.tests.case import Case, Mock, patch
  6. from kombu.tests.case import redirect_stdouts
  class test_Entry(Case):
      """Tests for ``timer2.Entry``."""

      def test_call(self):
          """Calling an entry runs its function with the stored arguments."""
          scratch = [None]

          def timed(x, y, moo='foo'):
              scratch[0] = (x, y, moo)

          tref = timer2.Entry(timed, (4, 4), {'moo': 'baz'})
          tref()

          # Positional args and the keyword override must both arrive.
          self.assertTupleEqual(scratch[0], (4, 4, 'baz'))

      def test_cancel(self):
          """After ``cancel()`` the entry reports ``cancelled`` as truthy."""
          tref = timer2.Entry(lambda x: x, (1, ), {})
          tref.cancel()
          self.assertTrue(tref.cancelled)

      def test_repr(self):
          """``repr()`` of an entry produces a non-empty string."""
          # Function, positional args and keyword args are three separate
          # arguments, mirroring test_cancel; a misplaced parenthesis would
          # otherwise hand the dict over as the positional-args argument.
          tref = timer2.Entry(lambda x: x, (1, ), {})
          self.assertTrue(repr(tref))
  class test_Schedule(Case):
      """Tests for ``timer2.Schedule``."""

      def test_supports_Timer_interface(self):
          """Schedule exposes ``stop``, ``cancel`` and ``schedule``."""
          schedule = timer2.Schedule()
          schedule.stop()

          entry = Mock()
          schedule.cancel(entry)
          entry.cancel.assert_called_with()

          # The ``schedule`` attribute is expected to be the object itself.
          self.assertIs(schedule.schedule, schedule)

      def test_handle_error(self):
          """``to_timestamp`` failures reach ``on_error`` or propagate.

          With ``to_timestamp`` patched to raise ``OverflowError``, the
          test expects ``enter_at`` to pass the error to the ``on_error``
          callback while one is set, and to raise it once ``on_error`` is
          ``None``.
          """
          from datetime import datetime

          captured = {}

          def on_error(exc_info):
              captured['exc'] = exc_info

          def new_entry():
              # Fresh no-op entry for every ``enter_at`` call.
              return timer2.Entry(lambda: None, (), {})

          sched = timer2.Schedule(on_error=on_error)

          with patch('kombu.async.timer.to_timestamp') as to_ts:
              to_ts.side_effect = OverflowError()
              sched.enter_at(new_entry(), eta=datetime.now())
              sched.enter_at(new_entry(), eta=None)
              sched.on_error = None
              with self.assertRaises(OverflowError):
                  sched.enter_at(new_entry(), eta=datetime.now())

          # ``get`` yields None when the callback never ran, so the
          # assertion fails in that case.
          self.assertIsInstance(captured.get('exc'), OverflowError)
  class test_Timer(Case):
      """Tests for ``timer2.Timer``."""

      def test_enter_after(self):
          """A callback given to ``call_after`` is eventually invoked."""
          t = timer2.Timer()
          try:
              done = [False]

              def set_done():
                  done[0] = True

              t.call_after(0.3, set_done)
              mss = 0
              # Poll in 0.1s steps; give up after about two seconds.
              while not done[0]:
                  if mss >= 2.0:
                      raise Exception('test timed out')
                  time.sleep(0.1)
                  mss += 0.1
          finally:
              t.stop()

      def test_exit_after(self):
          """``exit_after`` delegates to ``call_after`` with ``sys.exit``."""
          t = timer2.Timer()
          t.call_after = Mock()
          t.exit_after(0.3, priority=10)
          t.call_after.assert_called_with(0.3, sys.exit, 10)

      def test_ensure_started_not_started(self):
          """``ensure_started`` skips ``start`` when ``running`` is true."""
          t = timer2.Timer()
          t.running = True
          t.start = Mock()
          t.ensure_started()
          self.assertFalse(t.start.called)

      def test_call_repeatedly(self):
          """The repeating entry re-enters itself until it is cancelled."""
          t = timer2.Timer()
          try:
              t.schedule.enter_after = Mock()

              myfun = Mock()
              myfun.__name__ = 'myfun'
              t.call_repeatedly(0.03, myfun)

              self.assertEqual(t.schedule.enter_after.call_count, 1)
              args1, _ = t.schedule.enter_after.call_args_list[0]
              sec1, tref1, _ = args1
              self.assertEqual(sec1, 0.03)
              # Running the first entry is expected to schedule another.
              tref1()

              self.assertEqual(t.schedule.enter_after.call_count, 2)
              args2, _ = t.schedule.enter_after.call_args_list[1]
              sec2, tref2, _ = args2
              self.assertEqual(sec2, 0.03)
              # A cancelled entry must not schedule a further run.
              tref2.cancelled = True
              tref2()

              self.assertEqual(t.schedule.enter_after.call_count, 2)
          finally:
              t.stop()

      @patch('kombu.async.timer.logger')
      def test_apply_entry_error_handled(self, logger):
          """With ``on_error`` unset, a failing entry is logged as error."""
          t = timer2.Timer()
          t.schedule.on_error = None

          fun = Mock()
          fun.side_effect = ValueError()

          t.schedule.apply_entry(fun)
          self.assertTrue(logger.error.called)

      @redirect_stdouts
      def test_apply_entry_error_not_handled(self, stdout, stderr):
          """With ``on_error`` set, a failing entry writes nothing to stderr."""
          t = timer2.Timer()
          t.schedule.on_error = Mock()

          fun = Mock()
          fun.side_effect = ValueError()
          t.schedule.apply_entry(fun)
          fun.assert_called_with()
          self.assertFalse(stderr.getvalue())

      @patch('os._exit')
      def test_thread_crash(self, _exit):
          """An ``OSError`` from ``_next_entry`` makes ``run`` call ``os._exit(1)``."""
          t = timer2.Timer()
          t._next_entry = Mock()
          t._next_entry.side_effect = OSError(131)
          t.run()
          _exit.assert_called_with(1)

      def test_gc_race_lost(self):
          """``run`` tolerates a ``TypeError`` from ``_is_stopped.set``."""
          t = timer2.Timer()
          t._is_stopped.set = Mock()
          t._is_stopped.set.side_effect = TypeError()

          # Shutdown is flagged before run() is entered.
          t._is_shutdown.set()
          t.run()
          t._is_stopped.set.assert_called_with()

      def test_to_timestamp(self):
          """``to_timestamp`` hands a float back as the very same object."""
          # Bind the float once: ``assertIs`` against two separate literals
          # only passes when the interpreter happens to share the constant
          # object, which is an implementation detail.
          ts = 3.13
          self.assertIs(timer2.to_timestamp(ts), ts)

      def test_test_enter(self):
          """``enter`` forwards to ``_do_enter('enter_at', ...)``."""
          t = timer2.Timer()
          t._do_enter = Mock()
          e = Mock()
          t.enter(e, 13, 0)
          t._do_enter.assert_called_with('enter_at', e, 13, priority=0)

      def test_test_enter_after(self):
          """``enter_after`` forwards to ``_do_enter('enter_after')``."""
          t = timer2.Timer()
          t._do_enter = Mock()
          t.enter_after()
          t._do_enter.assert_called_with('enter_after')

      def test_cancel(self):
          """``Timer.cancel`` calls ``cancel()`` on the given entry."""
          t = timer2.Timer()
          tref = Mock()
          t.cancel(tref)
          tref.cancel.assert_called_with()