job.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. from collections import Iterable, Mapping
  2. from uuid import uuid4
  3. import six
  4. from apscheduler.triggers.base import BaseTrigger
  5. from apscheduler.util import (
  6. ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args,
  7. convert_to_datetime)
  8. class Job(object):
  9. """
  10. Contains the options given when scheduling callables and its current schedule and other state.
  11. This class should never be instantiated by the user.
  12. :var str id: the unique identifier of this job
  13. :var str name: the description of this job
  14. :var func: the callable to execute
  15. :var tuple|list args: positional arguments to the callable
  16. :var dict kwargs: keyword arguments to the callable
  17. :var bool coalesce: whether to only run the job once when several run times are due
  18. :var trigger: the trigger object that controls the schedule of this job
  19. :var str executor: the name of the executor that will run this job
  20. :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to
  21. be late
  22. :var int max_instances: the maximum number of concurrently executing instances allowed for this
  23. job
  24. :var datetime.datetime next_run_time: the next scheduled run time of this job
  25. .. note::
  26. The ``misfire_grace_time`` has some non-obvious effects on job execution. See the
  27. :ref:`missed-job-executions` section in the documentation for an in-depth explanation.
  28. """
  29. __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
  30. 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
  31. 'next_run_time')
  32. def __init__(self, scheduler, id=None, **kwargs):
  33. super(Job, self).__init__()
  34. self._scheduler = scheduler
  35. self._jobstore_alias = None
  36. self._modify(id=id or uuid4().hex, **kwargs)
  37. def modify(self, **changes):
  38. """
  39. Makes the given changes to this job and saves it in the associated job store.
  40. Accepted keyword arguments are the same as the variables on this class.
  41. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.modify_job`
  42. :return Job: this job instance
  43. """
  44. self._scheduler.modify_job(self.id, self._jobstore_alias, **changes)
  45. return self
  46. def reschedule(self, trigger, **trigger_args):
  47. """
  48. Shortcut for switching the trigger on this job.
  49. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.reschedule_job`
  50. :return Job: this job instance
  51. """
  52. self._scheduler.reschedule_job(self.id, self._jobstore_alias, trigger, **trigger_args)
  53. return self
  54. def pause(self):
  55. """
  56. Temporarily suspend the execution of this job.
  57. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job`
  58. :return Job: this job instance
  59. """
  60. self._scheduler.pause_job(self.id, self._jobstore_alias)
  61. return self
  62. def resume(self):
  63. """
  64. Resume the schedule of this job if previously paused.
  65. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.resume_job`
  66. :return Job: this job instance
  67. """
  68. self._scheduler.resume_job(self.id, self._jobstore_alias)
  69. return self
  70. def remove(self):
  71. """
  72. Unschedules this job and removes it from its associated job store.
  73. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.remove_job`
  74. """
  75. self._scheduler.remove_job(self.id, self._jobstore_alias)
  76. @property
  77. def pending(self):
  78. """
  79. Returns ``True`` if the referenced job is still waiting to be added to its designated job
  80. store.
  81. """
  82. return self._jobstore_alias is None
  83. #
  84. # Private API
  85. #
  86. def _get_run_times(self, now):
  87. """
  88. Computes the scheduled run times between ``next_run_time`` and ``now`` (inclusive).
  89. :type now: datetime.datetime
  90. :rtype: list[datetime.datetime]
  91. """
  92. run_times = []
  93. next_run_time = self.next_run_time
  94. while next_run_time and next_run_time <= now:
  95. run_times.append(next_run_time)
  96. next_run_time = self.trigger.get_next_fire_time(next_run_time, now)
  97. return run_times
  98. def _modify(self, **changes):
  99. """
  100. Validates the changes to the Job and makes the modifications if and only if all of them
  101. validate.
  102. """
  103. approved = {}
  104. if 'id' in changes:
  105. value = changes.pop('id')
  106. if not isinstance(value, six.string_types):
  107. raise TypeError("id must be a nonempty string")
  108. if hasattr(self, 'id'):
  109. raise ValueError('The job ID may not be changed')
  110. approved['id'] = value
  111. if 'func' in changes or 'args' in changes or 'kwargs' in changes:
  112. func = changes.pop('func') if 'func' in changes else self.func
  113. args = changes.pop('args') if 'args' in changes else self.args
  114. kwargs = changes.pop('kwargs') if 'kwargs' in changes else self.kwargs
  115. if isinstance(func, six.string_types):
  116. func_ref = func
  117. func = ref_to_obj(func)
  118. elif callable(func):
  119. try:
  120. func_ref = obj_to_ref(func)
  121. except ValueError:
  122. # If this happens, this Job won't be serializable
  123. func_ref = None
  124. else:
  125. raise TypeError('func must be a callable or a textual reference to one')
  126. if not hasattr(self, 'name') and changes.get('name', None) is None:
  127. changes['name'] = get_callable_name(func)
  128. if isinstance(args, six.string_types) or not isinstance(args, Iterable):
  129. raise TypeError('args must be a non-string iterable')
  130. if isinstance(kwargs, six.string_types) or not isinstance(kwargs, Mapping):
  131. raise TypeError('kwargs must be a dict-like object')
  132. check_callable_args(func, args, kwargs)
  133. approved['func'] = func
  134. approved['func_ref'] = func_ref
  135. approved['args'] = args
  136. approved['kwargs'] = kwargs
  137. if 'name' in changes:
  138. value = changes.pop('name')
  139. if not value or not isinstance(value, six.string_types):
  140. raise TypeError("name must be a nonempty string")
  141. approved['name'] = value
  142. if 'misfire_grace_time' in changes:
  143. value = changes.pop('misfire_grace_time')
  144. if value is not None and (not isinstance(value, six.integer_types) or value <= 0):
  145. raise TypeError('misfire_grace_time must be either None or a positive integer')
  146. approved['misfire_grace_time'] = value
  147. if 'coalesce' in changes:
  148. value = bool(changes.pop('coalesce'))
  149. approved['coalesce'] = value
  150. if 'max_instances' in changes:
  151. value = changes.pop('max_instances')
  152. if not isinstance(value, six.integer_types) or value <= 0:
  153. raise TypeError('max_instances must be a positive integer')
  154. approved['max_instances'] = value
  155. if 'trigger' in changes:
  156. trigger = changes.pop('trigger')
  157. if not isinstance(trigger, BaseTrigger):
  158. raise TypeError('Expected a trigger instance, got %s instead' %
  159. trigger.__class__.__name__)
  160. approved['trigger'] = trigger
  161. if 'executor' in changes:
  162. value = changes.pop('executor')
  163. if not isinstance(value, six.string_types):
  164. raise TypeError('executor must be a string')
  165. approved['executor'] = value
  166. if 'next_run_time' in changes:
  167. value = changes.pop('next_run_time')
  168. approved['next_run_time'] = convert_to_datetime(value, self._scheduler.timezone,
  169. 'next_run_time')
  170. if changes:
  171. raise AttributeError('The following are not modifiable attributes of Job: %s' %
  172. ', '.join(changes))
  173. for key, value in six.iteritems(approved):
  174. setattr(self, key, value)
  175. def __getstate__(self):
  176. # Don't allow this Job to be serialized if the function reference could not be determined
  177. if not self.func_ref:
  178. raise ValueError(
  179. 'This Job cannot be serialized since the reference to its callable (%r) could not '
  180. 'be determined. Consider giving a textual reference (module:function name) '
  181. 'instead.' % (self.func,))
  182. return {
  183. 'version': 1,
  184. 'id': self.id,
  185. 'func': self.func_ref,
  186. 'trigger': self.trigger,
  187. 'executor': self.executor,
  188. 'args': self.args,
  189. 'kwargs': self.kwargs,
  190. 'name': self.name,
  191. 'misfire_grace_time': self.misfire_grace_time,
  192. 'coalesce': self.coalesce,
  193. 'max_instances': self.max_instances,
  194. 'next_run_time': self.next_run_time
  195. }
  196. def __setstate__(self, state):
  197. if state.get('version', 1) > 1:
  198. raise ValueError('Job has version %s, but only version 1 can be handled' %
  199. state['version'])
  200. self.id = state['id']
  201. self.func_ref = state['func']
  202. self.func = ref_to_obj(self.func_ref)
  203. self.trigger = state['trigger']
  204. self.executor = state['executor']
  205. self.args = state['args']
  206. self.kwargs = state['kwargs']
  207. self.name = state['name']
  208. self.misfire_grace_time = state['misfire_grace_time']
  209. self.coalesce = state['coalesce']
  210. self.max_instances = state['max_instances']
  211. self.next_run_time = state['next_run_time']
  212. def __eq__(self, other):
  213. if isinstance(other, Job):
  214. return self.id == other.id
  215. return NotImplemented
  216. def __repr__(self):
  217. return '<Job (id=%s name=%s)>' % (repr_escape(self.id), repr_escape(self.name))
  218. def __str__(self):
  219. return repr_escape(self.__unicode__())
  220. def __unicode__(self):
  221. if hasattr(self, 'next_run_time'):
  222. status = ('next run at: ' + datetime_repr(self.next_run_time) if
  223. self.next_run_time else 'paused')
  224. else:
  225. status = 'pending'
  226. return u'%s (trigger: %s, %s)' % (self.name, self.trigger, status)