123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493 |
- """Tornado handlers for kernels.
- Preliminary documentation at https://github.com/ipython/ipython/wiki/IPEP-16%3A-Notebook-multi-directory-dashboard-and-URL-mapping#kernels-api
- """
- # Copyright (c) Jupyter Development Team.
- # Distributed under the terms of the Modified BSD License.
- import json
- import logging
- from textwrap import dedent
- from tornado import gen, web
- from tornado.concurrent import Future
- from tornado.ioloop import IOLoop
- from jupyter_client.jsonutil import date_default
- from ipython_genutils.py3compat import cast_unicode
- from notebook.utils import url_path_join, url_escape
- from ...base.handlers import APIHandler
- from ...base.zmqhandlers import AuthenticatedZMQStreamHandler, deserialize_binary_message
- from jupyter_client import protocol_version as client_protocol_version
- class MainKernelHandler(APIHandler):
- @web.authenticated
- @gen.coroutine
- def get(self):
- km = self.kernel_manager
- kernels = yield gen.maybe_future(km.list_kernels())
- self.finish(json.dumps(kernels, default=date_default))
- @web.authenticated
- @gen.coroutine
- def post(self):
- km = self.kernel_manager
- model = self.get_json_body()
- if model is None:
- model = {
- 'name': km.default_kernel_name
- }
- else:
- model.setdefault('name', km.default_kernel_name)
- kernel_id = yield gen.maybe_future(km.start_kernel(kernel_name=model['name']))
- model = km.kernel_model(kernel_id)
- location = url_path_join(self.base_url, 'api', 'kernels', url_escape(kernel_id))
- self.set_header('Location', location)
- self.set_status(201)
- self.finish(json.dumps(model, default=date_default))
- class KernelHandler(APIHandler):
- @web.authenticated
- def get(self, kernel_id):
- km = self.kernel_manager
- km._check_kernel_id(kernel_id)
- model = km.kernel_model(kernel_id)
- self.finish(json.dumps(model, default=date_default))
- @web.authenticated
- @gen.coroutine
- def delete(self, kernel_id):
- km = self.kernel_manager
- yield gen.maybe_future(km.shutdown_kernel(kernel_id))
- self.set_status(204)
- self.finish()
- class KernelActionHandler(APIHandler):
- @web.authenticated
- @gen.coroutine
- def post(self, kernel_id, action):
- km = self.kernel_manager
- if action == 'interrupt':
- km.interrupt_kernel(kernel_id)
- self.set_status(204)
- if action == 'restart':
- try:
- yield gen.maybe_future(km.restart_kernel(kernel_id))
- except Exception as e:
- self.log.error("Exception restarting kernel", exc_info=True)
- self.set_status(500)
- else:
- model = km.kernel_model(kernel_id)
- self.write(json.dumps(model, default=date_default))
- self.finish()
- class ZMQChannelsHandler(AuthenticatedZMQStreamHandler):
- '''There is one ZMQChannelsHandler per running kernel and it oversees all
- the sessions.
- '''
-
- # class-level registry of open sessions
- # allows checking for conflict on session-id,
- # which is used as a zmq identity and must be unique.
- _open_sessions = {}
- @property
- def kernel_info_timeout(self):
- km_default = self.kernel_manager.kernel_info_timeout
- return self.settings.get('kernel_info_timeout', km_default)
- @property
- def iopub_msg_rate_limit(self):
- return self.settings.get('iopub_msg_rate_limit', 0)
- @property
- def iopub_data_rate_limit(self):
- return self.settings.get('iopub_data_rate_limit', 0)
- @property
- def rate_limit_window(self):
- return self.settings.get('rate_limit_window', 1.0)
- def __repr__(self):
- return "%s(%s)" % (self.__class__.__name__, getattr(self, 'kernel_id', 'uninitialized'))
- def create_stream(self):
- km = self.kernel_manager
- identity = self.session.bsession
- for channel in ('shell', 'iopub', 'stdin'):
- meth = getattr(km, 'connect_' + channel)
- self.channels[channel] = stream = meth(self.kernel_id, identity=identity)
- stream.channel = channel
-
- def request_kernel_info(self):
- """send a request for kernel_info"""
- km = self.kernel_manager
- kernel = km.get_kernel(self.kernel_id)
- try:
- # check for previous request
- future = kernel._kernel_info_future
- except AttributeError:
- self.log.debug("Requesting kernel info from %s", self.kernel_id)
- # Create a kernel_info channel to query the kernel protocol version.
- # This channel will be closed after the kernel_info reply is received.
- if self.kernel_info_channel is None:
- self.kernel_info_channel = km.connect_shell(self.kernel_id)
- self.kernel_info_channel.on_recv(self._handle_kernel_info_reply)
- self.session.send(self.kernel_info_channel, "kernel_info_request")
- # store the future on the kernel, so only one request is sent
- kernel._kernel_info_future = self._kernel_info_future
- else:
- if not future.done():
- self.log.debug("Waiting for pending kernel_info request")
- future.add_done_callback(lambda f: self._finish_kernel_info(f.result()))
- return self._kernel_info_future
-
- def _handle_kernel_info_reply(self, msg):
- """process the kernel_info_reply
-
- enabling msg spec adaptation, if necessary
- """
- idents,msg = self.session.feed_identities(msg)
- try:
- msg = self.session.deserialize(msg)
- except:
- self.log.error("Bad kernel_info reply", exc_info=True)
- self._kernel_info_future.set_result({})
- return
- else:
- info = msg['content']
- self.log.debug("Received kernel info: %s", info)
- if msg['msg_type'] != 'kernel_info_reply' or 'protocol_version' not in info:
- self.log.error("Kernel info request failed, assuming current %s", info)
- info = {}
- self._finish_kernel_info(info)
-
- # close the kernel_info channel, we don't need it anymore
- if self.kernel_info_channel:
- self.kernel_info_channel.close()
- self.kernel_info_channel = None
-
- def _finish_kernel_info(self, info):
- """Finish handling kernel_info reply
-
- Set up protocol adaptation, if needed,
- and signal that connection can continue.
- """
- protocol_version = info.get('protocol_version', client_protocol_version)
- if protocol_version != client_protocol_version:
- self.session.adapt_version = int(protocol_version.split('.')[0])
- self.log.info("Adapting to protocol v%s for kernel %s", protocol_version, self.kernel_id)
- if not self._kernel_info_future.done():
- self._kernel_info_future.set_result(info)
-
- def initialize(self):
- super(ZMQChannelsHandler, self).initialize()
- self.zmq_stream = None
- self.channels = {}
- self.kernel_id = None
- self.kernel_info_channel = None
- self._kernel_info_future = Future()
- self._close_future = Future()
- self.session_key = ''
- # Rate limiting code
- self._iopub_window_msg_count = 0
- self._iopub_window_byte_count = 0
- self._iopub_msgs_exceeded = False
- self._iopub_data_exceeded = False
- # Queue of (time stamp, byte count)
- # Allows you to specify that the byte count should be lowered
- # by a delta amount at some point in the future.
- self._iopub_window_byte_queue = []
- @gen.coroutine
- def pre_get(self):
- # authenticate first
- super(ZMQChannelsHandler, self).pre_get()
- # check session collision:
- yield self._register_session()
- # then request kernel info, waiting up to a certain time before giving up.
- # We don't want to wait forever, because browsers don't take it well when
- # servers never respond to websocket connection requests.
- kernel = self.kernel_manager.get_kernel(self.kernel_id)
- self.session.key = kernel.session.key
- future = self.request_kernel_info()
-
- def give_up():
- """Don't wait forever for the kernel to reply"""
- if future.done():
- return
- self.log.warning("Timeout waiting for kernel_info reply from %s", self.kernel_id)
- future.set_result({})
- loop = IOLoop.current()
- loop.add_timeout(loop.time() + self.kernel_info_timeout, give_up)
- # actually wait for it
- yield future
-
- @gen.coroutine
- def get(self, kernel_id):
- self.kernel_id = cast_unicode(kernel_id, 'ascii')
- yield super(ZMQChannelsHandler, self).get(kernel_id=kernel_id)
-
- @gen.coroutine
- def _register_session(self):
- """Ensure we aren't creating a duplicate session.
-
- If a previous identical session is still open, close it to avoid collisions.
- This is likely due to a client reconnecting from a lost network connection,
- where the socket on our side has not been cleaned up yet.
- """
- self.session_key = '%s:%s' % (self.kernel_id, self.session.session)
- stale_handler = self._open_sessions.get(self.session_key)
- if stale_handler:
- self.log.warning("Replacing stale connection: %s", self.session_key)
- yield stale_handler.close()
- self._open_sessions[self.session_key] = self
- def open(self, kernel_id):
- super(ZMQChannelsHandler, self).open()
- km = self.kernel_manager
- km.notify_connect(kernel_id)
- # on new connections, flush the message buffer
- buffer_info = km.get_buffer(kernel_id, self.session_key)
- if buffer_info and buffer_info['session_key'] == self.session_key:
- self.log.info("Restoring connection for %s", self.session_key)
- self.channels = buffer_info['channels']
- replay_buffer = buffer_info['buffer']
- if replay_buffer:
- self.log.info("Replaying %s buffered messages", len(replay_buffer))
- for channel, msg_list in replay_buffer:
- stream = self.channels[channel]
- self._on_zmq_reply(stream, msg_list)
- else:
- try:
- self.create_stream()
- except web.HTTPError as e:
- self.log.error("Error opening stream: %s", e)
- # WebSockets don't response to traditional error codes so we
- # close the connection.
- for channel, stream in self.channels.items():
- if not stream.closed():
- stream.close()
- self.close()
- return
- km.add_restart_callback(self.kernel_id, self.on_kernel_restarted)
- km.add_restart_callback(self.kernel_id, self.on_restart_failed, 'dead')
- for channel, stream in self.channels.items():
- stream.on_recv_stream(self._on_zmq_reply)
- def on_message(self, msg):
- if not self.channels:
- # already closed, ignore the message
- self.log.debug("Received message on closed websocket %r", msg)
- return
- if isinstance(msg, bytes):
- msg = deserialize_binary_message(msg)
- else:
- msg = json.loads(msg)
- channel = msg.pop('channel', None)
- if channel is None:
- self.log.warning("No channel specified, assuming shell: %s", msg)
- channel = 'shell'
- if channel not in self.channels:
- self.log.warning("No such channel: %r", channel)
- return
- stream = self.channels[channel]
- self.session.send(stream, msg)
- def _on_zmq_reply(self, stream, msg_list):
- idents, fed_msg_list = self.session.feed_identities(msg_list)
- msg = self.session.deserialize(fed_msg_list)
- parent = msg['parent_header']
- def write_stderr(error_message):
- self.log.warning(error_message)
- msg = self.session.msg("stream",
- content={"text": error_message + '\n', "name": "stderr"},
- parent=parent
- )
- msg['channel'] = 'iopub'
- self.write_message(json.dumps(msg, default=date_default))
- channel = getattr(stream, 'channel', None)
- msg_type = msg['header']['msg_type']
- if channel == 'iopub' and msg_type == 'status' and msg['content'].get('execution_state') == 'idle':
- # reset rate limit counter on status=idle,
- # to avoid 'Run All' hitting limits prematurely.
- self._iopub_window_byte_queue = []
- self._iopub_window_msg_count = 0
- self._iopub_window_byte_count = 0
- self._iopub_msgs_exceeded = False
- self._iopub_data_exceeded = False
- if channel == 'iopub' and msg_type not in {'status', 'comm_open', 'execute_input'}:
-
- # Remove the counts queued for removal.
- now = IOLoop.current().time()
- while len(self._iopub_window_byte_queue) > 0:
- queued = self._iopub_window_byte_queue[0]
- if (now >= queued[0]):
- self._iopub_window_byte_count -= queued[1]
- self._iopub_window_msg_count -= 1
- del self._iopub_window_byte_queue[0]
- else:
- # This part of the queue hasn't be reached yet, so we can
- # abort the loop.
- break
- # Increment the bytes and message count
- self._iopub_window_msg_count += 1
- if msg_type == 'stream':
- byte_count = sum([len(x) for x in msg_list])
- else:
- byte_count = 0
- self._iopub_window_byte_count += byte_count
-
- # Queue a removal of the byte and message count for a time in the
- # future, when we are no longer interested in it.
- self._iopub_window_byte_queue.append((now + self.rate_limit_window, byte_count))
-
- # Check the limits, set the limit flags, and reset the
- # message and data counts.
- msg_rate = float(self._iopub_window_msg_count) / self.rate_limit_window
- data_rate = float(self._iopub_window_byte_count) / self.rate_limit_window
-
- # Check the msg rate
- if self.iopub_msg_rate_limit > 0 and msg_rate > self.iopub_msg_rate_limit:
- if not self._iopub_msgs_exceeded:
- self._iopub_msgs_exceeded = True
- write_stderr(dedent("""\
- IOPub message rate exceeded.
- The notebook server will temporarily stop sending output
- to the client in order to avoid crashing it.
- To change this limit, set the config variable
- `--NotebookApp.iopub_msg_rate_limit`.
-
- Current values:
- NotebookApp.iopub_msg_rate_limit={} (msgs/sec)
- NotebookApp.rate_limit_window={} (secs)
- """.format(self.iopub_msg_rate_limit, self.rate_limit_window)))
- else:
- # resume once we've got some headroom below the limit
- if self._iopub_msgs_exceeded and msg_rate < (0.8 * self.iopub_msg_rate_limit):
- self._iopub_msgs_exceeded = False
- if not self._iopub_data_exceeded:
- self.log.warning("iopub messages resumed")
- # Check the data rate
- if self.iopub_data_rate_limit > 0 and data_rate > self.iopub_data_rate_limit:
- if not self._iopub_data_exceeded:
- self._iopub_data_exceeded = True
- write_stderr(dedent("""\
- IOPub data rate exceeded.
- The notebook server will temporarily stop sending output
- to the client in order to avoid crashing it.
- To change this limit, set the config variable
- `--NotebookApp.iopub_data_rate_limit`.
-
- Current values:
- NotebookApp.iopub_data_rate_limit={} (bytes/sec)
- NotebookApp.rate_limit_window={} (secs)
- """.format(self.iopub_data_rate_limit, self.rate_limit_window)))
- else:
- # resume once we've got some headroom below the limit
- if self._iopub_data_exceeded and data_rate < (0.8 * self.iopub_data_rate_limit):
- self._iopub_data_exceeded = False
- if not self._iopub_msgs_exceeded:
- self.log.warning("iopub messages resumed")
-
- # If either of the limit flags are set, do not send the message.
- if self._iopub_msgs_exceeded or self._iopub_data_exceeded:
- # we didn't send it, remove the current message from the calculus
- self._iopub_window_msg_count -= 1
- self._iopub_window_byte_count -= byte_count
- self._iopub_window_byte_queue.pop(-1)
- return
- super(ZMQChannelsHandler, self)._on_zmq_reply(stream, msg)
- def close(self):
- super(ZMQChannelsHandler, self).close()
- return self._close_future
- def on_close(self):
- self.log.debug("Websocket closed %s", self.session_key)
- # unregister myself as an open session (only if it's really me)
- if self._open_sessions.get(self.session_key) is self:
- self._open_sessions.pop(self.session_key)
- km = self.kernel_manager
- if self.kernel_id in km:
- km.notify_disconnect(self.kernel_id)
- km.remove_restart_callback(
- self.kernel_id, self.on_kernel_restarted,
- )
- km.remove_restart_callback(
- self.kernel_id, self.on_restart_failed, 'dead',
- )
- # start buffering instead of closing if this was the last connection
- if km._kernel_connections[self.kernel_id] == 0:
- km.start_buffering(self.kernel_id, self.session_key, self.channels)
- self._close_future.set_result(None)
- return
- # This method can be called twice, once by self.kernel_died and once
- # from the WebSocket close event. If the WebSocket connection is
- # closed before the ZMQ streams are setup, they could be None.
- for channel, stream in self.channels.items():
- if stream is not None and not stream.closed():
- stream.on_recv(None)
- stream.close()
- self.channels = {}
- self._close_future.set_result(None)
- def _send_status_message(self, status):
- iopub = self.channels.get('iopub', None)
- if iopub and not iopub.closed():
- # flush IOPub before sending a restarting/dead status message
- # ensures proper ordering on the IOPub channel
- # that all messages from the stopped kernel have been delivered
- iopub.flush()
- msg = self.session.msg("status",
- {'execution_state': status}
- )
- msg['channel'] = 'iopub'
- self.write_message(json.dumps(msg, default=date_default))
- def on_kernel_restarted(self):
- logging.warn("kernel %s restarted", self.kernel_id)
- self._send_status_message('restarting')
- def on_restart_failed(self):
- logging.error("kernel %s restarted failed!", self.kernel_id)
- self._send_status_message('dead')
- #-----------------------------------------------------------------------------
- # URL to handler mappings
- #-----------------------------------------------------------------------------
- _kernel_id_regex = r"(?P<kernel_id>\w+-\w+-\w+-\w+-\w+)"
- _kernel_action_regex = r"(?P<action>restart|interrupt)"
- default_handlers = [
- (r"/api/kernels", MainKernelHandler),
- (r"/api/kernels/%s" % _kernel_id_regex, KernelHandler),
- (r"/api/kernels/%s/%s" % (_kernel_id_regex, _kernel_action_regex), KernelActionHandler),
- (r"/api/kernels/%s/channels" % _kernel_id_regex, ZMQChannelsHandler),
- ]
|