memory.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108
  1. from __future__ import absolute_import
  2. from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
  3. from apscheduler.util import datetime_to_utc_timestamp
  4. class MemoryJobStore(BaseJobStore):
  5. """
  6. Stores jobs in an array in RAM. Provides no persistence support.
  7. Plugin alias: ``memory``
  8. """
  9. def __init__(self):
  10. super(MemoryJobStore, self).__init__()
  11. # list of (job, timestamp), sorted by next_run_time and job id (ascending)
  12. self._jobs = []
  13. self._jobs_index = {} # id -> (job, timestamp) lookup table
  14. def lookup_job(self, job_id):
  15. return self._jobs_index.get(job_id, (None, None))[0]
  16. def get_due_jobs(self, now):
  17. now_timestamp = datetime_to_utc_timestamp(now)
  18. pending = []
  19. for job, timestamp in self._jobs:
  20. if timestamp is None or timestamp > now_timestamp:
  21. break
  22. pending.append(job)
  23. return pending
  24. def get_next_run_time(self):
  25. return self._jobs[0][0].next_run_time if self._jobs else None
  26. def get_all_jobs(self):
  27. return [j[0] for j in self._jobs]
  28. def add_job(self, job):
  29. if job.id in self._jobs_index:
  30. raise ConflictingIdError(job.id)
  31. timestamp = datetime_to_utc_timestamp(job.next_run_time)
  32. index = self._get_job_index(timestamp, job.id)
  33. self._jobs.insert(index, (job, timestamp))
  34. self._jobs_index[job.id] = (job, timestamp)
  35. def update_job(self, job):
  36. old_job, old_timestamp = self._jobs_index.get(job.id, (None, None))
  37. if old_job is None:
  38. raise JobLookupError(job.id)
  39. # If the next run time has not changed, simply replace the job in its present index.
  40. # Otherwise, reinsert the job to the list to preserve the ordering.
  41. old_index = self._get_job_index(old_timestamp, old_job.id)
  42. new_timestamp = datetime_to_utc_timestamp(job.next_run_time)
  43. if old_timestamp == new_timestamp:
  44. self._jobs[old_index] = (job, new_timestamp)
  45. else:
  46. del self._jobs[old_index]
  47. new_index = self._get_job_index(new_timestamp, job.id)
  48. self._jobs.insert(new_index, (job, new_timestamp))
  49. self._jobs_index[old_job.id] = (job, new_timestamp)
  50. def remove_job(self, job_id):
  51. job, timestamp = self._jobs_index.get(job_id, (None, None))
  52. if job is None:
  53. raise JobLookupError(job_id)
  54. index = self._get_job_index(timestamp, job_id)
  55. del self._jobs[index]
  56. del self._jobs_index[job.id]
  57. def remove_all_jobs(self):
  58. self._jobs = []
  59. self._jobs_index = {}
  60. def shutdown(self):
  61. self.remove_all_jobs()
  62. def _get_job_index(self, timestamp, job_id):
  63. """
  64. Returns the index of the given job, or if it's not found, the index where the job should be
  65. inserted based on the given timestamp.
  66. :type timestamp: int
  67. :type job_id: str
  68. """
  69. lo, hi = 0, len(self._jobs)
  70. timestamp = float('inf') if timestamp is None else timestamp
  71. while lo < hi:
  72. mid = (lo + hi) // 2
  73. mid_job, mid_timestamp = self._jobs[mid]
  74. mid_timestamp = float('inf') if mid_timestamp is None else mid_timestamp
  75. if mid_timestamp > timestamp:
  76. hi = mid
  77. elif mid_timestamp < timestamp:
  78. lo = mid + 1
  79. elif mid_job.id > job_id:
  80. hi = mid
  81. elif mid_job.id < job_id:
  82. lo = mid + 1
  83. else:
  84. return mid
  85. return lo