client.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348
  1. """Implements a fully blocking kernel client.
  2. Useful for test suites and blocking terminal interfaces.
  3. """
  4. # Copyright (c) Jupyter Development Team.
  5. # Distributed under the terms of the Modified BSD License.
  6. from __future__ import print_function
  7. from functools import partial
  8. from getpass import getpass
  9. try:
  10. from queue import Empty # Python 3
  11. except ImportError:
  12. from Queue import Empty # Python 2
  13. import sys
  14. import time
  15. import zmq
  16. from traitlets import Type
  17. from jupyter_client.channels import HBChannel
  18. from jupyter_client.client import KernelClient
  19. from .channels import ZMQSocketChannel
  20. try:
  21. monotonic = time.monotonic
  22. except AttributeError:
  23. # py2
  24. monotonic = time.time # close enough
  25. try:
  26. TimeoutError
  27. except NameError:
  28. # py2
  29. TimeoutError = RuntimeError
  30. def reqrep(meth):
  31. def wrapped(self, *args, **kwargs):
  32. reply = kwargs.pop('reply', False)
  33. timeout = kwargs.pop('timeout', None)
  34. msg_id = meth(self, *args, **kwargs)
  35. if not reply:
  36. return msg_id
  37. return self._recv_reply(msg_id, timeout=timeout)
  38. if not meth.__doc__:
  39. # python -OO removes docstrings,
  40. # so don't bother building the wrapped docstring
  41. return wrapped
  42. basedoc, _ = meth.__doc__.split('Returns\n', 1)
  43. parts = [basedoc.strip()]
  44. if 'Parameters' not in basedoc:
  45. parts.append("""
  46. Parameters
  47. ----------
  48. """)
  49. parts.append("""
  50. reply: bool (default: False)
  51. Whether to wait for and return reply
  52. timeout: float or None (default: None)
  53. Timeout to use when waiting for a reply
  54. Returns
  55. -------
  56. msg_id: str
  57. The msg_id of the request sent, if reply=False (default)
  58. reply: dict
  59. The reply message for this request, if reply=True
  60. """)
  61. wrapped.__doc__ = '\n'.join(parts)
  62. return wrapped
  63. class BlockingKernelClient(KernelClient):
  64. """A KernelClient with blocking APIs
  65. ``get_[channel]_msg()`` methods wait for and return messages on channels,
  66. raising :exc:`queue.Empty` if no message arrives within ``timeout`` seconds.
  67. """
  68. def wait_for_ready(self, timeout=None):
  69. """Waits for a response when a client is blocked
  70. - Sets future time for timeout
  71. - Blocks on shell channel until a message is received
  72. - Exit if the kernel has died
  73. - If client times out before receiving a message from the kernel, send RuntimeError
  74. - Flush the IOPub channel
  75. """
  76. if timeout is None:
  77. abs_timeout = float('inf')
  78. else:
  79. abs_timeout = time.time() + timeout
  80. from ..manager import KernelManager
  81. if not isinstance(self.parent, KernelManager):
  82. # This Client was not created by a KernelManager,
  83. # so wait for kernel to become responsive to heartbeats
  84. # before checking for kernel_info reply
  85. while not self.is_alive():
  86. if time.time() > abs_timeout:
  87. raise RuntimeError("Kernel didn't respond to heartbeats in %d seconds and timed out" % timeout)
  88. time.sleep(0.2)
  89. # Wait for kernel info reply on shell channel
  90. while True:
  91. try:
  92. msg = self.shell_channel.get_msg(block=True, timeout=1)
  93. except Empty:
  94. pass
  95. else:
  96. if msg['msg_type'] == 'kernel_info_reply':
  97. self._handle_kernel_info_reply(msg)
  98. break
  99. if not self.is_alive():
  100. raise RuntimeError('Kernel died before replying to kernel_info')
  101. # Check if current time is ready check time plus timeout
  102. if time.time() > abs_timeout:
  103. raise RuntimeError("Kernel didn't respond in %d seconds" % timeout)
  104. # Flush IOPub channel
  105. while True:
  106. try:
  107. msg = self.iopub_channel.get_msg(block=True, timeout=0.2)
  108. except Empty:
  109. break
  110. # The classes to use for the various channels
  111. shell_channel_class = Type(ZMQSocketChannel)
  112. iopub_channel_class = Type(ZMQSocketChannel)
  113. stdin_channel_class = Type(ZMQSocketChannel)
  114. hb_channel_class = Type(HBChannel)
  115. def _recv_reply(self, msg_id, timeout=None):
  116. """Receive and return the reply for a given request"""
  117. if timeout is not None:
  118. deadline = monotonic() + timeout
  119. while True:
  120. if timeout is not None:
  121. timeout = max(0, deadline - monotonic())
  122. try:
  123. reply = self.get_shell_msg(timeout=timeout)
  124. except Empty:
  125. raise TimeoutError("Timeout waiting for reply")
  126. if reply['parent_header'].get('msg_id') != msg_id:
  127. # not my reply, someone may have forgotten to retrieve theirs
  128. continue
  129. return reply
  130. execute = reqrep(KernelClient.execute)
  131. history = reqrep(KernelClient.history)
  132. complete = reqrep(KernelClient.complete)
  133. inspect = reqrep(KernelClient.inspect)
  134. kernel_info = reqrep(KernelClient.kernel_info)
  135. comm_info = reqrep(KernelClient.comm_info)
  136. shutdown = reqrep(KernelClient.shutdown)
  137. def _stdin_hook_default(self, msg):
  138. """Handle an input request"""
  139. content = msg['content']
  140. if content.get('password', False):
  141. prompt = getpass
  142. elif sys.version_info < (3,):
  143. prompt = raw_input
  144. else:
  145. prompt = input
  146. try:
  147. raw_data = prompt(content["prompt"])
  148. except EOFError:
  149. # turn EOFError into EOF character
  150. raw_data = '\x04'
  151. except KeyboardInterrupt:
  152. sys.stdout.write('\n')
  153. return
  154. # only send stdin reply if there *was not* another request
  155. # or execution finished while we were reading.
  156. if not (self.stdin_channel.msg_ready() or self.shell_channel.msg_ready()):
  157. self.input(raw_data)
  158. def _output_hook_default(self, msg):
  159. """Default hook for redisplaying plain-text output"""
  160. msg_type = msg['header']['msg_type']
  161. content = msg['content']
  162. if msg_type == 'stream':
  163. stream = getattr(sys, content['name'])
  164. stream.write(content['text'])
  165. elif msg_type in ('display_data', 'execute_result'):
  166. sys.stdout.write(content['data'].get('text/plain', ''))
  167. elif msg_type == 'error':
  168. print('\n'.join(content['traceback']), file=sys.stderr)
  169. def _output_hook_kernel(self, session, socket, parent_header, msg):
  170. """Output hook when running inside an IPython kernel
  171. adds rich output support.
  172. """
  173. msg_type = msg['header']['msg_type']
  174. if msg_type in ('display_data', 'execute_result', 'error'):
  175. session.send(socket, msg_type, msg['content'], parent=parent_header)
  176. else:
  177. self._output_hook_default(msg)
  178. def execute_interactive(self, code, silent=False, store_history=True,
  179. user_expressions=None, allow_stdin=None, stop_on_error=True,
  180. timeout=None, output_hook=None, stdin_hook=None,
  181. ):
  182. """Execute code in the kernel interactively
  183. Output will be redisplayed, and stdin prompts will be relayed as well.
  184. If an IPython kernel is detected, rich output will be displayed.
  185. You can pass a custom output_hook callable that will be called
  186. with every IOPub message that is produced instead of the default redisplay.
  187. .. versionadded:: 5.0
  188. Parameters
  189. ----------
  190. code : str
  191. A string of code in the kernel's language.
  192. silent : bool, optional (default False)
  193. If set, the kernel will execute the code as quietly possible, and
  194. will force store_history to be False.
  195. store_history : bool, optional (default True)
  196. If set, the kernel will store command history. This is forced
  197. to be False if silent is True.
  198. user_expressions : dict, optional
  199. A dict mapping names to expressions to be evaluated in the user's
  200. dict. The expression values are returned as strings formatted using
  201. :func:`repr`.
  202. allow_stdin : bool, optional (default self.allow_stdin)
  203. Flag for whether the kernel can send stdin requests to frontends.
  204. Some frontends (e.g. the Notebook) do not support stdin requests.
  205. If raw_input is called from code executed from such a frontend, a
  206. StdinNotImplementedError will be raised.
  207. stop_on_error: bool, optional (default True)
  208. Flag whether to abort the execution queue, if an exception is encountered.
  209. timeout: float or None (default: None)
  210. Timeout to use when waiting for a reply
  211. output_hook: callable(msg)
  212. Function to be called with output messages.
  213. If not specified, output will be redisplayed.
  214. stdin_hook: callable(msg)
  215. Function to be called with stdin_request messages.
  216. If not specified, input/getpass will be called.
  217. Returns
  218. -------
  219. reply: dict
  220. The reply message for this request
  221. """
  222. if not self.iopub_channel.is_alive():
  223. raise RuntimeError("IOPub channel must be running to receive output")
  224. if allow_stdin is None:
  225. allow_stdin = self.allow_stdin
  226. if allow_stdin and not self.stdin_channel.is_alive():
  227. raise RuntimeError("stdin channel must be running to allow input")
  228. msg_id = self.execute(code,
  229. silent=silent,
  230. store_history=store_history,
  231. user_expressions=user_expressions,
  232. allow_stdin=allow_stdin,
  233. stop_on_error=stop_on_error,
  234. )
  235. if stdin_hook is None:
  236. stdin_hook = self._stdin_hook_default
  237. if output_hook is None:
  238. # detect IPython kernel
  239. if 'IPython' in sys.modules:
  240. from IPython import get_ipython
  241. ip = get_ipython()
  242. in_kernel = getattr(ip, 'kernel', False)
  243. if in_kernel:
  244. output_hook = partial(
  245. self._output_hook_kernel,
  246. ip.display_pub.session,
  247. ip.display_pub.pub_socket,
  248. ip.display_pub.parent_header,
  249. )
  250. if output_hook is None:
  251. # default: redisplay plain-text outputs
  252. output_hook = self._output_hook_default
  253. # set deadline based on timeout
  254. if timeout is not None:
  255. deadline = monotonic() + timeout
  256. else:
  257. timeout_ms = None
  258. poller = zmq.Poller()
  259. iopub_socket = self.iopub_channel.socket
  260. poller.register(iopub_socket, zmq.POLLIN)
  261. if allow_stdin:
  262. stdin_socket = self.stdin_channel.socket
  263. poller.register(stdin_socket, zmq.POLLIN)
  264. else:
  265. stdin_socket = None
  266. # wait for output and redisplay it
  267. while True:
  268. if timeout is not None:
  269. timeout = max(0, deadline - monotonic())
  270. timeout_ms = 1e3 * timeout
  271. events = dict(poller.poll(timeout_ms))
  272. if not events:
  273. raise TimeoutError("Timeout waiting for output")
  274. if stdin_socket in events:
  275. req = self.stdin_channel.get_msg(timeout=0)
  276. stdin_hook(req)
  277. continue
  278. if iopub_socket not in events:
  279. continue
  280. msg = self.iopub_channel.get_msg(timeout=0)
  281. if msg['parent_header'].get('msg_id') != msg_id:
  282. # not from my request
  283. continue
  284. output_hook(msg)
  285. # stop on idle
  286. if msg['header']['msg_type'] == 'status' and \
  287. msg['content']['execution_state'] == 'idle':
  288. break
  289. # output is done, get the reply
  290. if timeout is not None:
  291. timeout = max(0, deadline - monotonic())
  292. return self._recv_reply(msg_id, timeout=timeout)