base.py

from __future__ import print_function

from abc import ABCMeta, abstractmethod
from collections import MutableMapping
from threading import RLock
from datetime import datetime
from logging import getLogger
import warnings
import sys

from pkg_resources import iter_entry_points
from tzlocal import get_localzone
import six

from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
from apscheduler.executors.base import MaxInstancesReachedError, BaseExecutor
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.base import ConflictingIdError, JobLookupError, BaseJobStore
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import asbool, asint, astimezone, maybe_ref, timedelta_seconds, undefined
from apscheduler.events import (
    SchedulerEvent, JobEvent, JobSubmissionEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
    EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED,
    EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED,
    EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED)

#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
STATE_RUNNING = 1
#: constant indicating a scheduler's paused state (started but not processing jobs)
STATE_PAUSED = 2


class BaseScheduler(six.with_metaclass(ABCMeta)):
    """
    Abstract base class for all schedulers.

    Takes the following keyword arguments:

    :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to
        apscheduler.scheduler)
    :param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone)
    :param dict job_defaults: default values for newly added jobs
    :param dict jobstores: a dictionary of job store alias -> job store instance or configuration
        dict
    :param dict executors: a dictionary of executor alias -> executor instance or configuration
        dict

    :ivar int state: current running state of the scheduler (one of the following constants from
        ``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``)

    .. seealso:: :ref:`scheduler-config`
    """

    _trigger_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.triggers'))
    _trigger_classes = {}
    _executor_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.executors'))
    _executor_classes = {}
    _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores'))
    _jobstore_classes = {}

    #
    # Public API
    #

    def __init__(self, gconfig={}, **options):
        super(BaseScheduler, self).__init__()
        self._executors = {}
        self._executors_lock = self._create_lock()
        self._jobstores = {}
        self._jobstores_lock = self._create_lock()
        self._listeners = []
        self._listeners_lock = self._create_lock()
        self._pending_jobs = []
        self.state = STATE_STOPPED
        self.configure(gconfig, **options)

    def configure(self, gconfig={}, prefix='apscheduler.', **options):
        """
        Reconfigures the scheduler with the given options.

        Can only be done when the scheduler isn't running.

        :param dict gconfig: a "global" configuration dictionary whose values can be overridden by
            keyword arguments to this method
        :param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with
            this string (pass an empty string or ``None`` to use all keys)
        :raises SchedulerAlreadyRunningError: if the scheduler is already running
        """
        if self.state != STATE_STOPPED:
            raise SchedulerAlreadyRunningError

        # If a non-empty prefix was given, strip it from the keys in the
        # global configuration dict
        if prefix:
            prefixlen = len(prefix)
            gconfig = dict((key[prefixlen:], value) for key, value in six.iteritems(gconfig)
                           if key.startswith(prefix))

        # Create a structure from the dotted options
        # (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}})
        config = {}
        for key, value in six.iteritems(gconfig):
            parts = key.split('.')
            parent = config
            key = parts.pop(0)
            while parts:
                parent = parent.setdefault(key, {})
                key = parts.pop(0)
            parent[key] = value

        # Override any options with explicit keyword arguments
        config.update(options)
        self._configure(config)
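    # Illustrative configuration sketch (assumes the BackgroundScheduler subclass; the option
    # values are examples only). The dotted, prefixed keys are collapsed by configure() into
    # the nested structure that _configure() consumes:
    #
    #     from apscheduler.schedulers.background import BackgroundScheduler
    #
    #     gconfig = {
    #         'apscheduler.timezone': 'UTC',
    #         'apscheduler.job_defaults.coalesce': 'true',
    #         'apscheduler.job_defaults.max_instances': '3',
    #         'apscheduler.executors.default.type': 'threadpool',
    #         'apscheduler.executors.default.max_workers': '20',
    #     }
    #     scheduler = BackgroundScheduler()
    #     scheduler.configure(gconfig, prefix='apscheduler.')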
    def start(self, paused=False):
        """
        Start the configured executors and job stores and begin processing scheduled jobs.

        :param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
        :raises SchedulerAlreadyRunningError: if the scheduler is already running
        """
        if self.state != STATE_STOPPED:
            raise SchedulerAlreadyRunningError

        with self._executors_lock:
            # Create a default executor if nothing else is configured
            if 'default' not in self._executors:
                self.add_executor(self._create_default_executor(), 'default')

            # Start all the executors
            for alias, executor in six.iteritems(self._executors):
                executor.start(self, alias)

        with self._jobstores_lock:
            # Create a default job store if nothing else is configured
            if 'default' not in self._jobstores:
                self.add_jobstore(self._create_default_jobstore(), 'default')

            # Start all the job stores
            for alias, store in six.iteritems(self._jobstores):
                store.start(self, alias)

            # Schedule all pending jobs
            for job, jobstore_alias, replace_existing in self._pending_jobs:
                self._real_add_job(job, jobstore_alias, replace_existing)
            del self._pending_jobs[:]

        self.state = STATE_PAUSED if paused else STATE_RUNNING
        self._logger.info('Scheduler started')
        self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START))

        if not paused:
            self.wakeup()
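    # Illustrative usage sketch (assumes a BackgroundScheduler instance and a placeholder
    # callable named some_task): starting in the paused state lets jobs be added without any
    # of them firing, and resume() then kicks off processing via wakeup():
    #
    #     scheduler.start(paused=True)
    #     scheduler.add_job(some_task, 'interval', minutes=10)
    #     scheduler.resume()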
    @abstractmethod
    def shutdown(self, wait=True):
        """
        Shuts down the scheduler, along with its executors and job stores.

        Does not interrupt any currently running jobs.

        :param bool wait: ``True`` to wait until all currently executing jobs have finished
        :raises SchedulerNotRunningError: if the scheduler has not been started yet
        """
        if self.state == STATE_STOPPED:
            raise SchedulerNotRunningError

        self.state = STATE_STOPPED

        with self._jobstores_lock, self._executors_lock:
            # Shut down all executors
            for executor in six.itervalues(self._executors):
                executor.shutdown(wait)

            # Shut down all job stores
            for jobstore in six.itervalues(self._jobstores):
                jobstore.shutdown()

        self._logger.info('Scheduler has been shut down')
        self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN))

    def pause(self):
        """
        Pause job processing in the scheduler.

        This will prevent the scheduler from waking up to do job processing until :meth:`resume`
        is called. It will not, however, stop any already running job processing.
        """
        if self.state == STATE_STOPPED:
            raise SchedulerNotRunningError
        elif self.state == STATE_RUNNING:
            self.state = STATE_PAUSED
            self._logger.info('Paused scheduler job processing')
            self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_PAUSED))

    def resume(self):
        """Resume job processing in the scheduler."""
        if self.state == STATE_STOPPED:
            raise SchedulerNotRunningError
        elif self.state == STATE_PAUSED:
            self.state = STATE_RUNNING
            self._logger.info('Resumed scheduler job processing')
            self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_RESUMED))
            self.wakeup()

    @property
    def running(self):
        """
        Return ``True`` if the scheduler has been started.

        This is a shortcut for ``scheduler.state != STATE_STOPPED``.
        """
        return self.state != STATE_STOPPED

    def add_executor(self, executor, alias='default', **executor_opts):
        """
        Adds an executor to this scheduler.

        Any extra keyword arguments will be passed to the executor plugin's constructor, assuming
        that the first argument is the name of an executor plugin.

        :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor
            instance or the name of an executor plugin
        :param str|unicode alias: alias for the executor
        :raises ValueError: if there is already an executor by the given alias
        """
        with self._executors_lock:
            if alias in self._executors:
                raise ValueError('This scheduler already has an executor by the alias of "%s"' %
                                 alias)

            if isinstance(executor, BaseExecutor):
                self._executors[alias] = executor
            elif isinstance(executor, six.string_types):
                self._executors[alias] = executor = self._create_plugin_instance(
                    'executor', executor, executor_opts)
            else:
                raise TypeError('Expected an executor instance or a string, got %s instead' %
                                executor.__class__.__name__)

            # Start the executor right away if the scheduler is running
            if self.state != STATE_STOPPED:
                executor.start(self, alias)

        self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_ADDED, alias))
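    # Illustrative usage sketch: executors can be registered by plugin name (extra keyword
    # arguments go to the plugin constructor) or as ready-made instances. The 'processpool'
    # entry point and ThreadPoolExecutor's max_workers option ship with APScheduler; the
    # aliases are arbitrary examples:
    #
    #     scheduler.add_executor('processpool', alias='heavy', max_workers=4)
    #     scheduler.add_executor(ThreadPoolExecutor(max_workers=30), alias='io')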
    def remove_executor(self, alias, shutdown=True):
        """
        Removes the executor by the given alias from this scheduler.

        :param str|unicode alias: alias of the executor
        :param bool shutdown: ``True`` to shut down the executor after removing it
        """
        with self._executors_lock:
            executor = self._lookup_executor(alias)
            del self._executors[alias]

        if shutdown:
            executor.shutdown()

        self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_REMOVED, alias))

    def add_jobstore(self, jobstore, alias='default', **jobstore_opts):
        """
        Adds a job store to this scheduler.

        Any extra keyword arguments will be passed to the job store plugin's constructor, assuming
        that the first argument is the name of a job store plugin.

        :param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added
        :param str|unicode alias: alias for the job store
        :raises ValueError: if there is already a job store by the given alias
        """
        with self._jobstores_lock:
            if alias in self._jobstores:
                raise ValueError('This scheduler already has a job store by the alias of "%s"' %
                                 alias)

            if isinstance(jobstore, BaseJobStore):
                self._jobstores[alias] = jobstore
            elif isinstance(jobstore, six.string_types):
                self._jobstores[alias] = jobstore = self._create_plugin_instance(
                    'jobstore', jobstore, jobstore_opts)
            else:
                raise TypeError('Expected a job store instance or a string, got %s instead' %
                                jobstore.__class__.__name__)

            # Start the job store right away if the scheduler isn't stopped
            if self.state != STATE_STOPPED:
                jobstore.start(self, alias)

        # Notify listeners that a new job store has been added
        self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_ADDED, alias))

        # Notify the scheduler so it can scan the new job store for jobs
        if self.state != STATE_STOPPED:
            self.wakeup()
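    # Illustrative usage sketch: adding a persistent job store by plugin name. The
    # 'sqlalchemy' entry point and its url option belong to the SQLAlchemy job store shipped
    # with APScheduler (requires the sqlalchemy extra); the alias and URL are example values:
    #
    #     scheduler.add_jobstore('sqlalchemy', alias='persistent', url='sqlite:///jobs.sqlite')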
    def remove_jobstore(self, alias, shutdown=True):
        """
        Removes the job store by the given alias from this scheduler.

        :param str|unicode alias: alias of the job store
        :param bool shutdown: ``True`` to shut down the job store after removing it
        """
        with self._jobstores_lock:
            jobstore = self._lookup_jobstore(alias)
            del self._jobstores[alias]

        if shutdown:
            jobstore.shutdown()

        self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_REMOVED, alias))

    def add_listener(self, callback, mask=EVENT_ALL):
        """
        add_listener(callback, mask=EVENT_ALL)

        Adds a listener for scheduler events.

        When a matching event occurs, ``callback`` is executed with the event object as its
        sole argument. If the ``mask`` parameter is not provided, the callback will receive events
        of all types.

        :param callback: any callable that takes one argument
        :param int mask: bitmask that indicates which events should be listened to

        .. seealso:: :mod:`apscheduler.events`
        .. seealso:: :ref:`scheduler-events`
        """
        with self._listeners_lock:
            self._listeners.append((callback, mask))
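    # Illustrative usage sketch: listening only to job execution outcomes. The event
    # constants come from apscheduler.events, and OR-ing them builds the bitmask matched
    # against event.code in _dispatch_event(); outcome_listener is an example callback:
    #
    #     from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
    #
    #     def outcome_listener(event):
    #         if event.exception:
    #             print('Job %s raised an exception' % event.job_id)
    #
    #     scheduler.add_listener(outcome_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)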
    def remove_listener(self, callback):
        """Removes a previously added event listener."""
        with self._listeners_lock:
            for i, (cb, _) in enumerate(self._listeners):
                if callback == cb:
                    del self._listeners[i]

    def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None,
                misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                next_run_time=undefined, jobstore='default', executor='default',
                replace_existing=False, **trigger_args):
        """
        add_job(func, trigger=None, args=None, kwargs=None, id=None, \
            name=None, misfire_grace_time=undefined, coalesce=undefined, \
            max_instances=undefined, next_run_time=undefined, \
            jobstore='default', executor='default', \
            replace_existing=False, **trigger_args)

        Adds the given job to the job list and wakes up the scheduler if it's already running.

        Any option that defaults to ``undefined`` will be replaced with the corresponding default
        value when the job is scheduled (which happens when the scheduler is started, or
        immediately if the scheduler is already running).

        The ``func`` argument can be given either as a callable object or a textual reference in
        the ``package.module:some.object`` format, where the first half (separated by ``:``) is an
        importable module and the second half is a reference to the callable object, relative to
        the module.

        The ``trigger`` argument can either be:
          #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which
            case any extra keyword arguments to this method are passed on to the trigger's
            constructor
          #. an instance of a trigger class

        :param func: callable (or a textual reference to one) to run at the given time
        :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when
            ``func`` is called
        :param list|tuple args: list of positional arguments to call func with
        :param dict kwargs: dict of keyword arguments to call func with
        :param str|unicode id: explicit identifier for the job (for modifying it later)
        :param str|unicode name: textual description of the job
        :param int misfire_grace_time: seconds after the designated runtime that the job is still
            allowed to be run
        :param bool coalesce: run once instead of many times if the scheduler determines that the
            job should be run more than once in succession
        :param int max_instances: maximum number of concurrently running instances allowed for
            this job
        :param datetime next_run_time: when to first run the job, regardless of the trigger (pass
            ``None`` to add the job as paused)
        :param str|unicode jobstore: alias of the job store to store the job in
        :param str|unicode executor: alias of the executor to run the job with
        :param bool replace_existing: ``True`` to replace an existing job with the same ``id``
            (but retain the number of runs from the existing one)
        :rtype: Job
        """
        job_kwargs = {
            'trigger': self._create_trigger(trigger, trigger_args),
            'executor': executor,
            'func': func,
            'args': tuple(args) if args is not None else (),
            'kwargs': dict(kwargs) if kwargs is not None else {},
            'id': id,
            'name': name,
            'misfire_grace_time': misfire_grace_time,
            'coalesce': coalesce,
            'max_instances': max_instances,
            'next_run_time': next_run_time
        }
        job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if
                          value is not undefined)
        job = Job(self, **job_kwargs)

        # Don't really add jobs to job stores before the scheduler is up and running
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                self._pending_jobs.append((job, jobstore, replace_existing))
                self._logger.info('Adding job tentatively -- it will be properly scheduled when '
                                  'the scheduler starts')
            else:
                self._real_add_job(job, jobstore, replace_existing)

        return job
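    # Illustrative usage sketch: the two common ways of passing func and trigger. Because the
    # trigger is given by alias, the extra keyword arguments (hour/minute, hours) go to the
    # trigger's constructor. send_report, 'myapp.tasks:cleanup' and the 'persistent' job
    # store alias are placeholders:
    #
    #     scheduler.add_job(send_report, 'cron', hour=7, minute=30, id='daily_report',
    #                       replace_existing=True)
    #     scheduler.add_job('myapp.tasks:cleanup', 'interval', hours=1, jobstore='persistent')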
    def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None,
                      misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined,
                      next_run_time=undefined, jobstore='default', executor='default',
                      **trigger_args):
        """
        scheduled_job(trigger, args=None, kwargs=None, id=None, \
            name=None, misfire_grace_time=undefined, \
            coalesce=undefined, max_instances=undefined, \
            next_run_time=undefined, jobstore='default', \
            executor='default', **trigger_args)

        A decorator version of :meth:`add_job`, except that ``replace_existing`` is always
        ``True``.

        .. important:: The ``id`` argument must be given if scheduling a job in a persistent job
            store. The scheduler cannot, however, enforce this requirement.
        """
        def inner(func):
            self.add_job(func, trigger, args, kwargs, id, name, misfire_grace_time, coalesce,
                         max_instances, next_run_time, jobstore, executor, True, **trigger_args)
            return func
        return inner
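    # Illustrative usage sketch: the decorator form. Since replace_existing is forced to
    # True, an explicit id keeps repeated application startups from piling up duplicate jobs
    # in a persistent store; poll_feeds is a placeholder:
    #
    #     @scheduler.scheduled_job('interval', minutes=15, id='poll_feeds')
    #     def poll_feeds():
    #         ...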
    def modify_job(self, job_id, jobstore=None, **changes):
        """
        Modifies the properties of a single job.

        Modifications are passed to this method as extra keyword arguments.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job: the relevant job instance
        """
        with self._jobstores_lock:
            job, jobstore = self._lookup_job(job_id, jobstore)
            job._modify(**changes)
            if jobstore:
                self._lookup_jobstore(jobstore).update_job(job)

        self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore))

        # Wake up the scheduler since the job's next run time may have been changed
        if self.state == STATE_RUNNING:
            self.wakeup()

        return job

    def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args):
        """
        Constructs a new trigger for a job and updates its next run time.

        Extra keyword arguments are passed directly to the trigger's constructor.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :param trigger: alias of the trigger type or a trigger instance
        :return Job: the relevant job instance
        """
        trigger = self._create_trigger(trigger, trigger_args)
        now = datetime.now(self.timezone)
        next_run_time = trigger.get_next_fire_time(None, now)
        return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time)
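    # Illustrative usage sketch: swapping a job's schedule in place. The trigger alias and its
    # keyword arguments are handed to _create_trigger() and the next run time is recomputed
    # from the new trigger; 'daily_report' is the placeholder job id used above:
    #
    #     scheduler.reschedule_job('daily_report', trigger='cron', hour=6)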
    def pause_job(self, job_id, jobstore=None):
        """
        Causes the given job not to be executed until it is explicitly resumed.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job: the relevant job instance
        """
        return self.modify_job(job_id, jobstore, next_run_time=None)

    def resume_job(self, job_id, jobstore=None):
        """
        Resumes the schedule of the given job, or removes the job if its schedule is finished.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no
            next run time could be calculated and the job was removed
        """
        with self._jobstores_lock:
            job, jobstore = self._lookup_job(job_id, jobstore)
            now = datetime.now(self.timezone)
            next_run_time = job.trigger.get_next_fire_time(None, now)
            if next_run_time:
                return self.modify_job(job_id, jobstore, next_run_time=next_run_time)
            else:
                self.remove_job(job.id, jobstore)

    def get_jobs(self, jobstore=None, pending=None):
        """
        Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled
        jobs, either from a specific job store or from all of them.

        If the scheduler has not been started yet, only pending jobs can be returned because the
        job stores haven't been started yet either.

        :param str|unicode jobstore: alias of the job store
        :param bool pending: **DEPRECATED**
        :rtype: list[Job]
        """
        if pending is not None:
            warnings.warn('The "pending" option is deprecated -- get_jobs() always returns '
                          'scheduled jobs if the scheduler has been started and pending jobs '
                          'otherwise', DeprecationWarning)

        with self._jobstores_lock:
            jobs = []
            if self.state == STATE_STOPPED:
                for job, alias, replace_existing in self._pending_jobs:
                    if jobstore is None or alias == jobstore:
                        jobs.append(job)
            else:
                for alias, store in six.iteritems(self._jobstores):
                    if jobstore is None or alias == jobstore:
                        jobs.extend(store.get_all_jobs())

            return jobs

    def get_job(self, job_id, jobstore=None):
        """
        Returns the Job that matches the given ``job_id``.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that most likely contains the job
        :return: the Job by the given ID, or ``None`` if it wasn't found
        :rtype: Job
        """
        with self._jobstores_lock:
            try:
                return self._lookup_job(job_id, jobstore)[0]
            except JobLookupError:
                return

    def remove_job(self, job_id, jobstore=None):
        """
        Removes a job, preventing it from being run any more.

        :param str|unicode job_id: the identifier of the job
        :param str|unicode jobstore: alias of the job store that contains the job
        :raises JobLookupError: if the job was not found
        """
        jobstore_alias = None
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                # Check if the job is among the pending jobs
                for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
                    if job.id == job_id and jobstore in (None, alias):
                        del self._pending_jobs[i]
                        jobstore_alias = alias
                        break
            else:
                # Otherwise, try to remove it from each store until it succeeds or we run out of
                # stores to check
                for alias, store in six.iteritems(self._jobstores):
                    if jobstore in (None, alias):
                        try:
                            store.remove_job(job_id)
                            jobstore_alias = alias
                            break
                        except JobLookupError:
                            continue

        if jobstore_alias is None:
            raise JobLookupError(job_id)

        # Notify listeners that a job has been removed
        event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore_alias)
        self._dispatch_event(event)

        self._logger.info('Removed job %s', job_id)

    def remove_all_jobs(self, jobstore=None):
        """
        Removes all jobs from the specified job store, or all job stores if none is given.

        :param str|unicode jobstore: alias of the job store
        """
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                if jobstore:
                    self._pending_jobs = [pending for pending in self._pending_jobs if
                                          pending[1] != jobstore]
                else:
                    self._pending_jobs = []
            else:
                for alias, store in six.iteritems(self._jobstores):
                    if jobstore in (None, alias):
                        store.remove_all_jobs()

        self._dispatch_event(SchedulerEvent(EVENT_ALL_JOBS_REMOVED, jobstore))

    def print_jobs(self, jobstore=None, out=None):
        """
        print_jobs(jobstore=None, out=sys.stdout)

        Prints out a textual listing of all jobs currently scheduled on either all job stores or
        just a specific one.

        :param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores
        :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is
            given)
        """
        out = out or sys.stdout
        with self._jobstores_lock:
            if self.state == STATE_STOPPED:
                print(u'Pending jobs:', file=out)
                if self._pending_jobs:
                    for job, jobstore_alias, replace_existing in self._pending_jobs:
                        if jobstore in (None, jobstore_alias):
                            print(u'    %s' % job, file=out)
                else:
                    print(u'    No pending jobs', file=out)
            else:
                for alias, store in sorted(six.iteritems(self._jobstores)):
                    if jobstore in (None, alias):
                        print(u'Jobstore %s:' % alias, file=out)
                        jobs = store.get_all_jobs()
                        if jobs:
                            for job in jobs:
                                print(u'    %s' % job, file=out)
                        else:
                            print(u'    No scheduled jobs', file=out)
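    # Illustrative usage sketch: inspecting the schedule. get_jobs() returns Job objects for
    # programmatic use, while print_jobs() writes a human-readable listing to any file-like
    # object (io.StringIO here) or to sys.stdout by default; 'persistent' is a placeholder
    # job store alias:
    #
    #     import io
    #
    #     for job in scheduler.get_jobs(jobstore='persistent'):
    #         print(job.id, job.next_run_time)
    #
    #     buffer = io.StringIO()
    #     scheduler.print_jobs(out=buffer)
    #     listing = buffer.getvalue()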
    @abstractmethod
    def wakeup(self):
        """
        Notifies the scheduler that there may be jobs due for execution.

        Triggers :meth:`_process_jobs` to be run in an implementation specific manner.
        """

    #
    # Private API
    #

    def _configure(self, config):
        # Set general options
        self._logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler.scheduler')
        self.timezone = astimezone(config.pop('timezone', None)) or get_localzone()

        # Set the job defaults
        job_defaults = config.get('job_defaults', {})
        self._job_defaults = {
            'misfire_grace_time': asint(job_defaults.get('misfire_grace_time', 1)),
            'coalesce': asbool(job_defaults.get('coalesce', True)),
            'max_instances': asint(job_defaults.get('max_instances', 1))
        }

        # Configure executors
        self._executors.clear()
        for alias, value in six.iteritems(config.get('executors', {})):
            if isinstance(value, BaseExecutor):
                self.add_executor(value, alias)
            elif isinstance(value, MutableMapping):
                executor_class = value.pop('class', None)
                plugin = value.pop('type', None)
                if plugin:
                    executor = self._create_plugin_instance('executor', plugin, value)
                elif executor_class:
                    cls = maybe_ref(executor_class)
                    executor = cls(**value)
                else:
                    raise ValueError(
                        'Cannot create executor "%s" -- either "type" or "class" must be defined' %
                        alias)

                self.add_executor(executor, alias)
            else:
                raise TypeError(
                    "Expected executor instance or dict for executors['%s'], got %s instead" %
                    (alias, value.__class__.__name__))

        # Configure job stores
        self._jobstores.clear()
        for alias, value in six.iteritems(config.get('jobstores', {})):
            if isinstance(value, BaseJobStore):
                self.add_jobstore(value, alias)
            elif isinstance(value, MutableMapping):
                jobstore_class = value.pop('class', None)
                plugin = value.pop('type', None)
                if plugin:
                    jobstore = self._create_plugin_instance('jobstore', plugin, value)
                elif jobstore_class:
                    cls = maybe_ref(jobstore_class)
                    jobstore = cls(**value)
                else:
                    raise ValueError(
                        'Cannot create job store "%s" -- either "type" or "class" must be '
                        'defined' % alias)

                self.add_jobstore(jobstore, alias)
            else:
                raise TypeError(
                    "Expected job store instance or dict for jobstores['%s'], got %s instead" %
                    (alias, value.__class__.__name__))
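    # Illustrative configuration sketch: the nested shape that _configure() receives once
    # configure() has merged keyword options and collapsed dotted gconfig keys. Executor and
    # job store entries may be instances, or dicts carrying either a 'type' (entry point
    # name) or a 'class' (module:obj path resolved by maybe_ref) plus constructor arguments.
    # The aliases and the SQLAlchemy URL are example values:
    #
    #     config = {
    #         'timezone': 'Europe/Helsinki',
    #         'job_defaults': {'coalesce': False, 'max_instances': 3},
    #         'executors': {
    #             'default': {'type': 'threadpool', 'max_workers': 20},
    #             'heavy': {'type': 'processpool', 'max_workers': 5},
    #         },
    #         'jobstores': {
    #             'default': {'type': 'memory'},
    #             'persistent': {'class': 'apscheduler.jobstores.sqlalchemy:SQLAlchemyJobStore',
    #                            'url': 'sqlite:///jobs.sqlite'},
    #         },
    #     }
    #     scheduler = BackgroundScheduler(**config)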
    def _create_default_executor(self):
        """Creates a default executor, specific to the particular scheduler type."""
        return ThreadPoolExecutor()

    def _create_default_jobstore(self):
        """Creates a default job store, specific to the particular scheduler type."""
        return MemoryJobStore()

    def _lookup_executor(self, alias):
        """
        Returns the executor instance by the given name from the list of executors that were added
        to this scheduler.

        :type alias: str
        :raises KeyError: if no executor by the given alias is found
        """
        try:
            return self._executors[alias]
        except KeyError:
            raise KeyError('No such executor: %s' % alias)

    def _lookup_jobstore(self, alias):
        """
        Returns the job store instance by the given name from the list of job stores that were
        added to this scheduler.

        :type alias: str
        :raises KeyError: if no job store by the given alias is found
        """
        try:
            return self._jobstores[alias]
        except KeyError:
            raise KeyError('No such job store: %s' % alias)

    def _lookup_job(self, job_id, jobstore_alias):
        """
        Finds a job by its ID.

        :type job_id: str
        :param str jobstore_alias: alias of a job store to look in
        :return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of
            a pending job)
        :raises JobLookupError: if no job by the given ID is found.
        """
        if self.state == STATE_STOPPED:
            # Check if the job is among the pending jobs
            for job, alias, replace_existing in self._pending_jobs:
                if job.id == job_id:
                    return job, None
        else:
            # Look in all job stores
            for alias, store in six.iteritems(self._jobstores):
                if jobstore_alias in (None, alias):
                    job = store.lookup_job(job_id)
                    if job is not None:
                        return job, alias

        raise JobLookupError(job_id)

    def _dispatch_event(self, event):
        """
        Dispatches the given event to interested listeners.

        :param SchedulerEvent event: the event to send
        """
        with self._listeners_lock:
            listeners = tuple(self._listeners)

        for cb, mask in listeners:
            if event.code & mask:
                try:
                    cb(event)
                except BaseException:
                    self._logger.exception('Error notifying listener')

    def _real_add_job(self, job, jobstore_alias, replace_existing):
        """
        :param Job job: the job to add
        :param bool replace_existing: ``True`` to use update_job() in case the job already exists
            in the store
        """
        # Fill in undefined values with defaults
        replacements = {}
        for key, value in six.iteritems(self._job_defaults):
            if not hasattr(job, key):
                replacements[key] = value

        # Calculate the next run time if there is none defined
        if not hasattr(job, 'next_run_time'):
            now = datetime.now(self.timezone)
            replacements['next_run_time'] = job.trigger.get_next_fire_time(None, now)

        # Apply any replacements
        job._modify(**replacements)

        # Add the job to the given job store
        store = self._lookup_jobstore(jobstore_alias)
        try:
            store.add_job(job)
        except ConflictingIdError:
            if replace_existing:
                store.update_job(job)
            else:
                raise

        # Mark the job as no longer pending
        job._jobstore_alias = jobstore_alias

        # Notify listeners that a new job has been added
        event = JobEvent(EVENT_JOB_ADDED, job.id, jobstore_alias)
        self._dispatch_event(event)

        self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias)

        # Notify the scheduler about the new job
        if self.state == STATE_RUNNING:
            self.wakeup()

    def _create_plugin_instance(self, type_, alias, constructor_kwargs):
        """Creates an instance of the given plugin type, loading the plugin first if necessary."""
        plugin_container, class_container, base_class = {
            'trigger': (self._trigger_plugins, self._trigger_classes, BaseTrigger),
            'jobstore': (self._jobstore_plugins, self._jobstore_classes, BaseJobStore),
            'executor': (self._executor_plugins, self._executor_classes, BaseExecutor)
        }[type_]

        try:
            plugin_cls = class_container[alias]
        except KeyError:
            if alias in plugin_container:
                plugin_cls = class_container[alias] = plugin_container[alias].load()
                if not issubclass(plugin_cls, base_class):
                    raise TypeError('The {0} entry point does not point to a {0} class'.
                                    format(type_))
            else:
                raise LookupError('No {0} by the name "{1}" was found'.format(type_, alias))

        return plugin_cls(**constructor_kwargs)

    def _create_trigger(self, trigger, trigger_args):
        if isinstance(trigger, BaseTrigger):
            return trigger
        elif trigger is None:
            trigger = 'date'
        elif not isinstance(trigger, six.string_types):
            raise TypeError('Expected a trigger instance or string, got %s instead' %
                            trigger.__class__.__name__)

        # Use the scheduler's time zone if nothing else is specified
        trigger_args.setdefault('timezone', self.timezone)

        # Instantiate the trigger class
        return self._create_plugin_instance('trigger', trigger, trigger_args)

    def _create_lock(self):
        """Creates a reentrant lock object."""
        return RLock()
    def _process_jobs(self):
        """
        Iterates through jobs in every jobstore, starts jobs that are due and figures out how long
        to wait for the next round.
        """
        if self.state == STATE_PAUSED:
            self._logger.debug('Scheduler is paused -- not processing jobs')
            return None

        self._logger.debug('Looking for jobs to run')
        now = datetime.now(self.timezone)
        next_wakeup_time = None
        events = []

        with self._jobstores_lock:
            for jobstore_alias, jobstore in six.iteritems(self._jobstores):
                for job in jobstore.get_due_jobs(now):
                    # Look up the job's executor
                    try:
                        executor = self._lookup_executor(job.executor)
                    except BaseException:
                        self._logger.error(
                            'Executor lookup ("%s") failed for job "%s" -- removing it from the '
                            'job store', job.executor, job)
                        self.remove_job(job.id, jobstore_alias)
                        continue

                    run_times = job._get_run_times(now)
                    run_times = run_times[-1:] if run_times and job.coalesce else run_times
                    if run_times:
                        try:
                            executor.submit_job(job, run_times)
                        except MaxInstancesReachedError:
                            self._logger.warning(
                                'Execution of job "%s" skipped: maximum number of running '
                                'instances reached (%d)', job, job.max_instances)
                            event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id,
                                                       jobstore_alias, run_times)
                            events.append(event)
                        except BaseException:
                            self._logger.exception('Error submitting job "%s" to executor "%s"',
                                                   job, job.executor)
                        else:
                            event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias,
                                                       run_times)
                            events.append(event)

                        # Update the job if it has a next execution time.
                        # Otherwise remove it from the job store.
                        job_next_run = job.trigger.get_next_fire_time(run_times[-1], now)
                        if job_next_run:
                            job._modify(next_run_time=job_next_run)
                            jobstore.update_job(job)
                        else:
                            self.remove_job(job.id, jobstore_alias)

                # Set a new next wakeup time if there isn't one yet or
                # the jobstore has an even earlier one
                jobstore_next_run_time = jobstore.get_next_run_time()
                if jobstore_next_run_time and (next_wakeup_time is None or
                                               jobstore_next_run_time < next_wakeup_time):
                    next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone)

        # Dispatch collected events
        for event in events:
            self._dispatch_event(event)

        # Determine the delay until this method should be called again
        if self.state == STATE_PAUSED:
            wait_seconds = None
            self._logger.debug('Scheduler is paused; waiting until resume() is called')
        elif next_wakeup_time is None:
            wait_seconds = None
            self._logger.debug('No jobs; waiting until a job is added')
        else:
            wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0)
            self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
                               wait_seconds)

        return wait_seconds
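    # Illustrative sketch of how a concrete subclass might drive this method (not the
    # implementation of any scheduler shipped with APScheduler): wakeup() sets an event and a
    # loop sleeps for the wait_seconds returned by _process_jobs() until it is woken early.
    #
    #     from threading import Event
    #
    #     class MinimalBlockingScheduler(BaseScheduler):
    #         def start(self, *args, **kwargs):
    #             self._event = Event()
    #             super(MinimalBlockingScheduler, self).start(*args, **kwargs)
    #             while self.state != STATE_STOPPED:
    #                 wait_seconds = self._process_jobs()
    #                 self._event.wait(wait_seconds)
    #                 self._event.clear()
    #
    #         def shutdown(self, wait=True):
    #             super(MinimalBlockingScheduler, self).shutdown(wait)
    #             self._event.set()
    #
    #         def wakeup(self):
    #             self._event.set()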