123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- import time
- import sys
- import unittest
- from ipython_genutils.py3compat import PY3
- from jupyter_client.blocking.channels import Empty
- from qtconsole.manager import QtKernelManager
- PY2 = sys.version[0] == '2'
- if PY2:
- TimeoutError = RuntimeError
- class Tests(unittest.TestCase):
- def setUp(self):
- """Open a kernel."""
- self.kernel_manager = QtKernelManager()
- self.kernel_manager.start_kernel()
- self.kernel_client = self.kernel_manager.client()
- self.kernel_client.start_channels(shell=True, iopub=True)
- self.blocking_client = self.kernel_client.blocking_client()
- self.blocking_client.start_channels(shell=True, iopub=True)
- self.comm_manager = self.kernel_client.comm_manager
- # Check if client is working
- self.blocking_client.execute('print(0)')
- try:
- self._get_next_msg()
- self._get_next_msg()
- except TimeoutError:
- # Maybe it works now?
- self.blocking_client.execute('print(0)')
- self._get_next_msg()
- self._get_next_msg()
- def tearDown(self):
- """Close the kernel."""
- if self.kernel_manager:
- self.kernel_manager.shutdown_kernel(now=True)
- if self.kernel_client:
- self.kernel_client.shutdown()
- def _get_next_msg(self, timeout=10):
- # Get status messages
- timeout_time = time.time() + timeout
- msg_type = 'status'
- while msg_type == 'status':
- if timeout_time < time.time():
- raise TimeoutError
- try:
- msg = self.blocking_client.get_iopub_msg(timeout=3)
- msg_type = msg['header']['msg_type']
- except Empty:
- pass
- return msg
-
- def test_kernel_to_frontend(self):
- """Communicate from the kernel to the frontend."""
- comm_manager = self.comm_manager
- blocking_client = self.blocking_client
- class DummyCommHandler():
- def __init__(self):
- comm_manager.register_target('test_api', self.comm_open)
- self.last_msg = None
-
- def comm_open(self, comm, msg):
- comm.on_msg(self.comm_message)
- comm.on_close(self.comm_message)
- self.last_msg = msg['content']['data']
- self.comm = comm
-
- def comm_message(self, msg):
- self.last_msg = msg['content']['data']
-
- handler = DummyCommHandler()
- blocking_client.execute(
- "from ipykernel.comm import Comm\n"
- "comm = Comm(target_name='test_api', data='open')\n"
- "comm.send('message')\n"
- "comm.close('close')\n"
- "del comm\n"
- "print('Done')\n"
- )
- # Get input
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'execute_input'
- # Open comm
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'comm_open'
- comm_manager._dispatch(msg)
- assert handler.last_msg == 'open'
- assert handler.comm.comm_id == msg['content']['comm_id']
- # Get message
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'comm_msg'
- comm_manager._dispatch(msg)
- assert handler.last_msg == 'message'
- assert handler.comm.comm_id == msg['content']['comm_id']
- # Get close
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'comm_close'
- comm_manager._dispatch(msg)
- assert handler.last_msg == 'close'
- assert handler.comm.comm_id == msg['content']['comm_id']
- # Get close
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'stream'
- def test_frontend_to_kernel(self):
- """Communicate from the frontend to the kernel."""
- comm_manager = self.comm_manager
- blocking_client = self.blocking_client
- blocking_client.execute(
- "class DummyCommHandler():\n"
- " def __init__(self):\n"
- " get_ipython().kernel.comm_manager.register_target(\n"
- " 'test_api', self.comm_open)\n"
- " def comm_open(self, comm, msg):\n"
- " comm.on_msg(self.comm_message)\n"
- " comm.on_close(self.comm_message)\n"
- " print(msg['content']['data'])\n"
- " def comm_message(self, msg):\n"
- " print(msg['content']['data'])\n"
- "dummy = DummyCommHandler()\n"
- )
- # Get input
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'execute_input'
- # Open comm
- comm = comm_manager.new_comm('test_api', data='open')
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'stream'
- assert msg['content']['text'] == 'open\n'
- # Get message
- comm.send('message')
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'stream'
- assert msg['content']['text'] == 'message\n'
- # Get close
- comm.close('close')
- msg = self._get_next_msg()
- # Received message has a header and parent header. The parent header has
- # the info about the close message type in Python 3
- if PY3:
- assert msg['parent_header']['msg_type'] == 'comm_close'
- assert msg['msg_type'] == 'stream'
- assert msg['content']['text'] == 'close\n'
- else:
- # For some reason ipykernel notifies me that it is closing,
- # even though I closed the comm
- assert msg['header']['msg_type'] == 'comm_close'
- assert comm.comm_id == msg['content']['comm_id']
- msg = self._get_next_msg()
- assert msg['header']['msg_type'] == 'stream'
- if __name__ == "__main__":
- unittest.main()
|