- from __future__ import print_function
- from abc import ABCMeta, abstractmethod
- try:
- from collections.abc import MutableMapping  # home of the ABCs since Python 3.3; removed from ``collections`` in 3.10
- except ImportError:  # Python 2 fallback
- from collections import MutableMapping
- from threading import RLock
- from datetime import datetime
- from logging import getLogger
- import warnings
- import sys
- from pkg_resources import iter_entry_points
- from tzlocal import get_localzone
- import six
- from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
- from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
- from apscheduler.executors.pool import ThreadPoolExecutor
- from apscheduler.jobstores.base import ConflictingIdError, JobLookupError, BaseJobStore
- from apscheduler.jobstores.memory import MemoryJobStore
- from apscheduler.job import Job
- from apscheduler.triggers.base import BaseTrigger
- from apscheduler.util import asbool, asint, astimezone, maybe_ref, timedelta_seconds, undefined
- from apscheduler.events import (
- SchedulerEvent, JobEvent, JobSubmissionEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
- EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED,
- EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED,
- EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED)
- #: constant indicating a scheduler's stopped state
- STATE_STOPPED = 0
- #: constant indicating a scheduler's running state (started and processing jobs)
- STATE_RUNNING = 1
- #: constant indicating a scheduler's paused state (started but not processing jobs)
- STATE_PAUSED = 2
- class BaseScheduler(six.with_metaclass(ABCMeta)):
- """
- Abstract base class for all schedulers.
- Takes the following keyword arguments:
- :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
- apscheduler.scheduler)
- :param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
- :param dict job_defaults: default values for newly added jobs
- :param dict jobstores: a dictionary of job store alias -> job store instance or configuration
- dict
- :param dict executors: a dictionary of executor alias -> executor instance or configuration
- dict
- :ivar int state: current running state of the scheduler (one of the following constants from
- ``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``)
- .. seealso:: :ref:`scheduler-config`
- """
- _trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
- _trigger_classes = {}
- _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
- _executor_classes = {}
- _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
- _jobstore_classes = {}
- #
- # Public API
- #
- def __init__(self, gconfig={}, **options):
- super(BaseScheduler, self).__init__()
- self._executors = {}
- self._executors_lock = self._create_lock()
- self._jobstores = {}
- self._jobstores_lock = self._create_lock()
- self._listeners = []
- self._listeners_lock = self._create_lock()
- self._pending_jobs = []
- self.state = STATE_STOPPED
- self.configure(gconfig, **options)
- def configure(self, gconfig={}, prefix='apscheduler.', **options):
- """
- Reconfigures the scheduler with the given options.
- Can only be done when the scheduler isn't running.
- :param dict gconfig: a "global" configuration dictionary whose values can be overridden by
- keyword arguments to this method
- :param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with
- this string (pass an empty string or ``None`` to use all keys)
- :raises SchedulerAlreadyRunningError: if the scheduler is already running
- """
- if self.state != STATE_STOPPED:
- raise SchedulerAlreadyRunningError
- # If a non-empty prefix was given, strip it from the keys in the
- # global configuration dict
- if prefix:
- prefixlen = len(prefix)
- gconfig = dict((key[prefixlen:], value) for key, value in six.iteritems(gconfig)
- if key.startswith(prefix))
- # Create a structure from the dotted options
- # (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}})
- config = {}
- for key, value in six.iteritems(gconfig):
- parts = key.split('.')
- parent = config
- key = parts.pop(0)
- while parts:
- parent = parent.setdefault(key, {})
- key = parts.pop(0)
- parent[key] = value
- # Override any options with explicit keyword arguments
- config.update(options)
- self._configure(config)
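- # Illustrative sketch (not part of the class body): calling ``configure()`` with a
- # prefixed "global" config dict plus keyword overrides. The concrete subclass
- # (apscheduler.schedulers.background.BackgroundScheduler) and the option values shown
- # are example assumptions, not requirements of this module.
- #
- #     from apscheduler.schedulers.background import BackgroundScheduler
- #     scheduler = BackgroundScheduler()
- #     scheduler.configure(
- #         {'apscheduler.timezone': 'UTC',                      # dotted keys become nested dicts
- #          'apscheduler.job_defaults.max_instances': '2'},
- #         prefix='apscheduler.',
- #         job_defaults={'coalesce': True})                     # keyword args override gconfig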
- def start(self, paused=False):
- """
- Start the configured executors and job stores and begin processing scheduled jobs.
- :param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
- :raises SchedulerAlreadyRunningError: if the scheduler is already running
- """
- if self.state != STATE_STOPPED:
- raise SchedulerAlreadyRunningError
- with self._executors_lock:
- # Create a default executor if nothing else is configured
- if 'default' not in self._executors:
- self.add_executor(self._create_default_executor(), 'default')
- # Start all the executors
- for alias, executor in six.iteritems(self._executors):
- executor.start(self, alias)
- with self._jobstores_lock:
- # Create a default job store if nothing else is configured
- if 'default' not in self._jobstores:
- self.add_jobstore(self._create_default_jobstore(), 'default')
- # Start all the job stores
- for alias, store in six.iteritems(self._jobstores):
- store.start(self, alias)
- # Schedule all pending jobs
- for job, jobstore_alias, replace_existing in self._pending_jobs:
- self._real_add_job(job, jobstore_alias, replace_existing)
- del self._pending_jobs[:]
- self.state = STATE_PAUSED if paused else STATE_RUNNING
- self._logger.info('Scheduler started')
- self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START))
- if not paused:
- self.wakeup()
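- # Illustrative sketch: starting with ``paused=True`` brings up the executors and job
- # stores but defers job processing until ``resume()`` is called. ``BackgroundScheduler``
- # is used here only as an example concrete subclass.
- #
- #     from apscheduler.schedulers.background import BackgroundScheduler
- #     scheduler = BackgroundScheduler()
- #     scheduler.start(paused=True)   # state becomes STATE_PAUSED; no jobs are processed yet
- #     scheduler.resume()             # state becomes STATE_RUNNING and wakeup() is called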
- @abstractmethod
- def shutdown(self, wait=True):
- """
- Shuts down the scheduler, along with its executors and job stores.
- Does not interrupt any currently running jobs.
- :param bool wait: ``True`` to wait until all currently executing jobs have finished
- :raises SchedulerNotRunningError: if the scheduler has not been started yet
- """
- if self.state == STATE_STOPPED:
- raise SchedulerNotRunningError
- self.state = STATE_STOPPED
- with self._jobstores_lock, self._executors_lock:
- # Shut down all executors
- for executor in six.itervalues(self._executors):
- executor.shutdown(wait)
- # Shut down all job stores
- for jobstore in six.itervalues(self._jobstores):
- jobstore.shutdown()
- self._logger.info('Scheduler has been shut down')
- self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))
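- # Illustrative sketch: ``shutdown(wait=False)`` returns without waiting for currently
- # executing jobs, while the default ``shutdown()`` waits for them to finish.
- #
- #     scheduler.shutdown(wait=False)   # assumes ``scheduler`` was started as in the sketch above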
- def pause(self):
- """
- Pause job processing in the scheduler.
- This will prevent the scheduler from waking up to do job processing until :meth:`resume`
- is called. It will not, however, stop any job processing that is already in progress.
- :raises SchedulerNotRunningError: if the scheduler has not been started yet
- """
- if self.state == STATE_STOPPED:
- raise SchedulerNotRunningError
- elif self.state == STATE_RUNNING:
- self.state = STATE_PAUSED
- self._logger.info('Paused scheduler job processing')
- self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_PAUSED))
- def resume(self):
- """Resume job processing in the scheduler."""
- if self.state == STATE_STOPPED:
- raise SchedulerNotRunningError
- elif self.state == STATE_PAUSED:
- self.state = STATE_RUNNING
- self._logger.info('Resumed scheduler job processing')
- self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_RESUMED))
- self.wakeup()
- @property
- def running(self):
- """
- Return ``True`` if the scheduler has been started.
- This is a shortcut for ``scheduler.state != STATE_STOPPED``.
- """
- return self.state != STATE_STOPPED
- def add_executor(self, executor, alias='default', **executor_opts):
- """
- Adds an executor to this scheduler.
- If ``executor`` is given as the name of an executor plugin, any extra keyword arguments
- are passed to that plugin's constructor.
- :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor
- instance or the name of an executor plugin
- :param str|unicode alias: alias for the executor
- :raises ValueError: if there is already an executor by the given alias
- """
- with self._executors_lock:
- if alias in self._executors:
- raise ValueError('This scheduler already has an executor by the alias of "%s"' %
- alias)
- if isinstance(executor, BaseExecutor):
- self._executors[alias] = executor
- elif isinstance(executor, six.string_types):
- self._executors[alias] = executor = self._create_plugin_instance(
- 'executor', executor, executor_opts)
- else:
- raise TypeError('Expected an executor instance or a string, got %s instead' %
- executor.__class__.__name__)
- # Start the executor right away if the scheduler is running
- if self.state != STATE_STOPPED:
- executor.start(self, alias)
- self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_ADDED, alias))
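- # Illustrative sketch: an executor can be added as an instance or by plugin name, with
- # extra keyword arguments forwarded to the plugin's constructor. The aliases shown are
- # example values; 'processpool' is the ProcessPoolExecutor entry point shipped with
- # APScheduler.
- #
- #     from apscheduler.executors.pool import ThreadPoolExecutor
- #     scheduler.add_executor(ThreadPoolExecutor(max_workers=10), alias='threads')
- #     scheduler.add_executor('processpool', alias='procs', max_workers=2)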
- def remove_executor(self, alias, shutdown=True):
- """
- Removes the executor by the given alias from this scheduler.
- :param str|unicode alias: alias of the executor
- :param bool shutdown: ``True`` to shut down the executor after
- removing it
- """
- with self._executors_lock:
- executor = self._lookup_executor(alias)
- del self._executors[alias]
- if shutdown:
- executor.shutdown()
- self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_REMOVED, alias))
- def add_jobstore(self, jobstore, alias='default', **jobstore_opts):
- """
- Adds a job store to this scheduler.
- If ``jobstore`` is given as the name of a job store plugin, any extra keyword arguments
- are passed to that plugin's constructor.
- :param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
- :param str|unicode alias: alias for the job store
- :raises ValueError: if there is already a job store by the given alias
- """
- with self._jobstores_lock:
- if alias in self._jobstores:
- raise ValueError('This scheduler already has a job store by the alias of "%s"' %
- alias)
- if isinstance(jobstore, BaseJobStore):
- self._jobstores[alias] = jobstore
- elif isinstance(jobstore, six.string_types):
- self._jobstores[alias] = jobstore = self._create_plugin_instance(
- 'jobstore', jobstore, jobstore_opts)
- else:
- raise TypeError('Expected a job store instance or a string, got %s instead' %
- jobstore.__class__.__name__)
- # Start the job store right away if the scheduler isn't stopped
- if self.state != STATE_STOPPED:
- jobstore.start(self, alias)
- # Notify listeners that a new job store has been added
- self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_ADDED, alias))
- # Notify the scheduler so it can scan the new job store for jobs
- if self.state != STATE_STOPPED:
- self.wakeup()
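- # Illustrative sketch: job stores can likewise be added as instances or by plugin name.
- # The 'sqlalchemy' plugin requires SQLAlchemy to be installed; the alias and URL below
- # are example values.
- #
- #     scheduler.add_jobstore('sqlalchemy', alias='persistent', url='sqlite:///jobs.sqlite')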
- def remove_jobstore(self, alias, shutdown=True):
- """
- Removes the job store by the given alias from this scheduler.
- :param str|unicode alias: alias of the job store
- :param bool shutdown: ``True`` to shut down the job store after removing it
- """
- with self._jobstores_lock:
- jobstore = self._lookup_jobstore(alias)
- del self._jobstores[alias]
- if shutdown:
- jobstore.shutdown()
- self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_REMOVED, alias))
- def add_listener(self, callback, mask=EVENT_ALL):
- """
- add_listener(callback, mask=EVENT_ALL)
- Adds a listener for scheduler events.
- When a matching event occurs, ``callback`` is executed with the event object as its
- sole argument. If the ``mask`` parameter is not provided, the callback will receive events
- of all types.
- :param callback: any callable that takes one argument
- :param int mask: bitmask that indicates which events should be
- listened to
- .. seealso:: :mod:`apscheduler.events`
- .. seealso:: :ref:`scheduler-events`
- """
- with self._listeners_lock:
- self._listeners.append((callback, mask))
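- # Illustrative sketch: a listener receives the event objects matching the given bitmask.
- # The callback name is arbitrary; EVENT_JOB_ADDED and EVENT_JOB_REMOVED are imported at
- # the top of this module.
- #
- #     def on_job_change(event):
- #         print('job %s touched in store %s' % (event.job_id, event.jobstore))
- #
- #     scheduler.add_listener(on_job_change, EVENT_JOB_ADDED | EVENT_JOB_REMOVED)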
- def remove_listener(self, callback):
- """Removes a previously added event listener."""
- with self._listeners_lock:
- for i, (cb, _) in enumerate(self._listeners):
- if callback == cb:
- del self._listeners[i]
- def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
- misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
- next_run_time=undefined, jobstore='default', executor='default',
- replace_existing=False, **trigger_args):
- """
- add_job(func, trigger=None, args=None, kwargs=None, id=None, \
- name=None, misfire_grace_time=undefined, coalesce=undefined, \
- max_instances=undefined, next_run_time=undefined, \
- jobstore='default', executor='default', \
- replace_existing=False, **trigger_args)
- Adds the given job to the job list and wakes up the scheduler if it's already running.
- Any option that defaults to ``undefined`` will be replaced with the corresponding default
- value when the job is scheduled (which happens when the scheduler is started, or
- immediately if the scheduler is already running).
- The ``func`` argument can be given either as a callable object or a textual reference in
- the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
- importable module and the second half is a reference to the callable object, relative to
- the module.
- The ``trigger`` argument can either be:
- #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case
- any extra keyword arguments to this method are passed on to the trigger's constructor
- #. an instance of a trigger class
- :param func: callable (or a textual reference to one) to run at the given time
- :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
- ``func`` is called
- :param list|tuple args: list of positional arguments to call func with
- :param dict kwargs: dict of keyword arguments to call func with
- :param str|unicode id: explicit identifier for the job (for modifying it later)
- :param str|unicode name: textual description of the job
- :param int misfire_grace_time: seconds after the designated runtime that the job is still
- allowed to be run
- :param bool coalesce: run once instead of many times if the scheduler determines that the
- job should be run more than once in succession
- :param int max_instances: maximum number of concurrently running instances allowed for this
- job
- :param datetime next_run_time: when to first run the job, regardless of the trigger (pass
- ``None`` to add the job as paused)
- :param str|unicode jobstore: alias of the job store to store the job in
- :param str|unicode executor: alias of the executor to run the job with
- :param bool replace_existing: ``True`` to replace an existing job with the same ``id``
- (but retain the number of runs from the existing one)
- :rtype: Job
- """
- job_kwargs = {
- 'trigger': self._create_trigger(trigger, trigger_args),
- 'executor': executor,
- 'func': func,
- 'args': tuple(args) if args is not None else (),
- 'kwargs': dict(kwargs) if kwargs is not None else {},
- 'id': id,
- 'name': name,
- 'misfire_grace_time': misfire_grace_time,
- 'coalesce': coalesce,
- 'max_instances': max_instances,
- 'next_run_time': next_run_time
- }
- job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
- value is not undefined)
- job = Job(self, **job_kwargs)
- # Don't really add jobs to job stores before the scheduler is up and running
- with self._jobstores_lock:
- if self.state == STATE_STOPPED:
- self._pending_jobs.append((job, jobstore, replace_existing))
- self._logger.info('Adding job tentatively -- it will be properly scheduled when '
- 'the scheduler starts')
- else:
- self._real_add_job(job, jobstore, replace_existing)
- return job
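- # Illustrative sketch: the trigger can be given as an alias plus trigger arguments, and
- # ``func`` as a callable or a textual reference. ``send_report`` and
- # ``myapp.tasks:send_report`` are made-up example names.
- #
- #     scheduler.add_job(send_report, 'cron', hour=7, minute=30, id='daily_report',
- #                       replace_existing=True)            # send_report is a hypothetical callable
- #     scheduler.add_job('myapp.tasks:send_report', 'interval', hours=1)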
- def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None,
- misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
- next_run_time=undefined, jobstore='default', executor='default',
- **trigger_args):
- """
- scheduled_job(trigger, args=None, kwargs=None, id=None, \
- name=None, misfire_grace_time=undefined, \
- coalesce=undefined, max_instances=undefined, \
- next_run_time=undefined, jobstore='default', \
- executor='default', **trigger_args)
- A decorator version of :meth:`add_job`, except that ``replace_existing`` is always
- ``True``.
- .. important:: The ``id`` argument must be given if scheduling a job in a persistent job
- store. The scheduler cannot, however, enforce this requirement.
- """
- def inner(func):
- self.add_job(func, trigger, args, kwargs, id, name, misfire_grace_time, coalesce,
- max_instances, next_run_time, jobstore, executor, True, **trigger_args)
- return func
- return inner
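- # Illustrative sketch of the decorator form; the job id and interval are example values.
- #
- #     @scheduler.scheduled_job('interval', minutes=10, id='heartbeat')
- #     def heartbeat():
- #         print('still alive')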
- def modify_job(self, job_id, jobstore=None, **changes):
- """
- Modifies the properties of a single job.
- Modifications are passed to this method as extra keyword arguments.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that contains the job
- :return Job: the relevant job instance
- """
- with self._jobstores_lock:
- job, jobstore = self._lookup_job(job_id, jobstore)
- job._modify(**changes)
- if jobstore:
- self._lookup_jobstore(jobstore).update_job(job)
- self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))
- # Wake up the scheduler since the job's next run time may have been changed
- if self.state == STATE_RUNNING:
- self.wakeup()
- return job
- def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
- """
- Constructs a new trigger for a job and updates its next run time.
- Extra keyword arguments are passed directly to the trigger's constructor.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that contains the job
- :param trigger: alias of the trigger type or a trigger instance
- :return Job: the relevant job instance
- """
- trigger = self._create_trigger(trigger, trigger_args)
- now = datetime.now(self.timezone)
- next_run_time = trigger.get_next_fire_time(None, now)
- return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)
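- # Illustrative sketch: ``modify_job()`` changes job properties in place, while
- # ``reschedule_job()`` swaps in a brand new trigger. 'daily_report' is the example job
- # id used in the earlier sketch.
- #
- #     scheduler.modify_job('daily_report', max_instances=2)
- #     scheduler.reschedule_job('daily_report', trigger='cron', hour=8)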
- def pause_job(self, job_id, jobstore=None):
- """
- Causes the given job not to be executed until it is explicitly resumed.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that contains the job
- :return Job: the relevant job instance
- """
- return self.modify_job(job_id, jobstore, next_run_time=None)
- def resume_job(self, job_id, jobstore=None):
- """
- Resumes the schedule of the given job, or removes the job if its schedule is finished.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that contains the job
- :return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no
- next run time could be calculated and the job was removed
- """
- with self._jobstores_lock:
- job, jobstore = self._lookup_job(job_id, jobstore)
- now = datetime.now(self.timezone)
- next_run_time = job.trigger.get_next_fire_time(None, now)
- if next_run_time:
- return self.modify_job(job_id, jobstore, next_run_time=next_run_time)
- else:
- self.remove_job(job.id, jobstore)
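- # Illustrative sketch: pausing clears the job's next run time; resuming recomputes it
- # from the trigger (or removes the job if its schedule has run out).
- #
- #     scheduler.pause_job('daily_report')
- #     scheduler.resume_job('daily_report')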
- def get_jobs(self, jobstore=None, pending=None):
- """
- Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled
- jobs, either from a specific job store or from all of them.
- If the scheduler has not been started yet, only pending jobs can be returned because the
- job stores haven't been started yet either.
- :param str|unicode jobstore: alias of the job store
- :param bool pending: **DEPRECATED**
- :rtype: list[Job]
- """
- if pending is not None:
- warnings.warn('The "pending" option is deprecated -- get_jobs() always returns '
- 'pending jobs if the scheduler has not been started and scheduled jobs '
- 'otherwise', DeprecationWarning)
- with self._jobstores_lock:
- jobs = []
- if self.state == STATE_STOPPED:
- for job, alias, replace_existing in self._pending_jobs:
- if jobstore is None or alias == jobstore:
- jobs.append(job)
- else:
- for alias, store in six.iteritems(self._jobstores):
- if jobstore is None or alias == jobstore:
- jobs.extend(store.get_all_jobs())
- return jobs
- def get_job(self, job_id, jobstore=None):
- """
- Returns the Job that matches the given ``job_id``.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that most likely contains the job
- :return: the Job by the given ID, or ``None`` if it wasn't found
- :rtype: Job
- """
- with self._jobstores_lock:
- try:
- return self._lookup_job(job_id, jobstore)[0]
- except JobLookupError:
- return
- def remove_job(self, job_id, jobstore=None):
- """
- Removes a job, preventing it from being run any more.
- :param str|unicode job_id: the identifier of the job
- :param str|unicode jobstore: alias of the job store that contains the job
- :raises JobLookupError: if the job was not found
- """
- jobstore_alias = None
- with self._jobstores_lock:
- if self.state == STATE_STOPPED:
- # Check if the job is among the pending jobs
- for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
- if job.id == job_id and jobstore in (None, alias):
- del self._pending_jobs[i]
- jobstore_alias = alias
- break
- else:
- # Otherwise, try to remove it from each store until it succeeds or we run out of
- # stores to check
- for alias, store in six.iteritems(self._jobstores):
- if jobstore in (None, alias):
- try:
- store.remove_job(job_id)
- jobstore_alias = alias
- break
- except JobLookupError:
- continue
- if jobstore_alias is None:
- raise JobLookupError(job_id)
- # Notify listeners that a job has been removed
- event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore_alias)
- self._dispatch_event(event)
- self._logger.info('Removed job %s', job_id)
- def remove_all_jobs(self, jobstore=None):
- """
- Removes all jobs from the specified job store, or all job stores if none is given.
- :param str|unicode jobstore: alias of the job store
- """
- with self._jobstores_lock:
- if self.state == STATE_STOPPED:
- if jobstore:
- self._pending_jobs = [pending for pending in self._pending_jobs if
- pending[1] != jobstore]
- else:
- self._pending_jobs = []
- else:
- for alias, store in six.iteritems(self._jobstores):
- if jobstore in (None, alias):
- store.remove_all_jobs()
- self._dispatch_event(SchedulerEvent(EVENT_ALL_JOBS_REMOVED, jobstore))
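- # Illustrative sketch: looking up and removing jobs. ``get_job()`` returns ``None`` for
- # an unknown id, whereas ``remove_job()`` raises JobLookupError. 'persistent' is the
- # example job store alias from the earlier sketch.
- #
- #     job = scheduler.get_job('daily_report')
- #     if job is not None:
- #         scheduler.remove_job('daily_report')
- #     scheduler.remove_all_jobs(jobstore='persistent')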
- def print_jobs(self, jobstore=None, out=None):
- """
- print_jobs(jobstore=None, out=sys.stdout)
- Prints out a textual listing of all jobs currently scheduled on either all job stores or
- just a specific one.
- :param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores
- :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is
- given)
- """
- out = out or sys.stdout
- with self._jobstores_lock:
- if self.state == STATE_STOPPED:
- print(u'Pending jobs:', file=out)
- if self._pending_jobs:
- for job, jobstore_alias, replace_existing in self._pending_jobs:
- if jobstore in (None, jobstore_alias):
- print(u' %s' % job, file=out)
- else:
- print(u' No pending jobs', file=out)
- else:
- for alias, store in sorted(six.iteritems(self._jobstores)):
- if jobstore in (None, alias):
- print(u'Jobstore %s:' % alias, file=out)
- jobs = store.get_all_jobs()
- if jobs:
- for job in jobs:
- print(u' %s' % job, file=out)
- else:
- print(u' No scheduled jobs', file=out)
- @abstractmethod
- def wakeup(self):
- """
- Notifies the scheduler that there may be jobs due for execution.
- Triggers :meth:`_process_jobs` to be run in an implementation-specific manner.
- """
- #
- # Private API
- #
- def _configure(self, config):
- # Set general options
- self._logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler.scheduler')
- self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()
- # Set the job defaults
- job_defaults = config.get('job_defaults', {})
- self._job_defaults = {
- 'misfire_grace_time': asint(job_defaults.get('misfire_grace_time', 1)),
- 'coalesce': asbool(job_defaults.get('coalesce', True)),
- 'max_instances': asint(job_defaults.get('max_instances', 1))
- }
- # Configure executors
- self._executors.clear()
- for alias, value in six.iteritems(config.get('executors', {})):
- if isinstance(value, BaseExecutor):
- self.add_executor(value, alias)
- elif isinstance(value, MutableMapping):
- executor_class = value.pop('class', None)
- plugin = value.pop('type', None)
- if plugin:
- executor = self._create_plugin_instance('executor', plugin, value)
- elif executor_class:
- cls = maybe_ref(executor_class)
- executor = cls(**value)
- else:
- raise ValueError(
- 'Cannot create executor "%s" -- either "type" or "class" must be defined' %
- alias)
- self.add_executor(executor, alias)
- else:
- raise TypeError(
- "Expected executor instance or dict for executors['%s'], got %s instead" %
- (alias, value.__class__.__name__))
- # Configure job stores
- self._jobstores.clear()
- for alias, value in six.iteritems(config.get('jobstores', {})):
- if isinstance(value, BaseJobStore):
- self.add_jobstore(value, alias)
- elif isinstance(value, MutableMapping):
- jobstore_class = value.pop('class', None)
- plugin = value.pop('type', None)
- if plugin:
- jobstore = self._create_plugin_instance('jobstore', plugin, value)
- elif jobstore_class:
- cls = maybe_ref(jobstore_class)
- jobstore = cls(**value)
- else:
- raise ValueError(
- 'Cannot create job store "%s" -- either "type" or "class" must be '
- 'defined' % alias)
- self.add_jobstore(jobstore, alias)
- else:
- raise TypeError(
- "Expected job store instance or dict for jobstores['%s'], got %s instead" %
- (alias, value.__class__.__name__))
- def _create_default_executor(self):
- """Creates a default executor store, specific to the particular scheduler type."""
- return ThreadPoolExecutor()
- def _create_default_jobstore(self):
- """Creates a default job store, specific to the particular scheduler type."""
- return MemoryJobStore()
- def _lookup_executor(self, alias):
- """
- Returns the executor instance by the given name from the list of executors that were added
- to this scheduler.
- :type alias: str
- :raises KeyError: if no executor by the given alias is found
- """
- try:
- return self._executors[alias]
- except KeyError:
- raise KeyError('No such executor: %s' % alias)
- def _lookup_jobstore(self, alias):
- """
- Returns the job store instance by the given name from the list of job stores that were
- added to this scheduler.
- :type alias: str
- :raises KeyError: if no job store by the given alias is found
- """
- try:
- return self._jobstores[alias]
- except KeyError:
- raise KeyError('No such job store: %s' % alias)
- def _lookup_job(self, job_id, jobstore_alias):
- """
- Finds a job by its ID.
- :type job_id: str
- :param str jobstore_alias: alias of a job store to look in
- :return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of
- a pending job)
- :raises JobLookupError: if no job by the given ID is found.
- """
- if self.state == STATE_STOPPED:
- # Check if the job is among the pending jobs
- for job, alias, replace_existing in self._pending_jobs:
- if job.id == job_id:
- return job, None
- else:
- # Look in all job stores
- for alias, store in six.iteritems(self._jobstores):
- if jobstore_alias in (None, alias):
- job = store.lookup_job(job_id)
- if job is not None:
- return job, alias
- raise JobLookupError(job_id)
- def _dispatch_event(self, event):
- """
- Dispatches the given event to interested listeners.
- :param SchedulerEvent event: the event to send
- """
- with self._listeners_lock:
- listeners = tuple(self._listeners)
- for cb, mask in listeners:
- if event.code & mask:
- try:
- cb(event)
- except BaseException:
- self._logger.exception('Error notifying listener')
- def _real_add_job(self, job, jobstore_alias, replace_existing):
- """
- :param Job job: the job to add
- :param str jobstore_alias: alias of the job store to add the job to
- :param bool replace_existing: ``True`` to use update_job() in case the job already exists
- in the store
- """
- # Fill in undefined values with defaults
- replacements = {}
- for key, value in six.iteritems(self._job_defaults):
- if not hasattr(job, key):
- replacements[key] = value
- # Calculate the next run time if there is none defined
- if not hasattr(job, 'next_run_time'):
- now = datetime.now(self.timezone)
- replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)
- # Apply any replacements
- job._modify(**replacements)
- # Add the job to the given job store
- store = self._lookup_jobstore(jobstore_alias)
- try:
- store.add_job(job)
- except ConflictingIdError:
- if replace_existing:
- store.update_job(job)
- else:
- raise
- # Mark the job as no longer pending
- job._jobstore_alias = jobstore_alias
- # Notify listeners that a new job has been added
- event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
- self._dispatch_event(event)
- self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)
- # Notify the scheduler about the new job
- if self.state == STATE_RUNNING:
- self.wakeup()
- def _create_plugin_instance(self, type_, alias, constructor_kwargs):
- """Creates an instance of the given plugin type, loading the plugin first if necessary."""
- plugin_container, class_container, base_class = {
- 'trigger': (self._trigger_plugins, self._trigger_classes, BaseTrigger),
- 'jobstore': (self._jobstore_plugins, self._jobstore_classes, BaseJobStore),
- 'executor': (self._executor_plugins, self._executor_classes, BaseExecutor)
- }[type_]
- try:
- plugin_cls = class_container[alias]
- except KeyError:
- if alias in plugin_container:
- plugin_cls = class_container[alias] = plugin_container[alias].load()
- if not issubclass(plugin_cls, base_class):
- raise TypeError('The {0} entry point does not point to a {0} class'.
- format(type_))
- else:
- raise LookupError('No {0} by the name "{1}" was found'.format(type_, alias))
- return plugin_cls(**constructor_kwargs)
- def _create_trigger(self, trigger, trigger_args):
- if isinstance(trigger, BaseTrigger):
- return trigger
- elif trigger is None:
- trigger = 'date'
- elif not isinstance(trigger, six.string_types):
- raise TypeError('Expected a trigger instance or string, got %s instead' %
- trigger.__class__.__name__)
- # Use the scheduler's time zone if nothing else is specified
- trigger_args.setdefault('timezone', self.timezone)
- # Instantiate the trigger class
- return self._create_plugin_instance('trigger', trigger, trigger_args)
- def _create_lock(self):
- """Creates a reentrant lock object."""
- return RLock()
- def _process_jobs(self):
- """
- Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
- to wait for the next round.
- """
- if self.state == STATE_PAUSED:
- self._logger.debug('Scheduler is paused -- not processing jobs')
- return None
- self._logger.debug('Looking for jobs to run')
- now = datetime.now(self.timezone)
- next_wakeup_time = None
- events = []
- with self._jobstores_lock:
- for jobstore_alias, jobstore in six.iteritems(self._jobstores):
- for job in jobstore.get_due_jobs(now):
- # Look up the job's executor
- try:
- executor = self._lookup_executor(job.executor)
- except BaseException:
- self._logger.error(
- 'Executor lookup ("%s") failed for job "%s" -- removing it from the '
- 'job store', job.executor, job)
- self.remove_job(job.id, jobstore_alias)
- continue
- run_times = job._get_run_times(now)
- run_times = run_times[-1:] if run_times and job.coalesce else run_times
- if run_times:
- try:
- executor.submit_job(job, run_times)
- except MaxInstancesReachedError:
- self._logger.warning(
- 'Execution of job "%s" skipped: maximum number of running '
- 'instances reached (%d)', job, job.max_instances)
- event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
- jobstore_alias, run_times)
- events.append(event)
- except BaseException:
- self._logger.exception('Error submitting job "%s" to executor "%s"',
- job, job.executor)
- else:
- event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
- run_times)
- events.append(event)
- # Update the job if it has a next execution time.
- # Otherwise remove it from the job store.
- job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
- if job_next_run:
- job._modify(next_run_time=job_next_run)
- jobstore.update_job(job)
- else:
- self.remove_job(job.id, jobstore_alias)
- # Set a new next wakeup time if there isn't one yet or
- # the jobstore has an even earlier one
- jobstore_next_run_time = jobstore.get_next_run_time()
- if jobstore_next_run_time and (next_wakeup_time is None or
- jobstore_next_run_time < next_wakeup_time):
- next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)
- # Dispatch collected events
- for event in events:
- self._dispatch_event(event)
- # Determine the delay until this method should be called again
- if self.state == STATE_PAUSED:
- wait_seconds = None
- self._logger.debug('Scheduler is paused; waiting until resume() is called')
- elif next_wakeup_time is None:
- wait_seconds = None
- self._logger.debug('No jobs; waiting until a job is added')
- else:
- wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0)
- self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
- wait_seconds)
- return wait_seconds