heartbeat.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. # -*- coding: utf-8 -*-
  2. """
  3. celery.worker.heartbeat
  4. ~~~~~~~~~~~~~~~~~~~~~~~
  5. This is the internal thread that sends heartbeat events
  6. at regular intervals.
  7. """
  8. from __future__ import absolute_import
  9. from celery.five import values
  10. from celery.utils.sysinfo import load_average
  11. from .state import SOFTWARE_INFO, active_requests, total_count
  12. __all__ = ['Heart']
  13. class Heart(object):
  14. """Timer sending heartbeats at regular intervals.
  15. :param timer: Timer instance.
  16. :param eventer: Event dispatcher used to send the event.
  17. :keyword interval: Time in seconds between heartbeats.
  18. Default is 30 seconds.
  19. """
  20. def __init__(self, timer, eventer, interval=None):
  21. self.timer = timer
  22. self.eventer = eventer
  23. self.interval = float(interval or 2.0)
  24. self.tref = None
  25. # Make event dispatcher start/stop us when enabled/disabled.
  26. self.eventer.on_enabled.add(self.start)
  27. self.eventer.on_disabled.add(self.stop)
  28. def _send(self, event):
  29. return self.eventer.send(event, freq=self.interval,
  30. active=len(active_requests),
  31. processed=sum(values(total_count)),
  32. loadavg=load_average(),
  33. **SOFTWARE_INFO)
  34. def start(self):
  35. if self.eventer.enabled:
  36. self._send('worker-online')
  37. self.tref = self.timer.call_repeatedly(
  38. self.interval, self._send, ('worker-heartbeat', ),
  39. )
  40. def stop(self):
  41. if self.tref is not None:
  42. self.timer.cancel(self.tref)
  43. self.tref = None
  44. if self.eventer.enabled:
  45. self._send('worker-offline')