tornado.py 1.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. from __future__ import absolute_import
  2. from datetime import timedelta
  3. from functools import wraps
  4. from apscheduler.schedulers.base import BaseScheduler
  5. from apscheduler.util import maybe_ref
  6. try:
  7. from tornado.ioloop import IOLoop
  8. except ImportError: # pragma: nocover
  9. raise ImportError('TornadoScheduler requires tornado installed')
  10. def run_in_ioloop(func):
  11. @wraps(func)
  12. def wrapper(self, *args, **kwargs):
  13. self._ioloop.add_callback(func, self, *args, **kwargs)
  14. return wrapper
  15. class TornadoScheduler(BaseScheduler):
  16. """
  17. A scheduler that runs on a Tornado IOLoop.
  18. =========== ===============================================================
  19. ``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop)
  20. =========== ===============================================================
  21. """
  22. _ioloop = None
  23. _timeout = None
  24. @run_in_ioloop
  25. def shutdown(self, wait=True):
  26. super(TornadoScheduler, self).shutdown(wait)
  27. self._stop_timer()
  28. def _configure(self, config):
  29. self._ioloop = maybe_ref(config.pop('io_loop', None)) or IOLoop.current()
  30. super(TornadoScheduler, self)._configure(config)
  31. def _start_timer(self, wait_seconds):
  32. self._stop_timer()
  33. if wait_seconds is not None:
  34. self._timeout = self._ioloop.add_timeout(timedelta(seconds=wait_seconds), self.wakeup)
  35. def _stop_timer(self):
  36. if self._timeout:
  37. self._ioloop.remove_timeout(self._timeout)
  38. del self._timeout
  39. @run_in_ioloop
  40. def wakeup(self):
  41. self._stop_timer()
  42. wait_seconds = self._process_jobs()
  43. self._start_timer(wait_seconds)