123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- """ Defines a KernelClient that provides thread-safe sockets with async callbacks on message replies.
- """
- from __future__ import absolute_import
- import atexit
- import errno
- import sys
- from threading import Thread, Event
- import time
- # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
- # during garbage collection of threads at exit:
- from zmq import ZMQError
- from zmq.eventloop import ioloop, zmqstream
- # Local imports
- from traitlets import Type, Instance
- from jupyter_client.channels import HBChannel
- from jupyter_client import KernelClient
class ThreadedZMQSocketChannel(object):
    """A ZMQ socket invoking a callback in the ioloop.

    The socket is wrapped in a ZMQStream attached to the given loop. Each
    message received on the stream is deserialized with the session and
    passed to :meth:`call_handlers`.
    """
    session = None
    socket = None
    ioloop = None
    stream = None
    # Optional callable; when set, it is invoked with every deserialized
    # message before call_handlers.
    _inspect = None
    _is_alive = False
    # Set to True by _flush; reset by flush before each pass.
    _flushed = False

    def __init__(self, socket, session, loop):
        """Create a channel.

        Parameters
        ----------
        socket : :class:`zmq.Socket`
            The ZMQ socket to use.
        session : :class:`session.Session`
            The session to use.
        loop
            A pyzmq ioloop to connect the socket to using a ZMQStream
        """
        super(ThreadedZMQSocketChannel, self).__init__()

        self.socket = socket
        self.session = session
        self.ioloop = loop
        stream_ready = Event()

        def setup_stream():
            self.stream = zmqstream.ZMQStream(self.socket, self.ioloop)
            self.stream.on_recv(self._handle_recv)
            stream_ready.set()

        # The stream is created by a loop callback; wait for that callback
        # to run so self.stream is set before the constructor returns.
        self.ioloop.add_callback(setup_stream)
        stream_ready.wait()

    def is_alive(self):
        """Return the flag toggled by :meth:`start` and :meth:`stop`."""
        return self._is_alive

    def start(self):
        """Mark the channel as alive."""
        self._is_alive = True

    def stop(self):
        """Mark the channel as not alive."""
        self._is_alive = False

    def close(self):
        """Close the underlying socket (no linger) and forget it.

        Errors raised while closing are deliberately ignored: this is a
        best-effort teardown. Calling it again is a no-op.
        """
        if self.socket is not None:
            try:
                self.socket.close(linger=0)
            except Exception:
                pass
            self.socket = None

    def send(self, msg):
        """Queue a message to be sent from the IOLoop's thread.

        Parameters
        ----------
        msg : message to send

        This is threadsafe, as it uses IOLoop.add_callback to give the loop's
        thread control of the action.
        """
        def thread_send():
            self.session.send(self.stream, msg)

        self.ioloop.add_callback(thread_send)

    def _handle_recv(self, msg):
        """Callback for stream.on_recv.

        Unpacks message, and calls handlers with it.
        """
        # The identity prefix is not needed here; only the message frames are.
        _, smsg = self.session.feed_identities(msg)
        msg = self.session.deserialize(smsg)
        # let client inspect messages
        if self._inspect:
            self._inspect(msg)
        self.call_handlers(msg)

    def call_handlers(self, msg):
        """This method is called in the ioloop thread when a message arrives.

        Subclasses should override this method to handle incoming messages.
        It is important to remember that this method is called in the thread
        so that some logic must be done to ensure that the application level
        handlers are called in the application thread.
        """
        pass

    def process_events(self):
        """Subclasses should override this with a method
        processing any pending GUI events.
        """
        pass

    def flush(self, timeout=1.0):
        """Immediately processes all pending messages on this channel.

        This is only used for the IOPub channel.

        Callers should use this method to ensure that :meth:`call_handlers`
        has been called for all messages that have been received on the
        0MQ SUB socket of this channel.

        This method is thread safe.

        Parameters
        ----------
        timeout : float, optional
            The maximum amount of time to spend flushing, in seconds. The
            default is one second.
        """
        # We do the IOLoop callback process twice to ensure that the IOLoop
        # gets to perform at least one full poll.
        stop_time = time.time() + timeout
        for _ in range(2):
            done = Event()
            self._flushed = False

            def run_flush(done=done):
                # Signal the waiter even if the flush raises, so the caller
                # is released promptly instead of waiting out the timeout.
                try:
                    self._flush()
                finally:
                    done.set()

            self.ioloop.add_callback(run_flush)
            # Block on the event rather than polling; never wait past the
            # overall deadline shared by both passes.
            done.wait(max(0.0, stop_time - time.time()))

    def _flush(self):
        """Callback for :method:`self.flush`."""
        self.stream.flush()
        self._flushed = True
class IOLoopThread(Thread):
    """Run a pyzmq ioloop in a thread to send and receive messages

    The loop object is created inside the thread by :meth:`run` and exposed
    as ``self.ioloop``; :meth:`start` waits for it before returning.
    """
    # Flipped to True at interpreter exit (see _notice_exit); run() uses it
    # to swallow errors raised by the loop during shutdown.
    _exiting = False
    ioloop = None

    def __init__(self):
        super(IOLoopThread, self).__init__()
        self.daemon = True

    @staticmethod
    @atexit.register
    def _notice_exit():
        # Class definitions can be torn down during interpreter shutdown.
        # We only need to set _exiting flag if this hasn't happened.
        if IOLoopThread is not None:
            IOLoopThread._exiting = True

    def start(self):
        """Start the IOLoop thread

        Don't return until self.ioloop is defined,
        which is created in the thread

        Raises
        ------
        RuntimeError
            If the thread could not create its IOLoop.
        """
        self._start_event = Event()
        Thread.start(self)
        self._start_event.wait()
        if self.ioloop is None:
            # run() failed before the loop existed; report it here rather
            # than handing callers a thread with no loop.
            raise RuntimeError("IOLoopThread failed to create its IOLoop")

    def run(self):
        """Run my loop, ignoring EINTR events in the poller"""
        try:
            if 'asyncio' in sys.modules:
                # tornado may be using asyncio,
                # ensure an eventloop exists for this thread
                import asyncio
                asyncio.set_event_loop(asyncio.new_event_loop())
            self.ioloop = ioloop.IOLoop()
        finally:
            # Always release start(), even when loop creation raised;
            # otherwise the starting thread would wait forever.
            self._start_event.set()

        while True:
            try:
                self.ioloop.start()
            except ZMQError as e:
                if e.errno != errno.EINTR:
                    raise
                # EINTR: the poll was interrupted, restart the loop
            except Exception:
                if not self._exiting:
                    raise
                # errors while the interpreter is exiting are ignored
                break
            else:
                break

    def stop(self):
        """Stop the channel's event loop and join its thread.

        This calls :meth:`~threading.Thread.join` and returns when the thread
        terminates. :class:`RuntimeError` will be raised if
        :meth:`~threading.Thread.start` is called again.
        """
        if self.ioloop is not None:
            self.ioloop.add_callback(self.ioloop.stop)
        # join() raises RuntimeError on a thread that was never started;
        # there is nothing to wait for unless the thread is running.
        if self.is_alive():
            self.join()
        self.close()
        self.ioloop = None

    def close(self):
        """Close the loop and all its file descriptors, ignoring errors."""
        if self.ioloop is not None:
            try:
                self.ioloop.close(all_fds=True)
            except Exception:
                pass
class ThreadedKernelClient(KernelClient):
    """ A KernelClient that provides thread-safe sockets with async callbacks on message replies.

    Each call to :meth:`start_channels` starts a fresh :class:`IOLoopThread`;
    :meth:`stop_channels` stops it again.
    """

    @property
    def ioloop(self):
        """The IOLoop owned by the current ``ioloop_thread``."""
        return self.ioloop_thread.ioloop

    ioloop_thread = Instance(IOLoopThread, allow_none=True)

    def start_channels(self, shell=True, iopub=True, stdin=True, hb=True):
        """Start the IOLoop thread, then start the requested channels.

        When ``shell`` is true, the shell channel is given an inspector so
        that the kernel info reply is noticed as it arrives.
        """
        self.ioloop_thread = IOLoopThread()
        self.ioloop_thread.start()

        if shell:
            self.shell_channel._inspect = self._check_kernel_info_reply

        super(ThreadedKernelClient, self).start_channels(shell, iopub, stdin, hb)

    def _check_kernel_info_reply(self, msg):
        """This is run in the ioloop thread when the kernel info reply is received
        """
        if msg['msg_type'] == 'kernel_info_reply':
            self._handle_kernel_info_reply(msg)
            # one-shot: stop inspecting once the reply has been handled
            self.shell_channel._inspect = None

    def stop_channels(self):
        """Stop the channels, then stop the IOLoop thread if it is running."""
        super(ThreadedKernelClient, self).stop_channels()
        # ioloop_thread is declared allow_none=True, so it may be None here.
        thread = self.ioloop_thread
        if thread is not None and thread.is_alive():
            thread.stop()

    iopub_channel_class = Type(ThreadedZMQSocketChannel)
    shell_channel_class = Type(ThreadedZMQSocketChannel)
    stdin_channel_class = Type(ThreadedZMQSocketChannel)
    hb_channel_class = Type(HBChannel)
|