"""Tests for the KernelManager"""

# Copyright (c) Jupyter Development Team.
# Distributed under the terms of the Modified BSD License.
  4. import json
  5. import os
  6. pjoin = os.path.join
  7. import signal
  8. from subprocess import PIPE
  9. import sys
  10. import time
  11. import threading
  12. import multiprocessing as mp
  13. import pytest
  14. from unittest import TestCase
  15. from traitlets.config.loader import Config
  16. from jupyter_core import paths
  17. from jupyter_client import KernelManager
  18. from ..manager import start_new_kernel
  19. from .utils import test_env, skip_win32
# Timeout handed to the kernel-client waits in this module: the readiness wait
# in ``_prepare_kernel`` and every ``get_shell_msg`` call.
# Presumably expressed in seconds -- TODO confirm against the jupyter_client API.
TIMEOUT = 30
  21. class TestKernelManager(TestCase):
  22. def setUp(self):
  23. self.env_patch = test_env()
  24. self.env_patch.start()
  25. def tearDown(self):
  26. self.env_patch.stop()
  27. def _install_test_kernel(self):
  28. kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
  29. os.makedirs(kernel_dir)
  30. with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
  31. f.write(json.dumps({
  32. 'argv': [sys.executable,
  33. '-m', 'jupyter_client.tests.signalkernel',
  34. '-f', '{connection_file}'],
  35. 'display_name': "Signal Test Kernel",
  36. }))
  37. def _get_tcp_km(self):
  38. c = Config()
  39. km = KernelManager(config=c)
  40. return km
  41. def _get_ipc_km(self):
  42. c = Config()
  43. c.KernelManager.transport = 'ipc'
  44. c.KernelManager.ip = 'test'
  45. km = KernelManager(config=c)
  46. return km
  47. def _run_lifecycle(self, km):
  48. km.start_kernel(stdout=PIPE, stderr=PIPE)
  49. self.assertTrue(km.is_alive())
  50. km.restart_kernel(now=True)
  51. self.assertTrue(km.is_alive())
  52. km.interrupt_kernel()
  53. self.assertTrue(isinstance(km, KernelManager))
  54. km.shutdown_kernel(now=True)
  55. def test_tcp_lifecycle(self):
  56. km = self._get_tcp_km()
  57. self._run_lifecycle(km)
  58. @skip_win32
  59. def test_ipc_lifecycle(self):
  60. km = self._get_ipc_km()
  61. self._run_lifecycle(km)
  62. def test_get_connect_info(self):
  63. km = self._get_tcp_km()
  64. cinfo = km.get_connection_info()
  65. keys = sorted(cinfo.keys())
  66. expected = sorted([
  67. 'ip', 'transport',
  68. 'hb_port', 'shell_port', 'stdin_port', 'iopub_port', 'control_port',
  69. 'key', 'signature_scheme',
  70. ])
  71. self.assertEqual(keys, expected)
  72. @skip_win32
  73. def test_signal_kernel_subprocesses(self):
  74. self._install_test_kernel()
  75. km, kc = start_new_kernel(kernel_name='signaltest')
  76. def execute(cmd):
  77. kc.execute(cmd)
  78. reply = kc.get_shell_msg(TIMEOUT)
  79. content = reply['content']
  80. self.assertEqual(content['status'], 'ok')
  81. return content
  82. self.addCleanup(kc.stop_channels)
  83. self.addCleanup(km.shutdown_kernel)
  84. N = 5
  85. for i in range(N):
  86. execute("start")
  87. time.sleep(1) # make sure subprocs stay up
  88. reply = execute('check')
  89. self.assertEqual(reply['user_expressions']['poll'], [None] * N)
  90. # start a job on the kernel to be interrupted
  91. kc.execute('sleep')
  92. time.sleep(1) # ensure sleep message has been handled before we interrupt
  93. km.interrupt_kernel()
  94. reply = kc.get_shell_msg(TIMEOUT)
  95. content = reply['content']
  96. self.assertEqual(content['status'], 'ok')
  97. self.assertEqual(content['user_expressions']['interrupted'], True)
  98. # wait up to 5s for subprocesses to handle signal
  99. for i in range(50):
  100. reply = execute('check')
  101. if reply['user_expressions']['poll'] != [-signal.SIGINT] * N:
  102. time.sleep(0.1)
  103. else:
  104. break
  105. # verify that subprocesses were interrupted
  106. self.assertEqual(reply['user_expressions']['poll'], [-signal.SIGINT] * N)
  107. def test_start_new_kernel(self):
  108. self._install_test_kernel()
  109. km, kc = start_new_kernel(kernel_name='signaltest')
  110. self.addCleanup(kc.stop_channels)
  111. self.addCleanup(km.shutdown_kernel)
  112. self.assertTrue(km.is_alive())
  113. self.assertTrue(kc.is_alive())
  114. @pytest.mark.parallel
  115. class TestParallel:
  116. @pytest.fixture(autouse=True)
  117. def env(self):
  118. env_patch = test_env()
  119. env_patch.start()
  120. yield
  121. env_patch.stop()
  122. @pytest.fixture(params=['tcp', 'ipc'])
  123. def transport(self, request):
  124. return request.param
  125. @pytest.fixture
  126. def config(self, transport):
  127. c = Config()
  128. c.transport = transport
  129. if transport == 'ipc':
  130. c.ip = 'test'
  131. return c
  132. def _install_test_kernel(self):
  133. kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
  134. os.makedirs(kernel_dir)
  135. with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
  136. f.write(json.dumps({
  137. 'argv': [sys.executable,
  138. '-m', 'jupyter_client.tests.signalkernel',
  139. '-f', '{connection_file}'],
  140. 'display_name': "Signal Test Kernel",
  141. }))
  142. def test_start_sequence_kernels(self, config):
  143. """Ensure that a sequence of kernel startups doesn't break anything."""
  144. self._install_test_kernel()
  145. self._run_signaltest_lifecycle(config)
  146. self._run_signaltest_lifecycle(config)
  147. self._run_signaltest_lifecycle(config)
  148. def test_start_parallel_thread_kernels(self, config):
  149. self._install_test_kernel()
  150. self._run_signaltest_lifecycle(config)
  151. thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
  152. thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
  153. try:
  154. thread.start()
  155. thread2.start()
  156. finally:
  157. thread.join()
  158. thread2.join()
  159. def test_start_parallel_process_kernels(self, config):
  160. self._install_test_kernel()
  161. self._run_signaltest_lifecycle(config)
  162. thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
  163. proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
  164. try:
  165. thread.start()
  166. proc.start()
  167. finally:
  168. thread.join()
  169. proc.join()
  170. assert proc.exitcode == 0
  171. def test_start_sequence_process_kernels(self, config):
  172. self._install_test_kernel()
  173. self._run_signaltest_lifecycle(config)
  174. proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
  175. try:
  176. proc.start()
  177. finally:
  178. proc.join()
  179. assert proc.exitcode == 0
  180. def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
  181. km.start_kernel(**kwargs)
  182. kc = km.client()
  183. kc.start_channels()
  184. try:
  185. kc.wait_for_ready(timeout=startup_timeout)
  186. except RuntimeError:
  187. kc.stop_channels()
  188. km.shutdown_kernel()
  189. raise
  190. return kc
  191. def _run_signaltest_lifecycle(self, config=None):
  192. km = KernelManager(config=config, kernel_name='signaltest')
  193. kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)
  194. def execute(cmd):
  195. kc.execute(cmd)
  196. reply = kc.get_shell_msg(TIMEOUT)
  197. content = reply['content']
  198. assert content['status'] == 'ok'
  199. return content
  200. execute("start")
  201. assert km.is_alive()
  202. execute('check')
  203. assert km.is_alive()
  204. km.restart_kernel(now=True)
  205. assert km.is_alive()
  206. execute('check')
  207. km.shutdown_kernel()