channels.py 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. """Blocking channels
  2. Useful for test suites and blocking terminal interfaces.
  3. """
  4. # Copyright (c) Jupyter Development Team.
  5. # Distributed under the terms of the Modified BSD License.
  6. try:
  7. from queue import Queue, Empty # Py 3
  8. except ImportError:
  9. from Queue import Queue, Empty # Py 2
  10. class ZMQSocketChannel(object):
  11. """A ZMQ socket in a simple blocking API"""
  12. session = None
  13. socket = None
  14. stream = None
  15. _exiting = False
  16. proxy_methods = []
  17. def __init__(self, socket, session, loop=None):
  18. """Create a channel.
  19. Parameters
  20. ----------
  21. socket : :class:`zmq.Socket`
  22. The ZMQ socket to use.
  23. session : :class:`session.Session`
  24. The session to use.
  25. loop
  26. Unused here, for other implementations
  27. """
  28. super(ZMQSocketChannel, self).__init__()
  29. self.socket = socket
  30. self.session = session
  31. def _recv(self, **kwargs):
  32. msg = self.socket.recv_multipart(**kwargs)
  33. ident,smsg = self.session.feed_identities(msg)
  34. return self.session.deserialize(smsg)
  35. def get_msg(self, block=True, timeout=None):
  36. """ Gets a message if there is one that is ready. """
  37. if block:
  38. if timeout is not None:
  39. timeout *= 1000 # seconds to ms
  40. ready = self.socket.poll(timeout)
  41. else:
  42. ready = self.socket.poll(timeout=0)
  43. if ready:
  44. return self._recv()
  45. else:
  46. raise Empty
  47. def get_msgs(self):
  48. """ Get all messages that are currently ready. """
  49. msgs = []
  50. while True:
  51. try:
  52. msgs.append(self.get_msg(block=False))
  53. except Empty:
  54. break
  55. return msgs
  56. def msg_ready(self):
  57. """ Is there a message that has been received? """
  58. return bool(self.socket.poll(timeout=0))
  59. def close(self):
  60. if self.socket is not None:
  61. try:
  62. self.socket.close(linger=0)
  63. except Exception:
  64. pass
  65. self.socket = None
  66. stop = close
  67. def is_alive(self):
  68. return (self.socket is not None)
  69. def send(self, msg):
  70. """Pass a message to the ZMQ socket to send
  71. """
  72. self.session.send(self.socket, msg)
  73. def start(self):
  74. pass