asyncio.py 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. from __future__ import absolute_import
  2. from functools import wraps
  3. from apscheduler.schedulers.base import BaseScheduler
  4. from apscheduler.util import maybe_ref
  5. try:
  6. import asyncio
  7. except ImportError: # pragma: nocover
  8. try:
  9. import trollius as asyncio
  10. except ImportError:
  11. raise ImportError(
  12. 'AsyncIOScheduler requires either Python 3.4 or the asyncio package installed')
  13. def run_in_event_loop(func):
  14. @wraps(func)
  15. def wrapper(self, *args, **kwargs):
  16. self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs)
  17. return wrapper
  18. class AsyncIOScheduler(BaseScheduler):
  19. """
  20. A scheduler that runs on an asyncio (:pep:`3156`) event loop.
  21. Extra options:
  22. ============== =============================================================
  23. ``event_loop`` AsyncIO event loop to use (defaults to the global event loop)
  24. ============== =============================================================
  25. """
  26. _eventloop = None
  27. _timeout = None
  28. @run_in_event_loop
  29. def shutdown(self, wait=True):
  30. super(AsyncIOScheduler, self).shutdown(wait)
  31. self._stop_timer()
  32. def _configure(self, config):
  33. self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop()
  34. super(AsyncIOScheduler, self)._configure(config)
  35. def _start_timer(self, wait_seconds):
  36. self._stop_timer()
  37. if wait_seconds is not None:
  38. self._timeout = self._eventloop.call_later(wait_seconds, self.wakeup)
  39. def _stop_timer(self):
  40. if self._timeout:
  41. self._timeout.cancel()
  42. del self._timeout
  43. @run_in_event_loop
  44. def wakeup(self):
  45. self._stop_timer()
  46. wait_seconds = self._process_jobs()
  47. self._start_timer(wait_seconds)
  48. def _create_default_executor(self):
  49. from apscheduler.executors.asyncio import AsyncIOExecutor
  50. return AsyncIOExecutor()