# test_eventloop.py
  1. """Test eventloop integration"""
  2. import sys
  3. import time
  4. import IPython.testing.decorators as dec
  5. from .utils import flush_channels, start_new_kernel, execute
  6. KC = KM = None
  7. def setup():
  8. """start the global kernel (if it isn't running) and return its client"""
  9. global KM, KC
  10. KM, KC = start_new_kernel()
  11. flush_channels(KC)
  12. def teardown():
  13. KC.stop_channels()
  14. KM.shutdown_kernel(now=True)
  15. async_code = """
  16. from ipykernel.tests._asyncio import async_func
  17. async_func()
  18. """
  19. @dec.skipif(sys.version_info < (3, 5), "async/await syntax required")
  20. def test_asyncio_interrupt():
  21. flush_channels(KC)
  22. msg_id, content = execute('%gui asyncio', KC)
  23. assert content['status'] == 'ok', content
  24. flush_channels(KC)
  25. msg_id, content = execute(async_code, KC)
  26. assert content['status'] == 'ok', content
  27. KM.interrupt_kernel()
  28. flush_channels(KC)
  29. msg_id, content = execute(async_code, KC)
  30. assert content['status'] == 'ok'