consumer_options.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. import logging
  2. import optparse
  3. from collections import namedtuple
  4. from logging import FileHandler
  5. from huey.constants import WORKER_THREAD
  6. from huey.constants import WORKER_TYPES
  7. config_defaults = (
  8. ('workers', 1),
  9. ('worker_type', WORKER_THREAD),
  10. ('initial_delay', 0.1),
  11. ('backoff', 1.15),
  12. ('max_delay', 10.0),
  13. ('check_worker_health', True),
  14. ('health_check_interval', 1),
  15. ('scheduler_interval', 1),
  16. ('periodic', True),
  17. ('utc', True),
  18. ('logfile', None),
  19. ('verbose', None),
  20. ('flush_locks', False),
  21. )
  22. config_keys = [param for param, _ in config_defaults]
  23. def option(name, **options):
  24. if isinstance(name, tuple):
  25. letter, opt_name = name
  26. else:
  27. opt_name = name.replace('_', '-')
  28. letter = name[0]
  29. options.setdefault('dest', name)
  30. return ('-' + letter, '--' + opt_name, options)
  31. class OptionParserHandler(object):
  32. def get_worker_options(self):
  33. return (
  34. # -w, -k, -d, -m, -b, -c, -C, -f
  35. option('workers', type='int',
  36. help='number of worker threads/processes (default=1)'),
  37. option(('k', 'worker-type'), choices=WORKER_TYPES,
  38. dest='worker_type',
  39. help=('worker execution model (thread, greenlet, '
  40. 'process). Use process for CPU-intensive workloads, '
  41. 'and greenlet for IO-heavy workloads. When in doubt, '
  42. 'thread is the safest choice.')),
  43. option('delay', dest='initial_delay',
  44. help='minimum time to wait when polling queue (default=.1)',
  45. metavar='SECONDS', type='float'),
  46. option('max_delay', metavar='SECONDS',
  47. help='maximum time to wait when polling queue (default=10)',
  48. type='float'),
  49. option('backoff', metavar='SECONDS',
  50. help=('factor used to back-off polling interval when queue '
  51. 'is empty (default=1.15, must be >= 1)'),
  52. type='float'),
  53. option(('c', 'health_check_interval'), type='float',
  54. dest='health_check_interval', metavar='SECONDS',
  55. help=('minimum time to wait between worker health checks '
  56. '(default=1.0)')),
  57. option(('C', 'disable_health_check'), action='store_false',
  58. dest='check_worker_health',
  59. help=('disable health check that monitors worker health, '
  60. 'restarting any worker that crashes unexpectedly.')),
  61. option('flush_locks', action='store_true', dest='flush_locks',
  62. help=('flush all locks when starting consumer.')),
  63. )
  64. def get_scheduler_options(self):
  65. return (
  66. # -s, -n, -u, -o
  67. option('scheduler_interval', type='int',
  68. help='Granularity of scheduler in seconds.'),
  69. option('no_periodic', action='store_false',
  70. dest='periodic', help='do NOT enqueue periodic tasks'),
  71. option('utc', action='store_true',
  72. help='use UTC time for all tasks (default=True)'),
  73. option(('o', 'localtime'), action='store_false', dest='utc',
  74. help='use local time for all tasks'),
  75. )
  76. def get_logging_options(self):
  77. return (
  78. # -l, -v, -q
  79. option('logfile', metavar='FILE'),
  80. option('verbose', action='store_true',
  81. help='verbose logging (includes DEBUG statements)'),
  82. option('quiet', action='store_false', dest='verbose',
  83. help='minimal logging (only exceptions/errors)'),
  84. )
  85. def get_option_parser(self):
  86. parser = optparse.OptionParser('Usage: %prog [options] '
  87. 'path.to.huey_instance')
  88. def add_group(name, description, options):
  89. group = parser.add_option_group(name, description)
  90. for abbrev, name, kwargs in options:
  91. group.add_option(abbrev, name, **kwargs)
  92. add_group('Logging', 'The following options pertain to logging.',
  93. self.get_logging_options())
  94. add_group('Workers', (
  95. 'By default huey uses a single worker thread. To specify a '
  96. 'different number of workers, or a different execution model (such'
  97. ' as multiple processes or greenlets), use the options below.'),
  98. self.get_worker_options())
  99. add_group('Scheduler', (
  100. 'By default Huey will run the scheduler once every second to check'
  101. ' for tasks scheduled in the future, or tasks set to run at '
  102. 'specfic intervals (periodic tasks). Use the options below to '
  103. 'configure the scheduler or to disable periodic task scheduling.'),
  104. self.get_scheduler_options())
  105. return parser
  106. class ConsumerConfig(namedtuple('_ConsumerConfig', config_keys)):
  107. def __new__(cls, **kwargs):
  108. config = dict(config_defaults)
  109. config.update(kwargs)
  110. args = [config[key] for key in config_keys]
  111. return super(ConsumerConfig, cls).__new__(cls, *args)
  112. def validate(self):
  113. if self.backoff < 1:
  114. raise ValueError('The backoff must be greater than 1.')
  115. if not (0 < self.scheduler_interval <= 60):
  116. raise ValueError('The scheduler must run at least once per '
  117. 'minute, and at most once per second (1-60).')
  118. @property
  119. def loglevel(self):
  120. if self.verbose is None:
  121. return logging.INFO
  122. return logging.DEBUG if self.verbose else logging.ERROR
  123. def setup_logger(self, logger=None):
  124. if self.worker_type == 'process':
  125. worker = '%(process)d'
  126. else:
  127. worker = '%(threadName)s'
  128. logformat = ('[%(asctime)s] %(levelname)s:%(name)s:' + worker +
  129. ':%(message)s')
  130. loglevel = self.loglevel
  131. if logger is None:
  132. logging.basicConfig(level=loglevel, format=logformat)
  133. logger = logging.getLogger()
  134. else:
  135. logger.setLevel(loglevel)
  136. if self.logfile:
  137. handler = FileHandler(self.logfile)
  138. handler.setFormatter(logging.Formatter(logformat))
  139. logger.addHandler(handler)
  140. @property
  141. def values(self):
  142. return dict((key, getattr(self, key)) for key in config_keys
  143. if key not in ('logfile', 'verbose'))