# test_multikernelmanager.py 4.0 KB
  1. """Tests for the notebook kernel and session manager."""
  2. import os
  3. import time
  4. import threading
  5. import multiprocessing as mp
  6. from subprocess import PIPE
  7. from unittest import TestCase
  8. from traitlets.config.loader import Config
  9. from jupyter_client import KernelManager
  10. from jupyter_client.multikernelmanager import MultiKernelManager
  11. from .utils import skip_win32
  12. from ..localinterfaces import localhost
  13. TIMEOUT = 30
  14. class TestKernelManager(TestCase):
  15. def _get_tcp_km(self):
  16. c = Config()
  17. km = MultiKernelManager(config=c)
  18. return km
  19. def _get_ipc_km(self):
  20. c = Config()
  21. c.KernelManager.transport = 'ipc'
  22. c.KernelManager.ip = 'test'
  23. km = MultiKernelManager(config=c)
  24. return km
  25. def _run_lifecycle(self, km):
  26. kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
  27. self.assertTrue(km.is_alive(kid))
  28. self.assertTrue(kid in km)
  29. self.assertTrue(kid in km.list_kernel_ids())
  30. self.assertEqual(len(km),1)
  31. km.restart_kernel(kid, now=True)
  32. self.assertTrue(km.is_alive(kid))
  33. self.assertTrue(kid in km.list_kernel_ids())
  34. km.interrupt_kernel(kid)
  35. k = km.get_kernel(kid)
  36. self.assertTrue(isinstance(k, KernelManager))
  37. km.shutdown_kernel(kid, now=True)
  38. self.assertTrue(not kid in km)
  39. def _run_cinfo(self, km, transport, ip):
  40. kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
  41. k = km.get_kernel(kid)
  42. cinfo = km.get_connection_info(kid)
  43. self.assertEqual(transport, cinfo['transport'])
  44. self.assertEqual(ip, cinfo['ip'])
  45. self.assertTrue('stdin_port' in cinfo)
  46. self.assertTrue('iopub_port' in cinfo)
  47. stream = km.connect_iopub(kid)
  48. stream.close()
  49. self.assertTrue('shell_port' in cinfo)
  50. stream = km.connect_shell(kid)
  51. stream.close()
  52. self.assertTrue('hb_port' in cinfo)
  53. stream = km.connect_hb(kid)
  54. stream.close()
  55. km.shutdown_kernel(kid, now=True)
  56. def test_tcp_lifecycle(self):
  57. km = self._get_tcp_km()
  58. self._run_lifecycle(km)
  59. def test_shutdown_all(self):
  60. km = self._get_tcp_km()
  61. kid = km.start_kernel(stdout=PIPE, stderr=PIPE)
  62. self.assertIn(kid, km)
  63. km.shutdown_all()
  64. self.assertNotIn(kid, km)
  65. # shutdown again is okay, because we have no kernels
  66. km.shutdown_all()
  67. def test_tcp_cinfo(self):
  68. km = self._get_tcp_km()
  69. self._run_cinfo(km, 'tcp', localhost())
  70. @skip_win32
  71. def test_ipc_lifecycle(self):
  72. km = self._get_ipc_km()
  73. self._run_lifecycle(km)
  74. @skip_win32
  75. def test_ipc_cinfo(self):
  76. km = self._get_ipc_km()
  77. self._run_cinfo(km, 'ipc', 'test')
  78. def test_start_sequence_tcp_kernels(self):
  79. """Ensure that a sequence of kernel startups doesn't break anything."""
  80. self._run_lifecycle(self._get_tcp_km())
  81. self._run_lifecycle(self._get_tcp_km())
  82. self._run_lifecycle(self._get_tcp_km())
  83. def test_start_sequence_tcp_kernels(self):
  84. """Ensure that a sequence of kernel startups doesn't break anything."""
  85. self._run_lifecycle(self._get_ipc_km())
  86. self._run_lifecycle(self._get_ipc_km())
  87. self._run_lifecycle(self._get_ipc_km())
  88. def test_start_parallel_thread_kernels(self):
  89. self.test_tcp_lifecycle()
  90. thread = threading.Thread(target=self.test_tcp_lifecycle)
  91. thread2 = threading.Thread(target=self.test_tcp_lifecycle)
  92. try:
  93. thread.start()
  94. thread2.start()
  95. finally:
  96. thread.join()
  97. thread2.join()
  98. def test_start_parallel_process_kernels(self):
  99. self.test_tcp_lifecycle()
  100. thread = threading.Thread(target=self.test_tcp_lifecycle)
  101. proc = mp.Process(target=self.test_tcp_lifecycle)
  102. try:
  103. thread.start()
  104. proc.start()
  105. finally:
  106. thread.join()
  107. proc.join()
  108. assert proc.exitcode == 0