channels.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. """Base classes to manage a Client's interaction with a running kernel"""
  2. # Copyright (c) Jupyter Development Team.
  3. # Distributed under the terms of the Modified BSD License.
  4. from __future__ import absolute_import
  5. import atexit
  6. import errno
  7. from threading import Thread, Event
  8. import time
  9. import zmq
  10. # import ZMQError in top-level namespace, to avoid ugly attribute-error messages
  11. # during garbage collection of threads at exit:
  12. from zmq import ZMQError
  13. from jupyter_client import protocol_version_info
  14. from .channelsabc import HBChannelABC
  15. #-----------------------------------------------------------------------------
  16. # Constants and exceptions
  17. #-----------------------------------------------------------------------------
# First element of ``protocol_version_info`` (imported from jupyter_client);
# going by the name, this is the major version number of the protocol.
major_protocol_version = protocol_version_info[0]
  19. class InvalidPortNumber(Exception):
  20. pass
  21. class HBChannel(Thread):
  22. """The heartbeat channel which monitors the kernel heartbeat.
  23. Note that the heartbeat channel is paused by default. As long as you start
  24. this channel, the kernel manager will ensure that it is paused and un-paused
  25. as appropriate.
  26. """
  27. context = None
  28. session = None
  29. socket = None
  30. address = None
  31. _exiting = False
  32. time_to_dead = 1.
  33. poller = None
  34. _running = None
  35. _pause = None
  36. _beating = None
  37. def __init__(self, context=None, session=None, address=None):
  38. """Create the heartbeat monitor thread.
  39. Parameters
  40. ----------
  41. context : :class:`zmq.Context`
  42. The ZMQ context to use.
  43. session : :class:`session.Session`
  44. The session to use.
  45. address : zmq url
  46. Standard (ip, port) tuple that the kernel is listening on.
  47. """
  48. super(HBChannel, self).__init__()
  49. self.daemon = True
  50. self.context = context
  51. self.session = session
  52. if isinstance(address, tuple):
  53. if address[1] == 0:
  54. message = 'The port number for a channel cannot be 0.'
  55. raise InvalidPortNumber(message)
  56. address = "tcp://%s:%i" % address
  57. self.address = address
  58. # running is False until `.start()` is called
  59. self._running = False
  60. self._exit = Event()
  61. # don't start paused
  62. self._pause = False
  63. self.poller = zmq.Poller()
  64. @staticmethod
  65. @atexit.register
  66. def _notice_exit():
  67. # Class definitions can be torn down during interpreter shutdown.
  68. # We only need to set _exiting flag if this hasn't happened.
  69. if HBChannel is not None:
  70. HBChannel._exiting = True
  71. def _create_socket(self):
  72. if self.socket is not None:
  73. # close previous socket, before opening a new one
  74. self.poller.unregister(self.socket)
  75. self.socket.close()
  76. self.socket = self.context.socket(zmq.REQ)
  77. self.socket.linger = 1000
  78. self.socket.connect(self.address)
  79. self.poller.register(self.socket, zmq.POLLIN)
  80. def _poll(self, start_time):
  81. """poll for heartbeat replies until we reach self.time_to_dead.
  82. Ignores interrupts, and returns the result of poll(), which
  83. will be an empty list if no messages arrived before the timeout,
  84. or the event tuple if there is a message to receive.
  85. """
  86. until_dead = self.time_to_dead - (time.time() - start_time)
  87. # ensure poll at least once
  88. until_dead = max(until_dead, 1e-3)
  89. events = []
  90. while True:
  91. try:
  92. events = self.poller.poll(1000 * until_dead)
  93. except ZMQError as e:
  94. if e.errno == errno.EINTR:
  95. # ignore interrupts during heartbeat
  96. # this may never actually happen
  97. until_dead = self.time_to_dead - (time.time() - start_time)
  98. until_dead = max(until_dead, 1e-3)
  99. pass
  100. else:
  101. raise
  102. except Exception:
  103. if self._exiting:
  104. break
  105. else:
  106. raise
  107. else:
  108. break
  109. return events
  110. def run(self):
  111. """The thread's main activity. Call start() instead."""
  112. self._create_socket()
  113. self._running = True
  114. self._beating = True
  115. while self._running:
  116. if self._pause:
  117. # just sleep, and skip the rest of the loop
  118. self._exit.wait(self.time_to_dead)
  119. continue
  120. since_last_heartbeat = 0.0
  121. # io.rprint('Ping from HB channel') # dbg
  122. # no need to catch EFSM here, because the previous event was
  123. # either a recv or connect, which cannot be followed by EFSM
  124. self.socket.send(b'ping')
  125. request_time = time.time()
  126. ready = self._poll(request_time)
  127. if ready:
  128. self._beating = True
  129. # the poll above guarantees we have something to recv
  130. self.socket.recv()
  131. # sleep the remainder of the cycle
  132. remainder = self.time_to_dead - (time.time() - request_time)
  133. if remainder > 0:
  134. self._exit.wait(remainder)
  135. continue
  136. else:
  137. # nothing was received within the time limit, signal heart failure
  138. self._beating = False
  139. since_last_heartbeat = time.time() - request_time
  140. self.call_handlers(since_last_heartbeat)
  141. # and close/reopen the socket, because the REQ/REP cycle has been broken
  142. self._create_socket()
  143. continue
  144. def pause(self):
  145. """Pause the heartbeat."""
  146. self._pause = True
  147. def unpause(self):
  148. """Unpause the heartbeat."""
  149. self._pause = False
  150. def is_beating(self):
  151. """Is the heartbeat running and responsive (and not paused)."""
  152. if self.is_alive() and not self._pause and self._beating:
  153. return True
  154. else:
  155. return False
  156. def stop(self):
  157. """Stop the channel's event loop and join its thread."""
  158. self._running = False
  159. self._exit.set()
  160. self.join()
  161. self.close()
  162. def close(self):
  163. if self.socket is not None:
  164. try:
  165. self.socket.close(linger=0)
  166. except Exception:
  167. pass
  168. self.socket = None
  169. def call_handlers(self, since_last_heartbeat):
  170. """This method is called in the ioloop thread when a message arrives.
  171. Subclasses should override this method to handle incoming messages.
  172. It is important to remember that this method is called in the thread
  173. so that some logic must be done to ensure that the application level
  174. handlers are called in the application thread.
  175. """
  176. pass
# Register HBChannel with HBChannelABC (imported from .channelsabc).
# NOTE(review): presumably HBChannelABC is an abc.ABC, in which case this makes
# HBChannel a virtual subclass without inheriting from it; confirm there.
HBChannelABC.register(HBChannel)