test_embed_kernel.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. """test IPython.embed_kernel()"""
  2. # Copyright (c) IPython Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. import os
  5. import sys
  6. import time
  7. from contextlib import contextmanager
  8. from subprocess import Popen, PIPE
  9. from jupyter_client import BlockingKernelClient
  10. from jupyter_core import paths
  11. from ipython_genutils import py3compat
  12. from ipython_genutils.py3compat import unicode_type
  13. SETUP_TIMEOUT = 60  # seconds setup_kernel() waits for the kernel's connection file to appear
  14. TIMEOUT = 15  # passed as timeout= to every client.get_shell_msg() call in the tests
  15. @contextmanager
  16. def setup_kernel(cmd):
  17. """start an embedded kernel in a subprocess, and wait for it to be ready
  18. Returns
  19. -------
  20. kernel_manager: connected KernelManager instance
  21. """
  22. kernel = Popen([sys.executable, '-c', cmd], stdout=PIPE, stderr=PIPE)
  23. connection_file = os.path.join(
  24. paths.jupyter_runtime_dir(),
  25. 'kernel-%i.json' % kernel.pid,
  26. )
  27. # wait for connection file to exist, timeout after 5s
  28. tic = time.time()
  29. while not os.path.exists(connection_file) \
  30. and kernel.poll() is None \
  31. and time.time() < tic + SETUP_TIMEOUT:
  32. time.sleep(0.1)
  33. if kernel.poll() is not None:
  34. o,e = kernel.communicate()
  35. e = py3compat.cast_unicode(e)
  36. raise IOError("Kernel failed to start:\n%s" % e)
  37. if not os.path.exists(connection_file):
  38. if kernel.poll() is None:
  39. kernel.terminate()
  40. raise IOError("Connection file %r never arrived" % connection_file)
  41. client = BlockingKernelClient(connection_file=connection_file)
  42. client.load_connection_file()
  43. client.start_channels()
  44. client.wait_for_ready()
  45. try:
  46. yield client
  47. finally:
  48. client.stop_channels()
  49. kernel.terminate()
  50. def test_embed_kernel_basic():
  51. """IPython.embed_kernel() is basically functional"""
  52. cmd = '\n'.join([
  53. 'from IPython import embed_kernel',
  54. 'def go():',
  55. ' a=5',
  56. ' b="hi there"',
  57. ' embed_kernel()',
  58. 'go()',
  59. '',
  60. ])
  61. with setup_kernel(cmd) as client:
  62. # oinfo a (int)
  63. msg_id = client.inspect('a')
  64. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  65. content = msg['content']
  66. assert content['found']
  67. msg_id = client.execute("c=a*2")
  68. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  69. content = msg['content']
  70. assert content['status'] == u'ok'
  71. # oinfo c (should be 10)
  72. msg_id = client.inspect('c')
  73. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  74. content = msg['content']
  75. assert content['found']
  76. text = content['data']['text/plain']
  77. assert '10' in text
  78. def test_embed_kernel_namespace():
  79. """IPython.embed_kernel() inherits calling namespace"""
  80. cmd = '\n'.join([
  81. 'from IPython import embed_kernel',
  82. 'def go():',
  83. ' a=5',
  84. ' b="hi there"',
  85. ' embed_kernel()',
  86. 'go()',
  87. '',
  88. ])
  89. with setup_kernel(cmd) as client:
  90. # oinfo a (int)
  91. msg_id = client.inspect('a')
  92. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  93. content = msg['content']
  94. assert content['found']
  95. text = content['data']['text/plain']
  96. assert u'5' in text
  97. # oinfo b (str)
  98. msg_id = client.inspect('b')
  99. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  100. content = msg['content']
  101. assert content['found']
  102. text = content['data']['text/plain']
  103. assert u'hi there' in text
  104. # oinfo c (undefined)
  105. msg_id = client.inspect('c')
  106. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  107. content = msg['content']
  108. assert not content['found']
  109. def test_embed_kernel_reentrant():
  110. """IPython.embed_kernel() can be called multiple times"""
  111. cmd = '\n'.join([
  112. 'from IPython import embed_kernel',
  113. 'count = 0',
  114. 'def go():',
  115. ' global count',
  116. ' embed_kernel()',
  117. ' count = count + 1',
  118. '',
  119. 'while True:'
  120. ' go()',
  121. '',
  122. ])
  123. with setup_kernel(cmd) as client:
  124. for i in range(5):
  125. msg_id = client.inspect('count')
  126. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  127. content = msg['content']
  128. assert content['found']
  129. text = content['data']['text/plain']
  130. assert unicode_type(i) in text
  131. # exit from embed_kernel
  132. client.execute("get_ipython().exit_now = True")
  133. msg = client.get_shell_msg(block=True, timeout=TIMEOUT)
  134. time.sleep(0.2)