twisted.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. from __future__ import absolute_import
  2. from functools import wraps
  3. from apscheduler.schedulers.base import BaseScheduler
  4. from apscheduler.util import maybe_ref
  5. try:
  6. from twisted.internet import reactor as default_reactor
  7. except ImportError: # pragma: nocover
  8. raise ImportError('TwistedScheduler requires Twisted installed')
  9. def run_in_reactor(func):
  10. @wraps(func)
  11. def wrapper(self, *args, **kwargs):
  12. self._reactor.callFromThread(func, self, *args, **kwargs)
  13. return wrapper
  14. class TwistedScheduler(BaseScheduler):
  15. """
  16. A scheduler that runs on a Twisted reactor.
  17. Extra options:
  18. =========== ========================================================
  19. ``reactor`` Reactor instance to use (defaults to the global reactor)
  20. =========== ========================================================
  21. """
  22. _reactor = None
  23. _delayedcall = None
  24. def _configure(self, config):
  25. self._reactor = maybe_ref(config.pop('reactor', default_reactor))
  26. super(TwistedScheduler, self)._configure(config)
  27. @run_in_reactor
  28. def shutdown(self, wait=True):
  29. super(TwistedScheduler, self).shutdown(wait)
  30. self._stop_timer()
  31. def _start_timer(self, wait_seconds):
  32. self._stop_timer()
  33. if wait_seconds is not None:
  34. self._delayedcall = self._reactor.callLater(wait_seconds, self.wakeup)
  35. def _stop_timer(self):
  36. if self._delayedcall and self._delayedcall.active():
  37. self._delayedcall.cancel()
  38. del self._delayedcall
  39. @run_in_reactor
  40. def wakeup(self):
  41. self._stop_timer()
  42. wait_seconds = self._process_jobs()
  43. self._start_timer(wait_seconds)
  44. def _create_default_executor(self):
  45. from apscheduler.executors.twisted import TwistedExecutor
  46. return TwistedExecutor()