rethinkdb.py

from __future__ import absolute_import

from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
from apscheduler.job import Job

try:
    import cPickle as pickle
except ImportError:  # pragma: nocover
    import pickle

try:
    import rethinkdb as r
except ImportError:  # pragma: nocover
    raise ImportError('RethinkDBJobStore requires rethinkdb installed')


class RethinkDBJobStore(BaseJobStore):
    """
    Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
    rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.

    Plugin alias: ``rethinkdb``

    :param str database: database to store jobs in
    :param str table: table to store jobs in
    :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
        connection arguments
    :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
        highest available
    """

    def __init__(self, database='apscheduler', table='jobs', client=None,
                 pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
        super(RethinkDBJobStore, self).__init__()

        if not database:
            raise ValueError('The "database" parameter must not be empty')
        if not table:
            raise ValueError('The "table" parameter must not be empty')

        self.database = database
        self.table = table
        self.client = client
        self.pickle_protocol = pickle_protocol
        self.connect_args = connect_args
        self.conn = None

    def start(self, scheduler, alias):
        super(RethinkDBJobStore, self).start(scheduler, alias)

        if self.client:
            self.conn = maybe_ref(self.client)
        else:
            self.conn = r.connect(db=self.database, **self.connect_args)

        # Create the database, table and index on first use
        if self.database not in r.db_list().run(self.conn):
            r.db_create(self.database).run(self.conn)

        if self.table not in r.table_list().run(self.conn):
            r.table_create(self.table).run(self.conn)

        if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
            r.table(self.table).index_create('next_run_time').run(self.conn)

        self.table = r.db(self.database).table(self.table)

    def lookup_job(self, job_id):
        results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
        return self._reconstitute_job(results[0]['job_state']) if results else None

    def get_due_jobs(self, now):
        return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))

    def get_next_run_time(self):
        results = list(
            self.table
            .filter(r.row['next_run_time'] != None)  # noqa
            .order_by(r.asc('next_run_time'))
            .map(lambda x: x['next_run_time'])
            .limit(1)
            .run(self.conn)
        )
        return utc_timestamp_to_datetime(results[0]) if results else None

    def get_all_jobs(self):
        jobs = self._get_jobs()
        self._fix_paused_jobs_sorting(jobs)
        return jobs

    def add_job(self, job):
        job_dict = {
            'id': job.id,
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        results = self.table.insert(job_dict).run(self.conn)
        if results['errors'] > 0:
            raise ConflictingIdError(job.id)

    def update_job(self, job):
        changes = {
            'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
            'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
        }
        results = self.table.get_all(job.id).update(changes).run(self.conn)
        # RethinkDB counts a matched document as either 'replaced' or 'unchanged';
        # if neither counter moved, no job with this ID exists
        if (results['skipped'] > 0 or results['errors'] > 0 or
                results['replaced'] + results['unchanged'] == 0):
            raise JobLookupError(job.id)

    def remove_job(self, job_id):
        results = self.table.get_all(job_id).delete().run(self.conn)
        if results['deleted'] + results['skipped'] != 1:
            raise JobLookupError(job_id)

    def remove_all_jobs(self):
        self.table.delete().run(self.conn)

    def shutdown(self):
        self.conn.close()

    def _reconstitute_job(self, job_state):
        job_state = pickle.loads(job_state)
        job = Job.__new__(Job)
        job.__setstate__(job_state)
        job._scheduler = self._scheduler
        job._jobstore_alias = self._alias
        return job

    def _get_jobs(self, predicate=None):
        jobs = []
        failed_job_ids = []
        query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate)  # noqa
                 if predicate else self.table)
        query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')

        for document in query.run(self.conn):
            try:
                jobs.append(self._reconstitute_job(document['job_state']))
            except Exception:
                self._logger.exception('Unable to restore job "%s" -- removing it',
                                       document['id'])
                failed_job_ids.append(document['id'])

        # Remove all the jobs we failed to restore
        if failed_job_ids:
            r.expr(failed_job_ids).for_each(
                lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)

        return jobs

    def __repr__(self):
        return '<%s (connection=%s)>' % (self.__class__.__name__, self.conn)
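

if __name__ == '__main__':
    # A minimal usage sketch, not part of the original module. It assumes a
    # RethinkDB server listening on localhost:28015; the tick() job and its
    # five-second interval are made up for illustration.
    from datetime import datetime

    from apscheduler.schedulers.blocking import BlockingScheduler

    def tick():
        print('Tick! The time is %s' % datetime.now())

    scheduler = BlockingScheduler()
    # Leftover keyword arguments (host, port, ...) are forwarded to r.connect()
    scheduler.add_jobstore(RethinkDBJobStore(database='apscheduler', table='jobs',
                                             host='localhost', port=28015))
    # replace_existing avoids a ConflictingIdError when the job survived a
    # previous run in the persistent store
    scheduler.add_job(tick, 'interval', seconds=5, id='tick', replace_existing=True)

    try:
        scheduler.start()
    except (KeyboardInterrupt, SystemExit):
        pass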