autoreload.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.autoreload
  4. ~~~~~~~~~~~~~~~~~~~~~~~~
  5. This module implements automatic module reloading
  6. """
  7. from __future__ import absolute_import
  8. import hashlib
  9. import os
  10. import select
  11. import sys
  12. import time
  13. from collections import defaultdict
  14. from threading import Event
  15. from kombu.utils import eventio
  16. from kombu.utils.encoding import ensure_bytes
  17. from celery import bootsteps
  18. from celery.five import items
  19. from celery.platforms import ignore_errno
  20. from celery.utils.imports import module_file
  21. from celery.utils.log import get_logger
  22. from celery.utils.threads import bgThread
  23. from .components import Pool
  24. try: # pragma: no cover
  25. import pyinotify
  26. _ProcessEvent = pyinotify.ProcessEvent
  27. except ImportError: # pragma: no cover
  28. pyinotify = None # noqa
  29. _ProcessEvent = object # noqa
  30. __all__ = [
  31. 'WorkerComponent', 'Autoreloader', 'Monitor', 'BaseMonitor',
  32. 'StatMonitor', 'KQueueMonitor', 'InotifyMonitor', 'file_hash',
  33. ]
  34. logger = get_logger(__name__)
  35. class WorkerComponent(bootsteps.StartStopStep):
  36. label = 'Autoreloader'
  37. conditional = True
  38. requires = (Pool, )
  39. def __init__(self, w, autoreload=None, **kwargs):
  40. self.enabled = w.autoreload = autoreload
  41. w.autoreloader = None
  42. def create(self, w):
  43. w.autoreloader = self.instantiate(w.autoreloader_cls, w)
  44. return w.autoreloader if not w.use_eventloop else None
  45. def register_with_event_loop(self, w, hub):
  46. if hasattr(select, 'kqueue'):
  47. w.autoreloader.register_with_event_loop(hub)
  48. hub.on_close.add(w.autoreloader.on_event_loop_close)
  49. def file_hash(filename, algorithm='md5'):
  50. hobj = hashlib.new(algorithm)
  51. with open(filename, 'rb') as f:
  52. for chunk in iter(lambda: f.read(2 ** 20), ''):
  53. hobj.update(ensure_bytes(chunk))
  54. return hobj.digest()
  55. class BaseMonitor(object):
  56. def __init__(self, files,
  57. on_change=None, shutdown_event=None, interval=0.5):
  58. self.files = files
  59. self.interval = interval
  60. self._on_change = on_change
  61. self.modify_times = defaultdict(int)
  62. self.shutdown_event = shutdown_event or Event()
  63. def start(self):
  64. raise NotImplementedError('Subclass responsibility')
  65. def stop(self):
  66. pass
  67. def on_change(self, modified):
  68. if self._on_change:
  69. return self._on_change(modified)
  70. class StatMonitor(BaseMonitor):
  71. """File change monitor based on the ``stat`` system call."""
  72. def _mtimes(self):
  73. return ((f, self._mtime(f)) for f in self.files)
  74. def _maybe_modified(self, f, mt):
  75. return mt is not None and self.modify_times[f] != mt
  76. def start(self):
  77. while not self.shutdown_event.is_set():
  78. modified = dict((f, mt) for f, mt in self._mtimes()
  79. if self._maybe_modified(f, mt))
  80. if modified:
  81. self.on_change(modified)
  82. self.modify_times.update(modified)
  83. time.sleep(self.interval)
  84. @staticmethod
  85. def _mtime(path):
  86. try:
  87. return os.stat(path).st_mtime
  88. except Exception:
  89. pass
  90. class KQueueMonitor(BaseMonitor):
  91. """File change monitor based on BSD kernel event notifications"""
  92. def __init__(self, *args, **kwargs):
  93. super(KQueueMonitor, self).__init__(*args, **kwargs)
  94. self.filemap = dict((f, None) for f in self.files)
  95. self.fdmap = {}
  96. def register_with_event_loop(self, hub):
  97. self.add_events(hub.poller)
  98. hub.poller.on_file_change = self.handle_event
  99. def on_event_loop_close(self, hub):
  100. self.close(hub.poller)
  101. def add_events(self, poller):
  102. for f in self.filemap:
  103. self.filemap[f] = fd = os.open(f, os.O_RDONLY)
  104. self.fdmap[fd] = f
  105. poller.watch_file(fd)
  106. def handle_event(self, events):
  107. self.on_change([self.fdmap[e.ident] for e in events])
  108. def start(self):
  109. self.poller = eventio.poll()
  110. self.add_events(self.poller)
  111. self.poller.on_file_change = self.handle_event
  112. while not self.shutdown_event.is_set():
  113. self.poller.poll(1)
  114. def close(self, poller):
  115. for f, fd in items(self.filemap):
  116. if fd is not None:
  117. poller.unregister(fd)
  118. with ignore_errno('EBADF'): # pragma: no cover
  119. os.close(fd)
  120. self.filemap.clear()
  121. self.fdmap.clear()
  122. def stop(self):
  123. self.close(self.poller)
  124. self.poller.close()
  125. class InotifyMonitor(_ProcessEvent):
  126. """File change monitor based on Linux kernel `inotify` subsystem"""
  127. def __init__(self, modules, on_change=None, **kwargs):
  128. assert pyinotify
  129. self._modules = modules
  130. self._on_change = on_change
  131. self._wm = None
  132. self._notifier = None
  133. def start(self):
  134. try:
  135. self._wm = pyinotify.WatchManager()
  136. self._notifier = pyinotify.Notifier(self._wm, self)
  137. add_watch = self._wm.add_watch
  138. flags = pyinotify.IN_MODIFY | pyinotify.IN_ATTRIB
  139. for m in self._modules:
  140. add_watch(m, flags)
  141. self._notifier.loop()
  142. finally:
  143. if self._wm:
  144. self._wm.close()
  145. # Notifier.close is called at the end of Notifier.loop
  146. self._wm = self._notifier = None
  147. def stop(self):
  148. pass
  149. def process_(self, event):
  150. self.on_change([event.path])
  151. process_IN_ATTRIB = process_IN_MODIFY = process_
  152. def on_change(self, modified):
  153. if self._on_change:
  154. return self._on_change(modified)
  155. def default_implementation():
  156. if hasattr(select, 'kqueue'):
  157. return 'kqueue'
  158. if sys.platform.startswith('linux') and pyinotify:
  159. return 'inotify'
  160. else:
  161. return 'stat'
  162. implementations = {'kqueue': KQueueMonitor,
  163. 'inotify': InotifyMonitor,
  164. 'stat': StatMonitor}
  165. Monitor = implementations[
  166. os.environ.get('CELERYD_FSNOTIFY') or default_implementation()]
  167. class Autoreloader(bgThread):
  168. """Tracks changes in modules and fires reload commands"""
  169. Monitor = Monitor
  170. def __init__(self, controller, modules=None, monitor_cls=None, **options):
  171. super(Autoreloader, self).__init__()
  172. self.controller = controller
  173. app = self.controller.app
  174. self.modules = app.loader.task_modules if modules is None else modules
  175. self.options = options
  176. self._monitor = None
  177. self._hashes = None
  178. self.file_to_module = {}
  179. def on_init(self):
  180. files = self.file_to_module
  181. files.update(dict(
  182. (module_file(sys.modules[m]), m) for m in self.modules))
  183. self._monitor = self.Monitor(
  184. files, self.on_change,
  185. shutdown_event=self._is_shutdown, **self.options)
  186. self._hashes = dict([(f, file_hash(f)) for f in files])
  187. def register_with_event_loop(self, hub):
  188. if self._monitor is None:
  189. self.on_init()
  190. self._monitor.register_with_event_loop(hub)
  191. def on_event_loop_close(self, hub):
  192. if self._monitor is not None:
  193. self._monitor.on_event_loop_close(hub)
  194. def body(self):
  195. self.on_init()
  196. with ignore_errno('EINTR', 'EAGAIN'):
  197. self._monitor.start()
  198. def _maybe_modified(self, f):
  199. if os.path.exists(f):
  200. digest = file_hash(f)
  201. if digest != self._hashes[f]:
  202. self._hashes[f] = digest
  203. return True
  204. return False
  205. def on_change(self, files):
  206. modified = [f for f in files if self._maybe_modified(f)]
  207. if modified:
  208. names = [self.file_to_module[module] for module in modified]
  209. logger.info('Detected modified modules: %r', names)
  210. self._reload(names)
  211. def _reload(self, modules):
  212. self.controller.reload(modules, reload=True)
  213. def stop(self):
  214. if self._monitor:
  215. self._monitor.stop()