loops.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. """
  2. celery.worker.loop
  3. ~~~~~~~~~~~~~~~~~~
  4. The consumer's highly-optimized inner loop.
  5. """
  6. from __future__ import absolute_import
  7. import socket
  8. from celery.bootsteps import RUN
  9. from celery.exceptions import SystemTerminate, WorkerLostError
  10. from celery.utils.log import get_logger
  11. from . import state
  # Public names exported by this module: the two event-loop entry points.
  12. __all__ = ['asynloop', 'synloop']
  # Logger for this module, obtained through celery's get_logger helper.
  13. logger = get_logger(__name__)
  # Module-level alias for logger.error; asynloop uses it to report failures
  # while cleaning up the hub.
  14. error = logger.error
  def asynloop(obj, connection, consumer, blueprint, hub, qos,
               heartbeat, clock, hbrate=2.0, RUN=RUN):
      """Non-blocking event loop consuming messages until connection is lost,
      or shutdown is requested.

      :param obj: object driving the loop; it supplies the task handler
          (``create_task_handler``), the ``on_ready`` hook, ``controller``,
          ``pool``, ``restart_count`` and ``connection``, and is registered
          with the hub.
      :param connection: provides ``heartbeat_check``, ``connection_errors``
          and ``supports_heartbeats``.
      :param consumer: its ``callbacks`` list is replaced by the task handler
          and ``consume()`` is called on it.
      :param blueprint: the loop keeps running while ``blueprint.state``
          equals ``RUN``.
      :param hub: event hub; used to schedule heartbeats, create the loop
          generator and is closed when the loop exits.
      :param qos: ``qos.update()`` is called whenever ``qos.prev`` differs
          from ``qos.value``.
      :param heartbeat: heartbeat interval; when truthy and the connection
          supports heartbeats, ``heartbeat_check`` is scheduled every
          ``heartbeat / hbrate``.
      :param clock: not used by this loop.
      :param hbrate: heartbeat check rate, also passed to
          ``heartbeat_check``.
      :param RUN: blueprint state value meaning "running" (bound as a default
          argument).

      :raises WorkerLostError: on the first start (``restart_count`` falsy)
          if ``obj.pool.did_start_ok()`` returns false.
      :raises SystemExit: when ``state.should_stop`` is set.
      :raises SystemTerminate: when ``state.should_terminate`` is set.

      """
      update_qos = qos.update
      hbtick = connection.heartbeat_check
      errors = connection.connection_errors

      on_task_received = obj.create_task_handler()

      if heartbeat and connection.supports_heartbeats:
          hub.call_repeatedly(heartbeat / hbrate, hbtick, hbrate)

      consumer.callbacks = [on_task_received]
      consumer.consume()
      obj.on_ready()
      obj.controller.register_with_event_loop(hub)
      obj.register_with_event_loop(hub)

      # did_start_ok will verify that pool processes were able to start,
      # but this will only work the first time we start, as
      # maxtasksperchild will mess up metrics.
      if not obj.restart_count and not obj.pool.did_start_ok():
          raise WorkerLostError('Could not start worker processes')

      # FIXME: Use loop.run_forever
      # Tried and works, but no time to test properly before release.
      hub.propagate_errors = errors
      loop = hub.create_loop()

      try:
          while blueprint.state == RUN and obj.connection:
              # shutdown if signal handlers told us to.
              if state.should_stop:
                  raise SystemExit()
              elif state.should_terminate:
                  raise SystemTerminate()

              # We only update QoS when there are no more messages to read.
              # This groups together qos calls, and makes sure that remote
              # control commands will be prioritized over task messages.
              if qos.prev != qos.value:
                  update_qos()

              try:
                  next(loop)
              except StopIteration:
                  # The loop generator finished; create a fresh one and
                  # keep going.
                  loop = hub.create_loop()
      finally:
          # Always close the hub on exit; a failure here is logged rather
          # than raised.
          try:
              hub.close()
          except Exception as exc:
              error(
                  'Error cleaning up after event loop: %r', exc, exc_info=1,
              )
  def synloop(obj, connection, consumer, blueprint, hub, qos,
              heartbeat, clock, hbrate=2.0, **kwargs):
      """Blocking fallback event loop for transports without AIO support.

      Registers the task handler on the consumer, starts consuming and then
      repeatedly drains connection events (``timeout=2.0`` per call) for as
      long as the blueprint is in the ``RUN`` state and ``obj.connection``
      is truthy.

      """
      consumer.register_callback(obj.create_task_handler())
      consumer.consume()
      obj.on_ready()

      while True:
          if not (blueprint.state == RUN and obj.connection):
              break

          state.maybe_shutdown()
          if qos.prev != qos.value:
              qos.update()

          try:
              connection.drain_events(timeout=2.0)
          except socket.error as exc:
              # A timeout only means nothing arrived during this call.
              if isinstance(exc, socket.timeout):
                  continue
              # Any other socket error is re-raised only while the blueprint
              # is still in the RUN state; otherwise it is ignored.
              if blueprint.state == RUN:
                  raise