events.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. from __future__ import absolute_import
  2. from __future__ import with_statement
  3. import time
  4. import shelve
  5. import logging
  6. import threading
  7. from functools import partial
  8. import celery
  9. from tornado.ioloop import PeriodicCallback
  10. from tornado.ioloop import IOLoop
  11. from celery.events import EventReceiver
  12. from celery.events.state import State
  13. from . import api
  14. from .settings import CELERY_EVENTS_ENABLE_INTERVAL
  15. logger = logging.getLogger(__name__)
  16. class EventsState(State):
  17. # EventsState object is created and accessed only from ioloop thread
  18. def __init__(self, *args, **kwargs):
  19. super(EventsState, self).__init__(*args, **kwargs)
  20. def event(self, event):
  21. # Send event to api subscribers (via websockets)
  22. classname = api.events.getClassName(event['type'])
  23. cls = getattr(api.events, classname, None)
  24. if cls:
  25. cls.send_message(event)
  26. # Save the event
  27. super(EventsState, self).event(event)
  28. class Events(threading.Thread):
  29. def __init__(self, celery_app, db=None, persistent=False,
  30. io_loop=None, **kwargs):
  31. threading.Thread.__init__(self)
  32. self.daemon = True
  33. self._io_loop = io_loop or IOLoop.instance()
  34. self._celery_app = celery_app
  35. self._db = db
  36. self._persistent = persistent
  37. self.state = None
  38. if self._persistent and celery.__version__ < '3.0.15':
  39. logger.warning('Persistent mode is available with '
  40. 'Celery 3.0.15 and later')
  41. self._persistent = False
  42. if self._persistent:
  43. logger.debug("Loading state from '%s'...", db)
  44. state = shelve.open(self._db)
  45. if state:
  46. self.state = state['events']
  47. state.close()
  48. if not self.state:
  49. self.state = EventsState(**kwargs)
  50. self._timer = PeriodicCallback(self.on_enable_events,
  51. CELERY_EVENTS_ENABLE_INTERVAL)
  52. def start(self):
  53. threading.Thread.start(self)
  54. # Celery versions prior to 3 don't support enable_events
  55. if celery.VERSION[0] > 2:
  56. self._timer.start()
  57. def stop(self):
  58. if self._persistent:
  59. logger.debug("Saving state to '%s'...", self._db)
  60. state = shelve.open(self._db)
  61. state['events'] = self.state
  62. state.close()
  63. def run(self):
  64. try_interval = 1
  65. while True:
  66. try:
  67. try_interval *= 2
  68. with self._celery_app.connection() as conn:
  69. recv = EventReceiver(conn,
  70. handlers={"*": self.on_event},
  71. app=self._celery_app)
  72. recv.capture(limit=None, timeout=None)
  73. try_interval = 1
  74. except (KeyboardInterrupt, SystemExit):
  75. try:
  76. import _thread as thread
  77. except ImportError:
  78. import thread
  79. thread.interrupt_main()
  80. except Exception as e:
  81. logger.error("Failed to capture events: '%s', "
  82. "trying again in %s seconds.",
  83. e, try_interval)
  84. logger.debug(e, exc_info=True)
  85. time.sleep(try_interval)
  86. def on_enable_events(self):
  87. # Periodically enable events for workers
  88. # launched after flower
  89. logger.debug('Enabling events')
  90. try:
  91. self._celery_app.control.enable_events()
  92. except Exception as e:
  93. logger.debug("Failed to enable events: '%s'", e)
  94. def on_event(self, event):
  95. # Call EventsState.event in ioloop thread to avoid synchronization
  96. self._io_loop.add_callback(partial(self.state.event, event))