123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348 |
- """Implements a fully blocking kernel client.
- Useful for test suites and blocking terminal interfaces.
- """
- # Copyright (c) Jupyter Development Team.
- # Distributed under the terms of the Modified BSD License.
- from __future__ import print_function
- from functools import partial
- from getpass import getpass
- try:
- from queue import Empty # Python 3
- except ImportError:
- from Queue import Empty # Python 2
- import sys
- import time
- import zmq
- from traitlets import Type
- from jupyter_client.channels import HBChannel
- from jupyter_client.client import KernelClient
- from .channels import ZMQSocketChannel
- try:
- monotonic = time.monotonic
- except AttributeError:
- # py2
- monotonic = time.time # close enough
- try:
- TimeoutError
- except NameError:
- # py2
- TimeoutError = RuntimeError
- def reqrep(meth):
- def wrapped(self, *args, **kwargs):
- reply = kwargs.pop('reply', False)
- timeout = kwargs.pop('timeout', None)
- msg_id = meth(self, *args, **kwargs)
- if not reply:
- return msg_id
- return self._recv_reply(msg_id, timeout=timeout)
-
- if not meth.__doc__:
- # python -OO removes docstrings,
- # so don't bother building the wrapped docstring
- return wrapped
-
- basedoc, _ = meth.__doc__.split('Returns\n', 1)
- parts = [basedoc.strip()]
- if 'Parameters' not in basedoc:
- parts.append("""
- Parameters
- ----------
- """)
- parts.append("""
- reply: bool (default: False)
- Whether to wait for and return reply
- timeout: float or None (default: None)
- Timeout to use when waiting for a reply
- Returns
- -------
- msg_id: str
- The msg_id of the request sent, if reply=False (default)
- reply: dict
- The reply message for this request, if reply=True
- """)
- wrapped.__doc__ = '\n'.join(parts)
- return wrapped
- class BlockingKernelClient(KernelClient):
- """A KernelClient with blocking APIs
-
- ``get_[channel]_msg()`` methods wait for and return messages on channels,
- raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
- """
-
- def wait_for_ready(self, timeout=None):
- """Waits for a response when a client is blocked
-
- - Sets future time for timeout
- - Blocks on shell channel until a message is received
- - Exit if the kernel has died
- - If client times out before receiving a message from the kernel, send RuntimeError
- - Flush the IOPub channel
- """
- if timeout is None:
- abs_timeout = float('inf')
- else:
- abs_timeout = time.time() + timeout
- from ..manager import KernelManager
- if not isinstance(self.parent, KernelManager):
- # This Client was not created by a KernelManager,
- # so wait for kernel to become responsive to heartbeats
- # before checking for kernel_info reply
- while not self.is_alive():
- if time.time() > abs_timeout:
- raise RuntimeError("Kernel didn't respond to heartbeats in %d seconds and timed out" % timeout)
- time.sleep(0.2)
- # Wait for kernel info reply on shell channel
- while True:
- try:
- msg = self.shell_channel.get_msg(block=True, timeout=1)
- except Empty:
- pass
- else:
- if msg['msg_type'] == 'kernel_info_reply':
- self._handle_kernel_info_reply(msg)
- break
- if not self.is_alive():
- raise RuntimeError('Kernel died before replying to kernel_info')
- # Check if current time is ready check time plus timeout
- if time.time() > abs_timeout:
- raise RuntimeError("Kernel didn't respond in %d seconds" % timeout)
- # Flush IOPub channel
- while True:
- try:
- msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
- except Empty:
- break
- # The classes to use for the various channels
- shell_channel_class = Type(ZMQSocketChannel)
- iopub_channel_class = Type(ZMQSocketChannel)
- stdin_channel_class = Type(ZMQSocketChannel)
- hb_channel_class = Type(HBChannel)
- def _recv_reply(self, msg_id, timeout=None):
- """Receive and return the reply for a given request"""
- if timeout is not None:
- deadline = monotonic() + timeout
- while True:
- if timeout is not None:
- timeout = max(0, deadline - monotonic())
- try:
- reply = self.get_shell_msg(timeout=timeout)
- except Empty:
- raise TimeoutError("Timeout waiting for reply")
- if reply['parent_header'].get('msg_id') != msg_id:
- # not my reply, someone may have forgotten to retrieve theirs
- continue
- return reply
- execute = reqrep(KernelClient.execute)
- history = reqrep(KernelClient.history)
- complete = reqrep(KernelClient.complete)
- inspect = reqrep(KernelClient.inspect)
- kernel_info = reqrep(KernelClient.kernel_info)
- comm_info = reqrep(KernelClient.comm_info)
- shutdown = reqrep(KernelClient.shutdown)
- def _stdin_hook_default(self, msg):
- """Handle an input request"""
- content = msg['content']
- if content.get('password', False):
- prompt = getpass
- elif sys.version_info < (3,):
- prompt = raw_input
- else:
- prompt = input
- try:
- raw_data = prompt(content["prompt"])
- except EOFError:
- # turn EOFError into EOF character
- raw_data = '\x04'
- except KeyboardInterrupt:
- sys.stdout.write('\n')
- return
- # only send stdin reply if there *was not* another request
- # or execution finished while we were reading.
- if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()):
- self.input(raw_data)
- def _output_hook_default(self, msg):
- """Default hook for redisplaying plain-text output"""
- msg_type = msg['header']['msg_type']
- content = msg['content']
- if msg_type == 'stream':
- stream = getattr(sys, content['name'])
- stream.write(content['text'])
- elif msg_type in ('display_data', 'execute_result'):
- sys.stdout.write(content['data'].get('text/plain', ''))
- elif msg_type == 'error':
- print('\n'.join(content['traceback']), file=sys.stderr)
- def _output_hook_kernel(self, session, socket, parent_header, msg):
- """Output hook when running inside an IPython kernel
- adds rich output support.
- """
- msg_type = msg['header']['msg_type']
- if msg_type in ('display_data', 'execute_result', 'error'):
- session.send(socket, msg_type, msg['content'], parent=parent_header)
- else:
- self._output_hook_default(msg)
- def execute_interactive(self, code, silent=False, store_history=True,
- user_expressions=None, allow_stdin=None, stop_on_error=True,
- timeout=None, output_hook=None, stdin_hook=None,
- ):
- """Execute code in the kernel interactively
- Output will be redisplayed, and stdin prompts will be relayed as well.
- If an IPython kernel is detected, rich output will be displayed.
- You can pass a custom output_hook callable that will be called
- with every IOPub message that is produced instead of the default redisplay.
- .. versionadded:: 5.0
- Parameters
- ----------
- code : str
- A string of code in the kernel's language.
- silent : bool, optional (default False)
- If set, the kernel will execute the code as quietly possible, and
- will force store_history to be False.
- store_history : bool, optional (default True)
- If set, the kernel will store command history. This is forced
- to be False if silent is True.
- user_expressions : dict, optional
- A dict mapping names to expressions to be evaluated in the user's
- dict. The expression values are returned as strings formatted using
- :func:`repr`.
- allow_stdin : bool, optional (default self.allow_stdin)
- Flag for whether the kernel can send stdin requests to frontends.
- Some frontends (e.g. the Notebook) do not support stdin requests.
- If raw_input is called from code executed from such a frontend, a
- StdinNotImplementedError will be raised.
- stop_on_error: bool, optional (default True)
- Flag whether to abort the execution queue, if an exception is encountered.
- timeout: float or None (default: None)
- Timeout to use when waiting for a reply
- output_hook: callable(msg)
- Function to be called with output messages.
- If not specified, output will be redisplayed.
- stdin_hook: callable(msg)
- Function to be called with stdin_request messages.
- If not specified, input/getpass will be called.
- Returns
- -------
- reply: dict
- The reply message for this request
- """
- if not self.iopub_channel.is_alive():
- raise RuntimeError("IOPub channel must be running to receive output")
- if allow_stdin is None:
- allow_stdin = self.allow_stdin
- if allow_stdin and not self.stdin_channel.is_alive():
- raise RuntimeError("stdin channel must be running to allow input")
- msg_id = self.execute(code,
- silent=silent,
- store_history=store_history,
- user_expressions=user_expressions,
- allow_stdin=allow_stdin,
- stop_on_error=stop_on_error,
- )
- if stdin_hook is None:
- stdin_hook = self._stdin_hook_default
- if output_hook is None:
- # detect IPython kernel
- if 'IPython' in sys.modules:
- from IPython import get_ipython
- ip = get_ipython()
- in_kernel = getattr(ip, 'kernel', False)
- if in_kernel:
- output_hook = partial(
- self._output_hook_kernel,
- ip.display_pub.session,
- ip.display_pub.pub_socket,
- ip.display_pub.parent_header,
- )
- if output_hook is None:
- # default: redisplay plain-text outputs
- output_hook = self._output_hook_default
- # set deadline based on timeout
- if timeout is not None:
- deadline = monotonic() + timeout
- else:
- timeout_ms = None
- poller = zmq.Poller()
- iopub_socket = self.iopub_channel.socket
- poller.register(iopub_socket, zmq.POLLIN)
- if allow_stdin:
- stdin_socket = self.stdin_channel.socket
- poller.register(stdin_socket, zmq.POLLIN)
- else:
- stdin_socket = None
- # wait for output and redisplay it
- while True:
- if timeout is not None:
- timeout = max(0, deadline - monotonic())
- timeout_ms = 1e3 * timeout
- events = dict(poller.poll(timeout_ms))
- if not events:
- raise TimeoutError("Timeout waiting for output")
- if stdin_socket in events:
- req = self.stdin_channel.get_msg(timeout=0)
- stdin_hook(req)
- continue
- if iopub_socket not in events:
- continue
- msg = self.iopub_channel.get_msg(timeout=0)
- if msg['parent_header'].get('msg_id') != msg_id:
- # not from my request
- continue
- output_hook(msg)
- # stop on idle
- if msg['header']['msg_type'] == 'status' and \
- msg['content']['execution_state'] == 'idle':
- break
- # output is done, get the reply
- if timeout is not None:
- timeout = max(0, deadline - monotonic())
- return self._recv_reply(msg_id, timeout=timeout)
|