123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158 |
- """test IPython.embed_kernel()"""
- # Copyright (c) IPython Development Team.
- # Distributed under the terms of the Modified BSD License.
- import os
- import sys
- import time
- from contextlib import contextmanager
- from subprocess import Popen, PIPE
- from jupyter_client import BlockingKernelClient
- from jupyter_core import paths
- from ipython_genutils import py3compat
- from ipython_genutils.py3compat import unicode_type
- SETUP_TIMEOUT = 60
- TIMEOUT = 15
- @contextmanager
- def setup_kernel(cmd):
- """start an embedded kernel in a subprocess, and wait for it to be ready
- Returns
- -------
- kernel_manager: connected KernelManager instance
- """
- kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE)
- connection_file = os.path.join(
- paths.jupyter_runtime_dir(),
- 'kernel-%i.json' % kernel.pid,
- )
- # wait for connection file to exist, timeout after 5s
- tic = time.time()
- while not os.path.exists(connection_file) \
- and kernel.poll() is None \
- and time.time() < tic + SETUP_TIMEOUT:
- time.sleep(0.1)
- if kernel.poll() is not None:
- o,e = kernel.communicate()
- e = py3compat.cast_unicode(e)
- raise IOError("Kernel failed to start:\n%s" % e)
- if not os.path.exists(connection_file):
- if kernel.poll() is None:
- kernel.terminate()
- raise IOError("Connection file %r never arrived" % connection_file)
- client = BlockingKernelClient(connection_file=connection_file)
- client.load_connection_file()
- client.start_channels()
- client.wait_for_ready()
- try:
- yield client
- finally:
- client.stop_channels()
- kernel.terminate()
- def test_embed_kernel_basic():
- """IPython.embed_kernel() is basically functional"""
- cmd = '\n'.join([
- 'from IPython import embed_kernel',
- 'def go():',
- ' a=5',
- ' b="hi there"',
- ' embed_kernel()',
- 'go()',
- '',
- ])
- with setup_kernel(cmd) as client:
- # oinfo a (int)
- msg_id = client.inspect('a')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['found']
- msg_id = client.execute("c=a*2")
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['status'] == u'ok'
- # oinfo c (should be 10)
- msg_id = client.inspect('c')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['found']
- text = content['data']['text/plain']
- assert '10' in text
- def test_embed_kernel_namespace():
- """IPython.embed_kernel() inherits calling namespace"""
- cmd = '\n'.join([
- 'from IPython import embed_kernel',
- 'def go():',
- ' a=5',
- ' b="hi there"',
- ' embed_kernel()',
- 'go()',
- '',
- ])
- with setup_kernel(cmd) as client:
- # oinfo a (int)
- msg_id = client.inspect('a')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['found']
- text = content['data']['text/plain']
- assert u'5' in text
- # oinfo b (str)
- msg_id = client.inspect('b')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['found']
- text = content['data']['text/plain']
- assert u'hi there' in text
- # oinfo c (undefined)
- msg_id = client.inspect('c')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert not content['found']
- def test_embed_kernel_reentrant():
- """IPython.embed_kernel() can be called multiple times"""
- cmd = '\n'.join([
- 'from IPython import embed_kernel',
- 'count = 0',
- 'def go():',
- ' global count',
- ' embed_kernel()',
- ' count = count + 1',
- '',
- 'while True:'
- ' go()',
- '',
- ])
- with setup_kernel(cmd) as client:
- for i in range(5):
- msg_id = client.inspect('count')
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- content = msg['content']
- assert content['found']
- text = content['data']['text/plain']
- assert unicode_type(i) in text
- # exit from embed_kernel
- client.execute("get_ipython().exit_now = True")
- msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
- time.sleep(0.2)
|