debug.py 559 B

1234567891011121314151617181920
  1. import sys
  2. from apscheduler.executors.base import BaseExecutor, run_job
  class DebugExecutor(BaseExecutor):
      """
      A special executor that executes the target callable directly instead of deferring it to a
      thread or process.

      Plugin alias: ``debug``
      """

      def _do_submit_job(self, job, run_times):
          """
          Run ``job`` synchronously in the calling thread and report the outcome.

          :param job: the job to execute; its ``id`` and ``_jobstore_alias`` attributes are read
          :param run_times: the run times forwarded unchanged to ``run_job``
          """
          try:
              events = run_job(job, job._jobstore_alias, run_times, self._logger.name)
          except BaseException:
              # Deliberately catch everything (same reach as a bare ``except:``, including
              # KeyboardInterrupt/SystemExit) so that any failure is routed to the error
              # callback instead of propagating to the caller of this method.
              # sys.exc_info()[1:] is the (exception instance, traceback) pair.
              self._run_job_error(job.id, *sys.exc_info()[1:])
          else:
              # Only reached when run_job() returned without raising.
              self._run_job_success(job.id, events)