123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247 |
- """Tests for the KernelManager"""
- # Copyright (c) Jupyter Development Team.
- # Distributed under the terms of the Modified BSD License.
- import json
- import os
- pjoin = os.path.join
- import signal
- from subprocess import PIPE
- import sys
- import time
- import threading
- import multiprocessing as mp
- import pytest
- from unittest import TestCase
- from traitlets.config.loader import Config
- from jupyter_core import paths
- from jupyter_client import KernelManager
- from ..manager import start_new_kernel
- from .utils import test_env, skip_win32
- TIMEOUT = 30
- class TestKernelManager(TestCase):
- def setUp(self):
- self.env_patch = test_env()
- self.env_patch.start()
-
- def tearDown(self):
- self.env_patch.stop()
- def _install_test_kernel(self):
- kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
- os.makedirs(kernel_dir)
- with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
- f.write(json.dumps({
- 'argv': [sys.executable,
- '-m', 'jupyter_client.tests.signalkernel',
- '-f', '{connection_file}'],
- 'display_name': "Signal Test Kernel",
- }))
- def _get_tcp_km(self):
- c = Config()
- km = KernelManager(config=c)
- return km
- def _get_ipc_km(self):
- c = Config()
- c.KernelManager.transport = 'ipc'
- c.KernelManager.ip = 'test'
- km = KernelManager(config=c)
- return km
- def _run_lifecycle(self, km):
- km.start_kernel(stdout=PIPE, stderr=PIPE)
- self.assertTrue(km.is_alive())
- km.restart_kernel(now=True)
- self.assertTrue(km.is_alive())
- km.interrupt_kernel()
- self.assertTrue(isinstance(km, KernelManager))
- km.shutdown_kernel(now=True)
- def test_tcp_lifecycle(self):
- km = self._get_tcp_km()
- self._run_lifecycle(km)
- @skip_win32
- def test_ipc_lifecycle(self):
- km = self._get_ipc_km()
- self._run_lifecycle(km)
- def test_get_connect_info(self):
- km = self._get_tcp_km()
- cinfo = km.get_connection_info()
- keys = sorted(cinfo.keys())
- expected = sorted([
- 'ip', 'transport',
- 'hb_port', 'shell_port', 'stdin_port', 'iopub_port', 'control_port',
- 'key', 'signature_scheme',
- ])
- self.assertEqual(keys, expected)
- @skip_win32
- def test_signal_kernel_subprocesses(self):
- self._install_test_kernel()
- km, kc = start_new_kernel(kernel_name='signaltest')
- def execute(cmd):
- kc.execute(cmd)
- reply = kc.get_shell_msg(TIMEOUT)
- content = reply['content']
- self.assertEqual(content['status'], 'ok')
- return content
-
- self.addCleanup(kc.stop_channels)
- self.addCleanup(km.shutdown_kernel)
- N = 5
- for i in range(N):
- execute("start")
- time.sleep(1) # make sure subprocs stay up
- reply = execute('check')
- self.assertEqual(reply['user_expressions']['poll'], [None] * N)
-
- # start a job on the kernel to be interrupted
- kc.execute('sleep')
- time.sleep(1) # ensure sleep message has been handled before we interrupt
- km.interrupt_kernel()
- reply = kc.get_shell_msg(TIMEOUT)
- content = reply['content']
- self.assertEqual(content['status'], 'ok')
- self.assertEqual(content['user_expressions']['interrupted'], True)
- # wait up to 5s for subprocesses to handle signal
- for i in range(50):
- reply = execute('check')
- if reply['user_expressions']['poll'] != [-signal.SIGINT] * N:
- time.sleep(0.1)
- else:
- break
- # verify that subprocesses were interrupted
- self.assertEqual(reply['user_expressions']['poll'], [-signal.SIGINT] * N)
- def test_start_new_kernel(self):
- self._install_test_kernel()
- km, kc = start_new_kernel(kernel_name='signaltest')
- self.addCleanup(kc.stop_channels)
- self.addCleanup(km.shutdown_kernel)
- self.assertTrue(km.is_alive())
- self.assertTrue(kc.is_alive())
- @pytest.mark.parallel
- class TestParallel:
- @pytest.fixture(autouse=True)
- def env(self):
- env_patch = test_env()
- env_patch.start()
- yield
- env_patch.stop()
- @pytest.fixture(params=['tcp', 'ipc'])
- def transport(self, request):
- return request.param
- @pytest.fixture
- def config(self, transport):
- c = Config()
- c.transport = transport
- if transport == 'ipc':
- c.ip = 'test'
- return c
- def _install_test_kernel(self):
- kernel_dir = pjoin(paths.jupyter_data_dir(), 'kernels', 'signaltest')
- os.makedirs(kernel_dir)
- with open(pjoin(kernel_dir, 'kernel.json'), 'w') as f:
- f.write(json.dumps({
- 'argv': [sys.executable,
- '-m', 'jupyter_client.tests.signalkernel',
- '-f', '{connection_file}'],
- 'display_name': "Signal Test Kernel",
- }))
- def test_start_sequence_kernels(self, config):
- """Ensure that a sequence of kernel startups doesn't break anything."""
- self._install_test_kernel()
- self._run_signaltest_lifecycle(config)
- self._run_signaltest_lifecycle(config)
- self._run_signaltest_lifecycle(config)
- def test_start_parallel_thread_kernels(self, config):
- self._install_test_kernel()
- self._run_signaltest_lifecycle(config)
- thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
- thread2 = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
- try:
- thread.start()
- thread2.start()
- finally:
- thread.join()
- thread2.join()
- def test_start_parallel_process_kernels(self, config):
- self._install_test_kernel()
- self._run_signaltest_lifecycle(config)
- thread = threading.Thread(target=self._run_signaltest_lifecycle, args=(config,))
- proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
- try:
- thread.start()
- proc.start()
- finally:
- thread.join()
- proc.join()
- assert proc.exitcode == 0
- def test_start_sequence_process_kernels(self, config):
- self._install_test_kernel()
- self._run_signaltest_lifecycle(config)
- proc = mp.Process(target=self._run_signaltest_lifecycle, args=(config,))
- try:
- proc.start()
- finally:
- proc.join()
- assert proc.exitcode == 0
-
- def _prepare_kernel(self, km, startup_timeout=TIMEOUT, **kwargs):
- km.start_kernel(**kwargs)
- kc = km.client()
- kc.start_channels()
- try:
- kc.wait_for_ready(timeout=startup_timeout)
- except RuntimeError:
- kc.stop_channels()
- km.shutdown_kernel()
- raise
- return kc
- def _run_signaltest_lifecycle(self, config=None):
- km = KernelManager(config=config, kernel_name='signaltest')
- kc = self._prepare_kernel(km, stdout=PIPE, stderr=PIPE)
- def execute(cmd):
- kc.execute(cmd)
- reply = kc.get_shell_msg(TIMEOUT)
- content = reply['content']
- assert content['status'] == 'ok'
- return content
- execute("start")
- assert km.is_alive()
- execute('check')
- assert km.is_alive()
- km.restart_kernel(now=True)
- assert km.is_alive()
- execute('check')
- km.shutdown_kernel()
|