pidbox.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. from __future__ import absolute_import
  2. import socket
  3. import threading
  4. from kombu.common import ignore_errors
  5. from kombu.utils.encoding import safe_str
  6. from celery.datastructures import AttributeDict
  7. from celery.utils.log import get_logger
  8. from . import control
  9. __all__ = ['Pidbox', 'gPidbox']
  10. logger = get_logger(__name__)
  11. debug, error, info = logger.debug, logger.error, logger.info
  12. class Pidbox(object):
  13. consumer = None
  14. def __init__(self, c):
  15. self.c = c
  16. self.hostname = c.hostname
  17. self.node = c.app.control.mailbox.Node(
  18. safe_str(c.hostname),
  19. handlers=control.Panel.data,
  20. state=AttributeDict(app=c.app, hostname=c.hostname, consumer=c),
  21. )
  22. self._forward_clock = self.c.app.clock.forward
  23. def on_message(self, body, message):
  24. self._forward_clock() # just increase clock as clients usually don't
  25. # have a valid clock to adjust with.
  26. try:
  27. self.node.handle_message(body, message)
  28. except KeyError as exc:
  29. error('No such control command: %s', exc)
  30. except Exception as exc:
  31. error('Control command error: %r', exc, exc_info=True)
  32. self.reset()
  33. def start(self, c):
  34. self.node.channel = c.connection.channel()
  35. self.consumer = self.node.listen(callback=self.on_message)
  36. def on_stop(self):
  37. pass
  38. def stop(self, c):
  39. self.on_stop()
  40. self.consumer = self._close_channel(c)
  41. def reset(self):
  42. """Sets up the process mailbox."""
  43. self.stop(self.c)
  44. self.start(self.c)
  45. def _close_channel(self, c):
  46. if self.node and self.node.channel:
  47. ignore_errors(c, self.node.channel.close)
  48. def shutdown(self, c):
  49. self.on_stop()
  50. if self.consumer:
  51. debug('Cancelling broadcast consumer...')
  52. ignore_errors(c, self.consumer.cancel)
  53. self.stop(self.c)
  54. class gPidbox(Pidbox):
  55. _node_shutdown = None
  56. _node_stopped = None
  57. _resets = 0
  58. def start(self, c):
  59. c.pool.spawn_n(self.loop, c)
  60. def on_stop(self):
  61. if self._node_stopped:
  62. self._node_shutdown.set()
  63. debug('Waiting for broadcast thread to shutdown...')
  64. self._node_stopped.wait()
  65. self._node_stopped = self._node_shutdown = None
  66. def reset(self):
  67. self._resets += 1
  68. def _do_reset(self, c, connection):
  69. self._close_channel(c)
  70. self.node.channel = connection.channel()
  71. self.consumer = self.node.listen(callback=self.on_message)
  72. self.consumer.consume()
  73. def loop(self, c):
  74. resets = [self._resets]
  75. shutdown = self._node_shutdown = threading.Event()
  76. stopped = self._node_stopped = threading.Event()
  77. try:
  78. with c.connect() as connection:
  79. info('pidbox: Connected to %s.', connection.as_uri())
  80. self._do_reset(c, connection)
  81. while not shutdown.is_set() and c.connection:
  82. if resets[0] < self._resets:
  83. resets[0] += 1
  84. self._do_reset(c, connection)
  85. try:
  86. connection.drain_events(timeout=1.0)
  87. except socket.timeout:
  88. pass
  89. finally:
  90. stopped.set()