123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166 |
- import logging
- import optparse
- from collections import namedtuple
- from logging import FileHandler
- from huey.constants import WORKER_THREAD
- from huey.constants import WORKER_TYPES
# Ordered (name, default) pairs for every consumer setting. The order here
# fixes the field order of the ConsumerConfig namedtuple built from
# ``config_keys`` below.
config_defaults = (
    ('workers', 1),                 # number of worker threads/processes
    ('worker_type', WORKER_THREAD),  # worker execution model
    ('initial_delay', 0.1),         # minimum wait when polling the queue
    ('backoff', 1.15),              # factor applied to the polling interval
    ('max_delay', 10.0),            # maximum wait when polling the queue
    ('check_worker_health', True),  # monitor workers and restart on crash
    ('health_check_interval', 1),   # minimum wait between health checks
    ('scheduler_interval', 1),      # granularity of the scheduler, seconds
    ('periodic', True),             # enqueue periodic tasks
    ('utc', True),                  # use UTC (rather than local) time
    ('logfile', None),              # optional log file path
    ('verbose', None),              # None=INFO, True=DEBUG, False=ERROR
    ('flush_locks', False),         # flush all locks when consumer starts
)

# Setting names only, in the same order as ``config_defaults``.
config_keys = [entry[0] for entry in config_defaults]
def option(name, **options):
    """Build the ``(short_flag, long_flag, kwargs)`` triple for one option.

    ``name`` may be given in two forms:

    * a ``(letter, long_name)`` tuple, in which case both parts are used
      verbatim and ``options`` is left untouched;
    * a plain string, in which case the short flag is its first character,
      the long flag is the string with underscores replaced by dashes, and
      ``dest`` defaults to the string itself unless the caller passed one.

    The returned triple is shaped for ``add_option(short, long, **kwargs)``.
    """
    if isinstance(name, tuple):
        short, long_name = name
        return ('-' + short, '--' + long_name, options)

    # Plain-string form: derive both flags and the destination from the name.
    long_name = name.replace('_', '-')
    short = name[0]
    options.setdefault('dest', name)
    return ('-' + short, '--' + long_name, options)
class OptionParserHandler(object):
    """Assemble the ``optparse`` command-line parser for the consumer.

    Options are declared in three groups (logging, workers, scheduler); each
    ``get_*_options`` method returns a tuple of ``(short, long, kwargs)``
    triples produced by ``option()``, and ``get_option_parser`` registers them
    on an ``optparse.OptionParser``.
    """

    def get_worker_options(self):
        """Return the option triples controlling workers and queue polling."""
        return (
            # -w, -k, -d, -m, -b, -c, -C, -f
            option('workers', type='int',
                   help='number of worker threads/processes (default=1)'),
            option(('k', 'worker-type'), choices=WORKER_TYPES,
                   dest='worker_type',
                   help=('worker execution model (thread, greenlet, '
                         'process). Use process for CPU-intensive workloads, '
                         'and greenlet for IO-heavy workloads. When in doubt, '
                         'thread is the safest choice.')),
            option('delay', dest='initial_delay',
                   help='minimum time to wait when polling queue (default=.1)',
                   metavar='SECONDS', type='float'),
            option('max_delay', metavar='SECONDS',
                   help='maximum time to wait when polling queue (default=10)',
                   type='float'),
            # The backoff is a multiplier, not a duration, so the usage text
            # shows FACTOR rather than SECONDS.
            option('backoff', metavar='FACTOR',
                   help=('factor used to back-off polling interval when queue '
                         'is empty (default=1.15, must be >= 1)'),
                   type='float'),
            option(('c', 'health_check_interval'), type='float',
                   dest='health_check_interval', metavar='SECONDS',
                   help=('minimum time to wait between worker health checks '
                         '(default=1.0)')),
            option(('C', 'disable_health_check'), action='store_false',
                   dest='check_worker_health',
                   help=('disable health check that monitors worker health, '
                         'restarting any worker that crashes unexpectedly.')),
            option('flush_locks', action='store_true', dest='flush_locks',
                   help=('flush all locks when starting consumer.')),
        )

    def get_scheduler_options(self):
        """Return the option triples controlling the scheduler."""
        return (
            # -s, -n, -u, -o
            option('scheduler_interval', type='int',
                   help='Granularity of scheduler in seconds.'),
            option('no_periodic', action='store_false',
                   dest='periodic', help='do NOT enqueue periodic tasks'),
            # -u and -o write to the same destination with opposite values.
            option('utc', action='store_true',
                   help='use UTC time for all tasks (default=True)'),
            option(('o', 'localtime'), action='store_false', dest='utc',
                   help='use local time for all tasks'),
        )

    def get_logging_options(self):
        """Return the option triples controlling logging."""
        return (
            # -l, -v, -q
            option('logfile', metavar='FILE'),
            # -v and -q both write to ``verbose`` (True / False respectively).
            option('verbose', action='store_true',
                   help='verbose logging (includes DEBUG statements)'),
            option('quiet', action='store_false', dest='verbose',
                   help='minimal logging (only exceptions/errors)'),
        )

    def get_option_parser(self):
        """Create the parser and register the three option groups on it.

        Returns:
            An ``optparse.OptionParser`` with the Logging, Workers and
            Scheduler groups added, in that order.
        """
        parser = optparse.OptionParser('Usage: %prog [options] '
                                       'path.to.huey_instance')

        def add_group(name, description, options):
            # Register one titled group and every option triple belonging to
            # it. The loop variable is deliberately not called ``name`` so it
            # does not shadow the group title.
            group = parser.add_option_group(name, description)
            for abbrev, long_flag, kwargs in options:
                group.add_option(abbrev, long_flag, **kwargs)

        add_group('Logging', 'The following options pertain to logging.',
                  self.get_logging_options())

        add_group('Workers', (
            'By default huey uses a single worker thread. To specify a '
            'different number of workers, or a different execution model (such'
            ' as multiple processes or greenlets), use the options below.'),
            self.get_worker_options())

        add_group('Scheduler', (
            'By default Huey will run the scheduler once every second to check'
            ' for tasks scheduled in the future, or tasks set to run at '
            'specific intervals (periodic tasks). Use the options below to '
            'configure the scheduler or to disable periodic task scheduling.'),
            self.get_scheduler_options())

        return parser
class ConsumerConfig(namedtuple('_ConsumerConfig', config_keys)):
    """Immutable bundle of consumer settings.

    Fields are the names in ``config_keys``; any field not supplied to the
    constructor takes its value from ``config_defaults``.
    """

    def __new__(cls, **kwargs):
        """Create a config from keyword overrides layered over the defaults.

        Only keys listed in ``config_keys`` are read when building the tuple,
        so keyword arguments with other names are ignored.
        """
        config = dict(config_defaults)
        config.update(kwargs)
        args = [config[key] for key in config_keys]
        return super(ConsumerConfig, cls).__new__(cls, *args)

    def validate(self):
        """Check value ranges.

        Raises:
            ValueError: if ``backoff`` is below 1, or if
                ``scheduler_interval`` is not in the range ``0 < n <= 60``.
        """
        if self.backoff < 1:
            # A backoff of exactly 1 is accepted, so the message says so.
            raise ValueError('The backoff must be greater than or equal '
                             'to 1.')
        if not (0 < self.scheduler_interval <= 60):
            raise ValueError('The scheduler must run at least once per '
                             'minute, and at most once per second (1-60).')

    @property
    def loglevel(self):
        """Logging level derived from ``verbose``.

        ``None`` maps to INFO, a truthy value to DEBUG, and any other value
        to ERROR.
        """
        if self.verbose is None:
            return logging.INFO
        return logging.DEBUG if self.verbose else logging.ERROR

    def setup_logger(self, logger=None):
        """Configure logging according to this config.

        Args:
            logger: logger to configure. When ``None``,
                ``logging.basicConfig`` is called with the computed level and
                format and the root logger is used; otherwise only the level
                of the given logger is set.

        If ``logfile`` is set, a ``FileHandler`` for that path, using the same
        format, is added to the logger.
        """
        # Process workers are identified by pid; all other worker types by
        # thread name.
        if self.worker_type == 'process':
            worker = '%(process)d'
        else:
            worker = '%(threadName)s'

        logformat = ('[%(asctime)s] %(levelname)s:%(name)s:' + worker +
                     ':%(message)s')
        loglevel = self.loglevel
        if logger is None:
            logging.basicConfig(level=loglevel, format=logformat)
            logger = logging.getLogger()
        else:
            logger.setLevel(loglevel)

        if self.logfile:
            handler = FileHandler(self.logfile)
            handler.setFormatter(logging.Formatter(logformat))
            logger.addHandler(handler)

    @property
    def values(self):
        """Dict of every setting except ``logfile`` and ``verbose``."""
        return dict((key, getattr(self, key)) for key in config_keys
                    if key not in ('logfile', 'verbose'))
|