123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- from __future__ import absolute_import
- import socket
- import threading
- from kombu.common import ignore_errors
- from kombu.utils.encoding import safe_str
- from celery.datastructures import AttributeDict
- from celery.utils.log import get_logger
- from . import control
# Public names exported by this module.
__all__ = ['Pidbox', 'gPidbox']

# Module-level logger; the bound methods are aliased so the classes below
# can call ``debug(...)``, ``error(...)`` and ``info(...)`` directly.
logger = get_logger(__name__)
debug = logger.debug
error = logger.error
info = logger.info
class Pidbox(object):
    """Mailbox node that receives control messages for a worker consumer.

    A mailbox ``Node`` is created from ``c.app.control.mailbox`` and every
    received message is handed to ``node.handle_message``.

    :param c: the consumer object; it must provide ``app``, ``hostname``
        and (when started) ``connection``.
    """

    # Consumer returned by ``node.listen``; ``None`` while not listening.
    consumer = None

    def __init__(self, c):
        app = c.app
        hostname = c.hostname

        self.c = c
        self.hostname = hostname
        self.node = app.control.mailbox.Node(
            safe_str(hostname),
            handlers=control.Panel.data,
            state=AttributeDict(app=app, hostname=hostname, consumer=c),
        )
        self._forward_clock = app.clock.forward

    def on_message(self, body, message):
        """Dispatch one received message to the mailbox node.

        A ``KeyError`` is logged as an unknown command; any other exception
        is logged with its traceback and followed by :meth:`reset`.
        """
        # Just increase the clock, as clients usually don't have a valid
        # clock to adjust with.
        self._forward_clock()
        try:
            self.node.handle_message(body, message)
        except KeyError as exc:
            error('No such control command: %s', exc)
        except Exception as exc:
            error('Control command error: %r', exc, exc_info=True)
            self.reset()

    def start(self, c):
        """Open a channel on ``c.connection`` and start listening on it."""
        node = self.node
        node.channel = c.connection.channel()
        self.consumer = node.listen(callback=self.on_message)

    def on_stop(self):
        """Hook invoked before stopping; does nothing in this class."""

    def stop(self, c):
        """Run the stop hook, close the node channel and drop the consumer."""
        self.on_stop()
        # ``_close_channel`` has no return statement, so assigning its
        # result resets ``consumer`` to ``None``.
        self.consumer = self._close_channel(c)

    def reset(self):
        """Restart the mailbox: stop, then start, using the stored ``self.c``."""
        consumer_obj = self.c
        self.stop(consumer_obj)
        self.start(consumer_obj)

    def _close_channel(self, c):
        """Close the node's channel if there is one, ignoring errors."""
        node = self.node
        if not node:
            return
        channel = node.channel
        if channel:
            ignore_errors(c, channel.close)

    def shutdown(self, c):
        """Cancel the active consumer (if any) and then stop the mailbox."""
        self.on_stop()
        active = self.consumer
        if active:
            debug('Cancelling broadcast consumer...')
            ignore_errors(c, active.cancel)
        self.stop(self.c)
class gPidbox(Pidbox):
    """Pidbox variant whose event loop is spawned on the consumer's pool.

    :meth:`start` hands :meth:`loop` to ``c.pool.spawn_n``; the loop opens
    its own connection through ``c.connect()`` and drains events from it
    until shutdown is requested.
    """

    # Events created by ``loop``: one asks the loop to exit, the other is
    # set by the loop once it has exited.  Both are ``None`` when idle.
    _node_shutdown = None
    _node_stopped = None
    # Number of resets requested so far; ``loop`` compares against it.
    _resets = 0

    def start(self, c):
        """Spawn :meth:`loop` on the consumer's pool."""
        c.pool.spawn_n(self.loop, c)

    def on_stop(self):
        """Signal a running loop to exit and wait until it has finished."""
        if not self._node_stopped:
            return
        self._node_shutdown.set()
        debug('Waiting for broadcast thread to shutdown...')
        self._node_stopped.wait()
        self._node_shutdown = None
        self._node_stopped = None

    def reset(self):
        """Request a reset; it is carried out by the running loop."""
        self._resets = self._resets + 1

    def _do_reset(self, c, connection):
        """Replace the node channel with a new one and start consuming."""
        self._close_channel(c)
        node = self.node
        node.channel = connection.channel()
        consumer = node.listen(callback=self.on_message)
        self.consumer = consumer
        consumer.consume()

    def loop(self, c):
        """Drain events until shutdown is requested or ``c.connection`` is falsy.

        The ``stopped`` event is always set on exit so that
        :meth:`on_stop` never waits forever.
        """
        # Resets already applied by this loop; requested ones are applied
        # one per iteration until this catches up with ``self._resets``.
        applied_resets = self._resets
        shutdown = threading.Event()
        stopped = threading.Event()
        self._node_shutdown = shutdown
        self._node_stopped = stopped
        try:
            with c.connect() as connection:
                info('pidbox: Connected to %s.', connection.as_uri())
                self._do_reset(c, connection)
                while True:
                    if shutdown.is_set() or not c.connection:
                        break
                    if applied_resets < self._resets:
                        applied_resets += 1
                        self._do_reset(c, connection)
                    try:
                        connection.drain_events(timeout=1.0)
                    except socket.timeout:
                        # Nothing arrived within the timeout; poll again.
                        pass
        finally:
            stopped.set()
|