pool.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. from abc import abstractmethod
  2. import concurrent.futures
  3. from apscheduler.executors.base import BaseExecutor, run_job
  4. class BasePoolExecutor(BaseExecutor):
  5. @abstractmethod
  6. def __init__(self, pool):
  7. super(BasePoolExecutor, self).__init__()
  8. self._pool = pool
  9. def _do_submit_job(self, job, run_times):
  10. def callback(f):
  11. exc, tb = (f.exception_info() if hasattr(f, 'exception_info') else
  12. (f.exception(), getattr(f.exception(), '__traceback__', None)))
  13. if exc:
  14. self._run_job_error(job.id, exc, tb)
  15. else:
  16. self._run_job_success(job.id, f.result())
  17. f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
  18. f.add_done_callback(callback)
  19. def shutdown(self, wait=True):
  20. self._pool.shutdown(wait)
  21. class ThreadPoolExecutor(BasePoolExecutor):
  22. """
  23. An executor that runs jobs in a concurrent.futures thread pool.
  24. Plugin alias: ``threadpool``
  25. :param max_workers: the maximum number of spawned threads.
  26. """
  27. def __init__(self, max_workers=10):
  28. pool = concurrent.futures.ThreadPoolExecutor(int(max_workers))
  29. super(ThreadPoolExecutor, self).__init__(pool)
  30. class ProcessPoolExecutor(BasePoolExecutor):
  31. """
  32. An executor that runs jobs in a concurrent.futures process pool.
  33. Plugin alias: ``processpool``
  34. :param max_workers: the maximum number of spawned processes.
  35. """
  36. def __init__(self, max_workers=10):
  37. pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))
  38. super(ProcessPoolExecutor, self).__init__(pool)