import datetime
import logging
import os
import signal
import sys
import threading
import time
from multiprocessing import Event as ProcessEvent
from multiprocessing import Process

try:
    import gevent
    from gevent import Greenlet
    from gevent.event import Event as GreenEvent
except ImportError:
    Greenlet = GreenEvent = None

from huey.constants import WORKER_GREENLET
from huey.constants import WORKER_PROCESS
from huey.constants import WORKER_THREAD
from huey.constants import WORKER_TYPES
from huey.exceptions import CancelExecution
from huey.exceptions import ConfigurationError
from huey.exceptions import DataStoreGetException
from huey.exceptions import QueueException
from huey.exceptions import QueueReadException
from huey.exceptions import DataStorePutException
from huey.exceptions import QueueWriteException
from huey.exceptions import RetryTask
from huey.exceptions import ScheduleAddException
from huey.exceptions import ScheduleReadException
from huey.exceptions import TaskLockedException

EVENT_CHECKING_PERIODIC = 'checking-periodic'
EVENT_ERROR_DEQUEUEING = 'error-dequeueing'
EVENT_ERROR_ENQUEUEING = 'error-enqueueing'
EVENT_ERROR_INTERNAL = 'error-internal'
EVENT_ERROR_SCHEDULING = 'error-scheduling'
EVENT_ERROR_STORING_RESULT = 'error-storing-result'
EVENT_ERROR_TASK = 'error-task'
EVENT_LOCKED = 'locked'
EVENT_FINISHED = 'finished'
EVENT_RETRYING = 'retrying'
EVENT_REVOKED = 'revoked'
EVENT_SCHEDULED = 'scheduled'
EVENT_SCHEDULING_PERIODIC = 'scheduling-periodic'
EVENT_STARTED = 'started'
EVENT_TIMEOUT = 'timeout'


def to_timestamp(dt):
    if dt:
        return time.mktime(dt.timetuple())


class BaseProcess(object):
    """
    Abstract process run by the consumer. Provides convenience methods for
    things like sleeping for a given amount of time and enqueueing tasks.

    Subclasses should implement the `loop()` method, which is called repeatedly
    until the consumer is shut down. The `loop()` method's return value is
    ignored, but an unhandled exception will lead to the process shutting down.

    A typical pattern might be::

        class CustomProcess(BaseProcess):
            def loop(self, now=None):
                # Get the current timestamp.
                current_ts = time.time()

                # Perform some action, which may take an arbitrary amount of
                # time.
                do_some_action()

                # Sleep for 60 seconds, with respect to current_ts, so that
                # the whole loop() method repeats every ~60s.
                self.sleep_for_interval(current_ts, 60)

    You will want to ensure that the consumer starts your custom process::

        class MyConsumer(Consumer):
            def start(self):
                # Initialize workers, scheduler, signal handlers, etc.
                super(MyConsumer, self).start()

                # Create custom process and start it.
                custom_impl = CustomProcess(huey=self.huey, utc=self.utc)
                self._custom_proc = self._create_process(custom_impl, 'Custom')
                self._custom_proc.start()

    See also: Consumer._create_process().
    """
    def __init__(self, huey, utc):
        self.huey = huey
        self.utc = utc

    def initialize(self):
        pass

    def get_now(self):
        if self.utc:
            return datetime.datetime.utcnow()
        return datetime.datetime.now()

    def get_utcnow(self):
        return datetime.datetime.utcnow()

    def get_timestamp(self):
        return time.mktime(self.get_utcnow().timetuple())

    def sleep_for_interval(self, start_ts, nseconds):
        """
        Sleep for a given interval with respect to the start timestamp.

        So, if the start timestamp is 1337 and nseconds is 10, the method will
        actually sleep for nseconds - (current_timestamp - start_timestamp). So
        if the current timestamp is 1340, we'll only sleep for 7 seconds (the
        goal being to sleep until 1347, or 1337 + 10).
        """
        sleep_time = nseconds - (time.time() - start_ts)
        if sleep_time <= 0:
            return
        self._logger.debug('Sleeping for %s', sleep_time)
        # Recompute time to sleep to improve accuracy in case the process was
        # pre-empted by the kernel while logging.
        sleep_time = nseconds - (time.time() - start_ts)
        if sleep_time > 0:
            time.sleep(sleep_time)

    def enqueue(self, task):
        """
        Convenience method for enqueueing a task.
        """
        try:
            self.huey.enqueue(task)
        except QueueWriteException:
            self.huey.emit_task(EVENT_ERROR_ENQUEUEING, task, error=True)
            self._logger.exception('Error enqueueing task: %s', task)
        else:
            self._logger.debug('Enqueued task: %s', task)

    def loop(self, now=None):
        """
        Process-specific implementation. Called repeatedly for as long as the
        consumer is running. The `now` parameter is currently only used in the
        unit-tests (to avoid monkey-patching datetime / time). Return value is
        ignored, but an unhandled exception will lead to the process exiting.
        """
        raise NotImplementedError


class Worker(BaseProcess):
    """
    Worker implementation.

    Will pull tasks from the queue, executing them or adding them to the
    schedule if they are set to run in the future.
    """
    def __init__(self, huey, default_delay, max_delay, backoff, utc):
        self.delay = self.default_delay = default_delay
        self.max_delay = max_delay
        self.backoff = backoff
        self._logger = logging.getLogger('huey.consumer.Worker')
        self._pre_execute = huey.pre_execute_hooks.items()
        self._post_execute = huey.post_execute_hooks.items()
        super(Worker, self).__init__(huey, utc)

    def initialize(self):
        for name, startup_hook in self.huey.startup_hooks.items():
            self._logger.debug('calling startup hook "%s"', name)
            try:
                startup_hook()
            except Exception as exc:
                self._logger.exception('startup hook "%s" failed', name)

    def loop(self, now=None):
        task = None
        exc_raised = True
        try:
            task = self.huey.dequeue()
        except QueueReadException:
            self.huey.emit_status(EVENT_ERROR_DEQUEUEING, error=True)
            self._logger.exception('Error reading from queue')
        except QueueException:
            self.huey.emit_status(EVENT_ERROR_INTERNAL, error=True)
            self._logger.exception('Queue exception')
        except KeyboardInterrupt:
            raise
        except:
            self.huey.emit_status(EVENT_ERROR_DEQUEUEING, error=True)
            self._logger.exception('Unknown exception dequeueing task.')
        else:
            exc_raised = False

        if task:
            self.delay = self.default_delay
            self.handle_task(task, now or self.get_now())
        elif exc_raised or not self.huey.blocking:
            self.sleep()

    def sleep(self):
        if self.delay > self.max_delay:
            self.delay = self.max_delay

        self._logger.debug('No messages, sleeping for: %s', self.delay)
        time.sleep(self.delay)
        self.delay *= self.backoff

    def handle_task(self, task, ts):
        """
        Handle a task that was just read from the queue. There are three
        possible outcomes:

        1. Task is scheduled for the future, add to the schedule.
        2. Task is ready to run, but has been revoked. Discard.
        3. Task is ready to run and not revoked. Execute task.
        """
        if not self.huey.ready_to_run(task, ts):
            self.add_schedule(task)
        elif not self.is_revoked(task, ts):
            self.process_task(task, ts)
        else:
            self.huey.emit_task(
                EVENT_REVOKED,
                task,
                timestamp=to_timestamp(ts))
            self._logger.debug('Task %s was revoked, not running', task)

    def process_task(self, task, ts):
        """
        Execute a task and (optionally) store the return value in result store.

        Unhandled exceptions are caught and logged.
        """
        self.huey.emit_task(EVENT_STARTED, task, timestamp=to_timestamp(ts))
        if self._pre_execute:
            try:
                self.run_pre_execute_hooks(task)
            except CancelExecution:
                return

        self._logger.info('Executing %s', task)
        start = time.time()
        exception = None
        task_value = None

        try:
            try:
                task_value = self.huey.execute(task)
            finally:
                duration = time.time() - start
        except DataStorePutException:
            self._logger.exception('Error storing result')
            self.huey.emit_task(
                EVENT_ERROR_STORING_RESULT,
                task,
                error=True,
                duration=duration)
        except TaskLockedException as exc:
            self._logger.warning('Task %s could not run, unable to obtain '
                                 'lock.', task.task_id)
            self.huey.emit_task(
                EVENT_LOCKED,
                task,
                error=False,
                duration=duration)
            exception = exc
        except RetryTask:
            if not task.retries:
                self._logger.error('Cannot retry task %s - no retries '
                                   'remaining.', task.task_id)
            exception = True
        except KeyboardInterrupt:
            self._logger.info('Received exit signal, task %s did not finish.',
                              task.task_id)
            return
        except Exception as exc:
            self._logger.exception('Unhandled exception in worker thread')
            self.huey.emit_task(
                EVENT_ERROR_TASK,
                task,
                error=True,
                duration=duration)
            exception = exc
        else:
            self._logger.info('Executed %s in %0.3fs', task, duration)
            self.huey.emit_task(
                EVENT_FINISHED,
                task,
                duration=duration,
                timestamp=self.get_timestamp())

        if self._post_execute:
            self.run_post_execute_hooks(task, task_value, exception)

        if exception is not None and task.retries:
            self.requeue_task(task, self.get_now())

    def run_pre_execute_hooks(self, task):
        self._logger.info('Running pre-execute hooks for %s', task)
        for name, callback in self._pre_execute:
            self._logger.debug('Executing %s pre-execute hook.', name)
            try:
                callback(task)
            except CancelExecution:
                self._logger.info('Execution of %s cancelled by %s.', task,
                                  name)
                raise
            except Exception:
                self._logger.exception('Unhandled exception calling pre-'
                                       'execute hook %s for %s.', name, task)

    def run_post_execute_hooks(self, task, task_value, exception):
        self._logger.info('Running post-execute hooks for %s', task)
        for name, callback in self._post_execute:
            self._logger.debug('Executing %s post-execute hook.', name)
            try:
                callback(task, task_value, exception)
            except Exception as exc:
                self._logger.exception('Unhandled exception calling post-'
                                       'execute hook %s for %s.', name, task)
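
    # For reference, the hook callables invoked above have the following
    # shapes. The function names are illustrative only; hooks themselves are
    # registered on the Huey instance and read from huey.pre_execute_hooks /
    # huey.post_execute_hooks in Worker.__init__():
    #
    #     def my_pre_execute_hook(task):
    #         ...  # raise CancelExecution to prevent the task from running
    #
    #     def my_post_execute_hook(task, task_value, exception):
    #         ...  # task_value / exception describe how execution ended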

    def requeue_task(self, task, ts):
        task.retries -= 1
        self.huey.emit_task(EVENT_RETRYING, task)
        self._logger.info('Re-enqueueing task %s, %s tries left',
                          task.task_id, task.retries)
        if task.retry_delay:
            delay = datetime.timedelta(seconds=task.retry_delay)
            task.execute_time = ts + delay
            self.add_schedule(task)
        else:
            self.enqueue(task)

    def add_schedule(self, task):
        self._logger.info('Adding %s to schedule', task)
        try:
            self.huey.add_schedule(task)
        except ScheduleAddException:
            self.huey.emit_task(EVENT_ERROR_SCHEDULING, task, error=True)
            self._logger.error('Error adding task to schedule: %s', task)
        else:
            self.huey.emit_task(EVENT_SCHEDULED, task)

    def is_revoked(self, task, ts):
        try:
            if self.huey.is_revoked(task, ts, peek=False):
                return True
            return False
        except DataStoreGetException:
            self.huey.emit_task(EVENT_ERROR_INTERNAL, task, error=True)
            self._logger.error('Error checking if task is revoked: %s', task)
            return True


class Scheduler(BaseProcess):
    """
    Scheduler handles enqueueing tasks when they are scheduled to execute. Note
    that the scheduler does not actually execute any tasks, but simply enqueues
    them so that they can be picked up by the worker processes.

    If periodic tasks are enabled, the scheduler will wake up every 60 seconds
    to enqueue any periodic tasks that should be run.
    """
    def __init__(self, huey, interval, utc, periodic):
        super(Scheduler, self).__init__(huey, utc)
        self.interval = min(interval, 60)
        self.periodic = periodic
        if periodic:
            # Determine the periodic task interval.
            self._counter = 0
            self._q, self._r = divmod(60, self.interval)
            self._cr = self._r
        self._logger = logging.getLogger('huey.consumer.Scheduler')
        self._next_loop = time.time()

    def loop(self, now=None):
        current = self._next_loop
        self._next_loop += self.interval
        if self._next_loop < time.time():
            self._logger.info('scheduler skipping iteration to avoid race.')
            return

        try:
            task_list = self.huey.read_schedule(now or self.get_now())
        except ScheduleReadException:
            #self.huey.emit_task(EVENT_ERROR_SCHEDULING, task, error=True)
            self._logger.exception('Error reading from task schedule.')
        else:
            for task in task_list:
                self._logger.info('Scheduling %s for execution', task)
                self.enqueue(task)

        if self.periodic:
            # The scheduler has an interesting property of being able to run
            # at intervals that are not factors of 60. Suppose we ask our
            # scheduler to run every 45 seconds. We still want to schedule
            # periodic tasks once per minute, however. So we use a running
            # remainder to ensure that, no matter what interval the scheduler
            # is running at, periodic tasks are still enqueued once per
            # minute.
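            #
            # For example, with interval=45: divmod(60, 45) gives _q=1 and
            # _r=15, so _cr starts at 15. The loop runs at t=0, 45, 90, ...
            # At t=45 the counter reaches _q, so we sleep the 15-second
            # remainder and enqueue periodic tasks at t=60. At t=90 the
            # remainder has grown to 30 seconds, so the next periodic check
            # lands at t=120, and so on -- once per minute despite the
            # 45-second loop interval.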
            if self._counter >= self._q:
                self._counter = 0
                if self._cr:
                    self.sleep_for_interval(current, self._cr)
                if self._r:
                    self._cr += self._r
                    if self._cr >= self.interval:
                        self._cr -= self.interval
                        self._counter -= 1

                self.enqueue_periodic_tasks(now or self.get_now(), current)
            self._counter += 1

        self.sleep_for_interval(current, self.interval)

    def enqueue_periodic_tasks(self, now, start):
        self.huey.emit_status(
            EVENT_CHECKING_PERIODIC,
            timestamp=self.get_timestamp())
        self._logger.debug('Checking periodic tasks')
        for task in self.huey.read_periodic(now):
            self.huey.emit_task(
                EVENT_SCHEDULING_PERIODIC,
                task,
                timestamp=self.get_timestamp())
            self._logger.info('Scheduling periodic task %s.', task)
            self.enqueue(task)
        return True


class Environment(object):
    """
    Provide a common interface to the supported concurrent environments.
    """
    def get_stop_flag(self):
        raise NotImplementedError

    def create_process(self, runnable, name):
        raise NotImplementedError

    def is_alive(self, proc):
        raise NotImplementedError


class ThreadEnvironment(Environment):
    def get_stop_flag(self):
        return threading.Event()

    def create_process(self, runnable, name):
        t = threading.Thread(target=runnable, name=name)
        t.daemon = True
        return t

    def is_alive(self, proc):
        return proc.is_alive()


class GreenletEnvironment(Environment):
    def get_stop_flag(self):
        return GreenEvent()

    def create_process(self, runnable, name):
        def run_wrapper():
            gevent.sleep()
            runnable()
            gevent.sleep()
        return Greenlet(run=run_wrapper)

    def is_alive(self, proc):
        return not proc.dead


class ProcessEnvironment(Environment):
    def get_stop_flag(self):
        return ProcessEvent()

    def create_process(self, runnable, name):
        p = Process(target=runnable, name=name)
        p.daemon = True
        return p

    def is_alive(self, proc):
        return proc.is_alive()


WORKER_TO_ENVIRONMENT = {
    WORKER_THREAD: ThreadEnvironment,
    WORKER_GREENLET: GreenletEnvironment,
    'gevent': GreenletEnvironment,  # Preserved for backwards-compat.
    WORKER_PROCESS: ProcessEnvironment,
}
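
# An additional execution environment can be plugged in by subclassing
# Environment and mapping a worker_type string to it before the Consumer is
# constructed (Consumer.get_environment() simply looks the type up in
# WORKER_TO_ENVIRONMENT). The sketch below is illustrative only; the 'custom'
# key and CustomEnvironment class are hypothetical and not part of huey:
#
#     class CustomEnvironment(Environment):
#         def get_stop_flag(self):
#             return threading.Event()
#
#         def create_process(self, runnable, name):
#             t = threading.Thread(target=runnable, name=name)
#             t.daemon = True
#             return t
#
#         def is_alive(self, proc):
#             return proc.is_alive()
#
#     WORKER_TO_ENVIRONMENT['custom'] = CustomEnvironment
#     # Consumer(huey, worker_type='custom') will then use it.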


class Consumer(object):
    """
    Consumer sets up and coordinates the execution of the workers and scheduler
    and registers signal handlers.
    """
    def __init__(self, huey, workers=1, periodic=True, initial_delay=0.1,
                 backoff=1.15, max_delay=10.0, utc=True, scheduler_interval=1,
                 worker_type='thread', check_worker_health=True,
                 health_check_interval=1, flush_locks=False):
        self._logger = logging.getLogger('huey.consumer')
        if huey.always_eager:
            self._logger.warning('Consumer initialized with Huey instance '
                                 'that has "always_eager" mode enabled. This '
                                 'must be disabled before the consumer can '
                                 'be run.')

        self.huey = huey
        self.workers = workers  # Number of workers.
        self.periodic = periodic  # Enable periodic task scheduler?
        self.default_delay = initial_delay  # Default queue polling interval.
        self.backoff = backoff  # Exponential backoff factor when queue empty.
        self.max_delay = max_delay  # Maximum interval between polling events.
        self.utc = utc  # Timestamps are considered UTC.

        # Ensure that the scheduler runs at an interval between 1 and 60s.
        self.scheduler_interval = max(min(scheduler_interval, 60), 1)
        self.worker_type = worker_type  # What process model are we using?

        # Configure health-check and consumer main-loop attributes.
        self._stop_flag_timeout = 0.1
        self._health_check = check_worker_health
        self._health_check_interval = float(health_check_interval)

        # Create the execution environment helper.
        self.environment = self.get_environment(self.worker_type)

        # Create the event used to signal the process should terminate. We'll
        # also store a boolean flag to indicate whether we should restart
        # after the processes are cleaned up.
        self._received_signal = False
        self._restart = False
        self._graceful = True
        self.stop_flag = self.environment.get_stop_flag()

        # In the event the consumer was killed while running a task that held
        # a lock, this ensures that all locks are flushed before starting.
        if flush_locks:
            self.flush_locks()

        # Create the scheduler process (but don't start it yet).
        scheduler = self._create_scheduler()
        self.scheduler = self._create_process(scheduler, 'Scheduler')

        # Create the worker process(es) (also not started yet).
        self.worker_threads = []
        for i in range(workers):
            worker = self._create_worker()
            process = self._create_process(worker, 'Worker-%d' % (i + 1))

            # The worker threads are stored as [(worker impl, worker_t), ...].
            # The worker impl is not currently referenced in any consumer
            # code, but it is referenced in the test-suite.
            self.worker_threads.append((worker, process))

    def flush_locks(self):
        self._logger.debug('Flushing locks before starting up.')
        flushed = self.huey.flush_locks()
        if flushed:
            self._logger.warning('Found stale locks: %s' % (
                ', '.join(key for key in flushed)))

    def get_environment(self, worker_type):
        if worker_type not in WORKER_TO_ENVIRONMENT:
            raise ValueError('worker_type must be one of %s.' %
                             ', '.join(WORKER_TYPES))
        return WORKER_TO_ENVIRONMENT[worker_type]()

    def _create_worker(self):
        return Worker(
            huey=self.huey,
            default_delay=self.default_delay,
            max_delay=self.max_delay,
            backoff=self.backoff,
            utc=self.utc)

    def _create_scheduler(self):
        return Scheduler(
            huey=self.huey,
            interval=self.scheduler_interval,
            utc=self.utc,
            periodic=self.periodic)

    def _create_process(self, process, name):
        """
        Repeatedly call the `loop()` method of the given process. Unhandled
        exceptions in the `loop()` method will cause the process to terminate.
        """
        def _run():
            process.initialize()
            try:
                while not self.stop_flag.is_set():
                    process.loop()
            except KeyboardInterrupt:
                pass
            except:
                self._logger.exception('Process %s died!', name)
        return self.environment.create_process(_run, name)

    def start(self):
        """
        Start all consumer processes and register signal handlers.
        """
        if self.huey.always_eager:
            raise ConfigurationError(
                'Consumer cannot be run with Huey instances where always_eager'
                ' is enabled. Please check your configuration and ensure that'
                ' "huey.always_eager = False".')

        # Log startup message.
        self._logger.info('Huey consumer started with %s %s, PID %s',
                          self.workers, self.worker_type, os.getpid())
        self._logger.info('Scheduler runs every %s second(s).',
                          self.scheduler_interval)
        self._logger.info('Periodic tasks are %s.',
                          'enabled' if self.periodic else 'disabled')
        self._logger.info('UTC is %s.', 'enabled' if self.utc else 'disabled')

        self._set_signal_handlers()

        msg = ['The following commands are available:']
        for command in self.huey.registry._registry:
            msg.append('+ %s' % command.replace('queuecmd_', ''))

        self._logger.info('\n'.join(msg))

        # We'll temporarily ignore SIGINT and SIGHUP (so that they are
        # inherited by the child processes). Once the child processes are
        # created, we restore the handlers.
        original_sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
        if hasattr(signal, 'SIGHUP'):
            original_sighup_handler = signal.signal(signal.SIGHUP,
                                                    signal.SIG_IGN)

        self.scheduler.start()
        for _, worker_process in self.worker_threads:
            worker_process.start()

        signal.signal(signal.SIGINT, original_sigint_handler)
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, original_sighup_handler)

    def stop(self, graceful=False):
        """
        Set the stop-flag.

        If `graceful=True`, this method blocks until the workers have finished
        executing any tasks they are currently working on.
        """
        self.stop_flag.set()
        if graceful:
            self._logger.info('Shutting down gracefully...')
            try:
                for _, worker_process in self.worker_threads:
                    worker_process.join()
            except KeyboardInterrupt:
                self._logger.info('Received request to shut down now.')
            else:
                self._logger.info('All workers have stopped.')
        else:
            self._logger.info('Shutting down')

    def run(self):
        """
        Run the consumer.
        """
        self.start()
        timeout = self._stop_flag_timeout
        health_check_ts = time.time()

        while True:
            try:
                self.stop_flag.wait(timeout=timeout)
            except KeyboardInterrupt:
                self._logger.info('Received SIGINT')
                self.stop(graceful=True)
            except:
                self._logger.exception('Error in consumer.')
                self.stop()
            else:
                if self._received_signal:
                    self.stop(graceful=self._graceful)

            if self.stop_flag.is_set():
                break

            if self._health_check:
                now = time.time()
                if now >= health_check_ts + self._health_check_interval:
                    health_check_ts = now
                    self.check_worker_health()

        if self._restart:
            self._logger.info('Consumer will restart.')
            python = sys.executable
            os.execl(python, python, *sys.argv)
        else:
            self._logger.info('Consumer exiting.')

    def check_worker_health(self):
        """
        Check the health of the worker processes. Workers that have died will
        be replaced with new workers.
        """
        self._logger.debug('Checking worker health.')
        workers = []
        restart_occurred = False
        for i, (worker, worker_t) in enumerate(self.worker_threads):
            if not self.environment.is_alive(worker_t):
                self._logger.warning('Worker %d died, restarting.', i + 1)
                worker = self._create_worker()
                worker_t = self._create_process(worker, 'Worker-%d' % (i + 1))
                worker_t.start()
                restart_occurred = True
            workers.append((worker, worker_t))

        if restart_occurred:
            self.worker_threads = workers
        else:
            self._logger.debug('Workers are up and running.')

        if not self.environment.is_alive(self.scheduler):
            self._logger.warning('Scheduler died, restarting.')
            scheduler = self._create_scheduler()
            self.scheduler = self._create_process(scheduler, 'Scheduler')
            self.scheduler.start()
        else:
            self._logger.debug('Scheduler is up and running.')

        return not restart_occurred

    def _set_signal_handlers(self):
        signal.signal(signal.SIGTERM, self._handle_stop_signal)
        signal.signal(signal.SIGINT, signal.default_int_handler)
        if hasattr(signal, 'SIGHUP'):
            signal.signal(signal.SIGHUP, self._handle_restart_signal)

    def _handle_stop_signal(self, sig_num, frame):
        self._logger.info('Received SIGTERM')
        self._received_signal = True
        self._restart = False
        self._graceful = False

    def _handle_restart_signal(self, sig_num, frame):
        self._logger.info('Received SIGHUP, will restart')
        self._received_signal = True
        self._restart = True
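

# Example usage (an illustrative sketch, not part of this module): the
# consumer is typically started via huey's consumer entry-point script, but it
# can also be constructed and run directly. The RedisHuey name and the option
# values below are placeholders.
#
#     from huey import RedisHuey
#     from huey.consumer import Consumer
#
#     huey = RedisHuey('my-app')  # normally defined in your tasks module
#
#     consumer = Consumer(huey, workers=4, worker_type='thread',
#                         scheduler_interval=1, periodic=True)
#     consumer.run()
#
# While running, SIGTERM triggers an immediate (non-graceful) stop, SIGINT a
# graceful stop, and SIGHUP (where available) a graceful stop followed by a
# restart via os.execl() -- see run(), _set_signal_handlers() and the signal
# handler methods above.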