test_autoreload.py 9.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324
  1. from __future__ import absolute_import
  2. import errno
  3. import select
  4. import sys
  5. from mock import Mock, patch
  6. from time import time
  7. from celery.worker import autoreload
  8. from celery.worker.autoreload import (
  9. WorkerComponent,
  10. file_hash,
  11. BaseMonitor,
  12. StatMonitor,
  13. KQueueMonitor,
  14. InotifyMonitor,
  15. default_implementation,
  16. Autoreloader,
  17. )
  18. from celery.tests.case import AppCase, Case, mock_open
  19. class test_WorkerComponent(AppCase):
  20. def test_create_threaded(self):
  21. w = Mock()
  22. w.use_eventloop = False
  23. x = WorkerComponent(w)
  24. x.instantiate = Mock()
  25. r = x.create(w)
  26. x.instantiate.assert_called_with(w.autoreloader_cls, w)
  27. self.assertIs(r, w.autoreloader)
  28. @patch('select.kevent', create=True)
  29. @patch('select.kqueue', create=True)
  30. def test_create_ev(self, kqueue, kevent):
  31. w = Mock()
  32. w.use_eventloop = True
  33. x = WorkerComponent(w)
  34. x.instantiate = Mock()
  35. r = x.create(w)
  36. x.instantiate.assert_called_with(w.autoreloader_cls, w)
  37. x.register_with_event_loop(w, w.hub)
  38. self.assertIsNone(r)
  39. w.hub.on_close.add.assert_called_with(
  40. w.autoreloader.on_event_loop_close,
  41. )
  42. class test_file_hash(Case):
  43. def test_hash(self):
  44. with mock_open() as a:
  45. a.write('the quick brown fox\n')
  46. a.seek(0)
  47. A = file_hash('foo')
  48. with mock_open() as b:
  49. b.write('the quick brown bar\n')
  50. b.seek(0)
  51. B = file_hash('bar')
  52. self.assertNotEqual(A, B)
  53. class test_BaseMonitor(Case):
  54. def test_start_stop_on_change(self):
  55. x = BaseMonitor(['a', 'b'])
  56. with self.assertRaises(NotImplementedError):
  57. x.start()
  58. x.stop()
  59. x.on_change([])
  60. x._on_change = Mock()
  61. x.on_change('foo')
  62. x._on_change.assert_called_with('foo')
  63. class test_StatMonitor(Case):
  64. @patch('os.stat')
  65. def test_start(self, stat):
  66. class st(object):
  67. st_mtime = time()
  68. stat.return_value = st()
  69. x = StatMonitor(['a', 'b'])
  70. def on_is_set():
  71. if x.shutdown_event.is_set.call_count > 3:
  72. return True
  73. return False
  74. x.shutdown_event = Mock()
  75. x.shutdown_event.is_set.side_effect = on_is_set
  76. x.start()
  77. x.shutdown_event = Mock()
  78. stat.side_effect = OSError()
  79. x.start()
  80. @patch('os.stat')
  81. def test_mtime_stat_raises(self, stat):
  82. stat.side_effect = ValueError()
  83. x = StatMonitor(['a', 'b'])
  84. x._mtime('a')
  85. class test_KQueueMonitor(Case):
  86. @patch('select.kqueue', create=True)
  87. @patch('os.close')
  88. def test_stop(self, close, kqueue):
  89. x = KQueueMonitor(['a', 'b'])
  90. x.poller = Mock()
  91. x.filemap['a'] = 10
  92. x.stop()
  93. x.poller.close.assert_called_with()
  94. close.assert_called_with(10)
  95. close.side_effect = OSError()
  96. close.side_effect.errno = errno.EBADF
  97. x.stop()
  98. def test_register_with_event_loop(self):
  99. x = KQueueMonitor(['a', 'b'])
  100. hub = Mock()
  101. x.add_events = Mock()
  102. x.register_with_event_loop(hub)
  103. x.add_events.assert_called_with(hub.poller)
  104. self.assertEqual(
  105. hub.poller.on_file_change,
  106. x.handle_event,
  107. )
  108. def test_on_event_loop_close(self):
  109. x = KQueueMonitor(['a', 'b'])
  110. x.close = Mock()
  111. hub = Mock()
  112. x.on_event_loop_close(hub)
  113. x.close.assert_called_with(hub.poller)
  114. def test_handle_event(self):
  115. x = KQueueMonitor(['a', 'b'])
  116. x.on_change = Mock()
  117. eA = Mock()
  118. eA.ident = 'a'
  119. eB = Mock()
  120. eB.ident = 'b'
  121. x.fdmap = {'a': 'A', 'b': 'B'}
  122. x.handle_event([eA, eB])
  123. x.on_change.assert_called_with(['A', 'B'])
  124. @patch('kombu.utils.eventio.kqueue', create=True)
  125. @patch('kombu.utils.eventio.kevent', create=True)
  126. @patch('os.open')
  127. @patch('select.kqueue', create=True)
  128. def test_start(self, _kq, osopen, kevent, kqueue):
  129. from kombu.utils import eventio
  130. prev_poll, eventio.poll = eventio.poll, kqueue
  131. prev = {}
  132. flags = ['KQ_FILTER_VNODE', 'KQ_EV_ADD', 'KQ_EV_ENABLE',
  133. 'KQ_EV_CLEAR', 'KQ_NOTE_WRITE', 'KQ_NOTE_EXTEND']
  134. for i, flag in enumerate(flags):
  135. prev[flag] = getattr(eventio, flag, None)
  136. if not prev[flag]:
  137. setattr(eventio, flag, i)
  138. try:
  139. kq = kqueue.return_value = Mock()
  140. class ev(object):
  141. ident = 10
  142. filter = eventio.KQ_FILTER_VNODE
  143. fflags = eventio.KQ_NOTE_WRITE
  144. kq.control.return_value = [ev()]
  145. x = KQueueMonitor(['a'])
  146. osopen.return_value = 10
  147. calls = [0]
  148. def on_is_set():
  149. calls[0] += 1
  150. if calls[0] > 2:
  151. return True
  152. return False
  153. x.shutdown_event = Mock()
  154. x.shutdown_event.is_set.side_effect = on_is_set
  155. x.start()
  156. finally:
  157. for flag in flags:
  158. if prev[flag]:
  159. setattr(eventio, flag, prev[flag])
  160. else:
  161. delattr(eventio, flag)
  162. eventio.poll = prev_poll
  163. class test_InotifyMonitor(Case):
  164. @patch('celery.worker.autoreload.pyinotify')
  165. def test_start(self, inotify):
  166. x = InotifyMonitor(['a'])
  167. inotify.IN_MODIFY = 1
  168. inotify.IN_ATTRIB = 2
  169. x.start()
  170. inotify.WatchManager.side_effect = ValueError()
  171. with self.assertRaises(ValueError):
  172. x.start()
  173. x.stop()
  174. x._on_change = None
  175. x.process_(Mock())
  176. x._on_change = Mock()
  177. x.process_(Mock())
  178. self.assertTrue(x._on_change.called)
  179. class test_default_implementation(Case):
  180. @patch('select.kqueue', create=True)
  181. def test_kqueue(self, kqueue):
  182. self.assertEqual(default_implementation(), 'kqueue')
  183. @patch('celery.worker.autoreload.pyinotify')
  184. def test_inotify(self, pyinotify):
  185. kq = getattr(select, 'kqueue', None)
  186. try:
  187. delattr(select, 'kqueue')
  188. except AttributeError:
  189. pass
  190. platform, sys.platform = sys.platform, 'linux'
  191. try:
  192. self.assertEqual(default_implementation(), 'inotify')
  193. ino, autoreload.pyinotify = autoreload.pyinotify, None
  194. try:
  195. self.assertEqual(default_implementation(), 'stat')
  196. finally:
  197. autoreload.pyinotify = ino
  198. finally:
  199. if kq:
  200. select.kqueue = kq
  201. sys.platform = platform
  202. class test_Autoreloader(AppCase):
  203. def test_register_with_event_loop(self):
  204. x = Autoreloader(Mock(), modules=[__name__])
  205. hub = Mock()
  206. x._monitor = None
  207. x.on_init = Mock()
  208. def se(*args, **kwargs):
  209. x._monitor = Mock()
  210. x.on_init.side_effect = se
  211. x.register_with_event_loop(hub)
  212. x.on_init.assert_called_with()
  213. x._monitor.register_with_event_loop.assert_called_with(hub)
  214. x._monitor.register_with_event_loop.reset_mock()
  215. x.register_with_event_loop(hub)
  216. x._monitor.register_with_event_loop.assert_called_with(hub)
  217. def test_on_event_loop_close(self):
  218. x = Autoreloader(Mock(), modules=[__name__])
  219. hub = Mock()
  220. x._monitor = Mock()
  221. x.on_event_loop_close(hub)
  222. x._monitor.on_event_loop_close.assert_called_with(hub)
  223. x._monitor = None
  224. x.on_event_loop_close(hub)
  225. @patch('celery.worker.autoreload.file_hash')
  226. def test_start(self, fhash):
  227. x = Autoreloader(Mock(), modules=[__name__])
  228. x.Monitor = Mock()
  229. mon = x.Monitor.return_value = Mock()
  230. mon.start.side_effect = OSError()
  231. mon.start.side_effect.errno = errno.EINTR
  232. x.body()
  233. mon.start.side_effect.errno = errno.ENOENT
  234. with self.assertRaises(OSError):
  235. x.body()
  236. mon.start.side_effect = None
  237. x.body()
  238. @patch('celery.worker.autoreload.file_hash')
  239. @patch('os.path.exists')
  240. def test_maybe_modified(self, exists, fhash):
  241. exists.return_value = True
  242. fhash.return_value = 'abcd'
  243. x = Autoreloader(Mock(), modules=[__name__])
  244. x._hashes = {}
  245. x._hashes[__name__] = 'dcba'
  246. self.assertTrue(x._maybe_modified(__name__))
  247. x._hashes[__name__] = 'abcd'
  248. self.assertFalse(x._maybe_modified(__name__))
  249. exists.return_value = False
  250. self.assertFalse(x._maybe_modified(__name__))
  251. def test_on_change(self):
  252. x = Autoreloader(Mock(), modules=[__name__])
  253. mm = x._maybe_modified = Mock(0)
  254. mm.return_value = True
  255. x._reload = Mock()
  256. x.file_to_module[__name__] = __name__
  257. x.on_change([__name__])
  258. self.assertTrue(x._reload.called)
  259. mm.return_value = False
  260. x.on_change([__name__])
  261. def test_reload(self):
  262. x = Autoreloader(Mock(), modules=[__name__])
  263. x._reload([__name__])
  264. x.controller.reload.assert_called_with([__name__], reload=True)
  265. def test_stop(self):
  266. x = Autoreloader(Mock(), modules=[__name__])
  267. x._monitor = None
  268. x.stop()
  269. x._monitor = Mock()
  270. x.stop()
  271. x._monitor.stop.assert_called_with()