basedevice.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281
  1. """Classes for running 0MQ Devices in the background."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import time
  5. from threading import Thread
  6. from multiprocessing import Process
  7. from zmq import device, QUEUE, REQ, Context, ETERM, ZMQBindError, ZMQError
  class Device:
      """A 0MQ Device to be run in the background.

      You do not pass Socket instances to this, but rather Socket types::

          Device(device_type, in_socket_type, out_socket_type)

      For instance::

          dev = Device(zmq.QUEUE, zmq.DEALER, zmq.ROUTER)

      Similar to zmq.device, but socket types instead of sockets themselves are
      passed, and the sockets are created in the work thread, to avoid issues
      with thread safety. As a result, additional bind_{in|out} and
      connect_{in|out} methods and setsockopt_{in|out} allow users to specify
      connections for the sockets.

      Parameters
      ----------
      device_type : int
          The 0MQ Device type
      {in|out}_type : int
          zmq socket types, to be passed later to context.socket(). e.g.
          zmq.PUB, zmq.SUB, zmq.REQ. If out_type is < 0, then in_socket is used
          for both in_socket and out_socket.

      Methods
      -------
      bind_{in|out}(iface)
          passthrough for ``{in|out}_socket.bind(iface)``, to be called in the
          thread
      connect_{in|out}(iface)
          passthrough for ``{in|out}_socket.connect(iface)``, to be called in
          the thread
      setsockopt_{in|out}(opt, value)
          passthrough for ``{in|out}_socket.setsockopt(opt, value)``, to be
          called in the thread

      Attributes
      ----------
      daemon : bool
          sets whether the thread should be run as a daemon
          Default is true, because if it is false, the thread will not
          exit unless it is killed
      context_factory : callable (class attribute)
          Function for creating the Context. This will be Context.instance
          in ThreadDevices, and Context in ProcessDevices. The only reason
          it is not instance() in ProcessDevices is that there may be a stale
          Context instance already initialized, and the forked environment
          should *never* try to use it.
      """

      context_factory = Context.instance
      """Callable that returns a context. Typically either Context.instance or Context,
      depending on whether the device should share the global instance or not.
      """

      def __init__(self, device_type=QUEUE, in_type=None, out_type=None):
          """Record the device and socket types and create empty setup queues.

          Raises
          ------
          TypeError
              If ``in_type`` or ``out_type`` is left as None.
          """
          self.device_type = device_type
          if in_type is None:
              raise TypeError("in_type must be specified")
          if out_type is None:
              raise TypeError("out_type must be specified")
          self.in_type = in_type
          self.out_type = out_type
          # Everything below is only *queued* here; it is applied to the real
          # sockets by _setup_sockets().
          self._in_binds = []
          self._in_connects = []
          self._in_sockopts = []
          self._out_binds = []
          self._out_connects = []
          self._out_sockopts = []
          # Addresses already handed out by _reserve_random_port().
          self._random_addrs = []
          self.daemon = True
          self.done = False

      def bind_in(self, addr):
          """Enqueue ZMQ address for binding on in_socket.

          See zmq.Socket.bind for details.
          """
          self._in_binds.append(addr)

      def bind_in_to_random_port(self, addr, *args, **kwargs):
          """Enqueue a random port on the given interface for binding on
          in_socket.

          See zmq.Socket.bind_to_random_port for details.

          .. versionadded:: 18.0
          """
          port = self._reserve_random_port(addr, *args, **kwargs)
          self.bind_in('%s:%i' % (addr, port))
          return port

      def connect_in(self, addr):
          """Enqueue ZMQ address for connecting on in_socket.

          See zmq.Socket.connect for details.
          """
          self._in_connects.append(addr)

      def setsockopt_in(self, opt, value):
          """Enqueue setsockopt(opt, value) for in_socket

          See zmq.Socket.setsockopt for details.
          """
          self._in_sockopts.append((opt, value))

      def bind_out(self, addr):
          """Enqueue ZMQ address for binding on out_socket.

          See zmq.Socket.bind for details.
          """
          self._out_binds.append(addr)

      def bind_out_to_random_port(self, addr, *args, **kwargs):
          """Enqueue a random port on the given interface for binding on
          out_socket.

          See zmq.Socket.bind_to_random_port for details.

          .. versionadded:: 18.0
          """
          port = self._reserve_random_port(addr, *args, **kwargs)
          self.bind_out('%s:%i' % (addr, port))
          return port

      def connect_out(self, addr):
          """Enqueue ZMQ address for connecting on out_socket.

          See zmq.Socket.connect for details.
          """
          self._out_connects.append(addr)

      def setsockopt_out(self, opt, value):
          """Enqueue setsockopt(opt, value) for out_socket

          See zmq.Socket.setsockopt for details.
          """
          self._out_sockopts.append((opt, value))

      def _reserve_random_port(self, addr, *args, **kwargs):
          """Pick a random port on ``addr`` and remember it as reserved.

          A throwaway REQ socket in a private Context is bound with
          ``bind_to_random_port`` (extra arguments are forwarded to it), up to
          five times, until it yields an address that is not already in
          ``self._random_addrs``. The socket and its context are always
          released before returning or raising.

          Returns
          -------
          int
              The reserved port number.

          Raises
          ------
          ZMQBindError
              If all five attempts returned an already-reserved address.
          """
          ctx = Context()
          try:
              binder = ctx.socket(REQ)
              try:
                  for _ in range(5):
                      port = binder.bind_to_random_port(addr, *args, **kwargs)
                      new_addr = '%s:%i' % (addr, port)
                      if new_addr not in self._random_addrs:
                          break
                  else:
                      raise ZMQBindError("Could not reserve random port.")
                  self._random_addrs.append(new_addr)
              finally:
                  # Close on every path, so a failed bind or an exhausted
                  # retry loop does not leave the socket open.
                  binder.close()
          finally:
              # The private context is used only for this probe; terminate it
              # so it is not leaked on each call.
              ctx.term()
          return port

      def _setup_sockets(self):
          """Create both sockets and apply all queued configuration.

          Returns the ``(in_socket, out_socket)`` pair; both entries are the
          same socket when ``out_type`` is negative.
          """
          ctx = self.context_factory()
          self._context = ctx

          # create the sockets
          ins = ctx.socket(self.in_type)
          if self.out_type < 0:
              outs = ins
          else:
              outs = ctx.socket(self.out_type)

          # set sockopts (must be done first, in case of zmq.IDENTITY)
          for opt, value in self._in_sockopts:
              ins.setsockopt(opt, value)
          for opt, value in self._out_sockopts:
              outs.setsockopt(opt, value)

          for iface in self._in_binds:
              ins.bind(iface)
          for iface in self._out_binds:
              outs.bind(iface)

          for iface in self._in_connects:
              ins.connect(iface)
          for iface in self._out_connects:
              outs.connect(iface)

          return ins, outs

      def run_device(self):
          """The runner method.

          Do not call me directly, instead call ``self.start()``, just like a
          Thread.
          """
          ins, outs = self._setup_sockets()
          device(self.device_type, ins, outs)

      def run(self):
          """wrap run_device in try/catch ETERM"""
          try:
              self.run_device()
          except ZMQError as e:
              # silence TERM errors, because this should be a clean shutdown
              if e.errno != ETERM:
                  raise
          finally:
              # join() polls this flag.
              self.done = True

      def start(self):
          """Start the device. Override me in subclass for other launchers."""
          return self.run()

      def join(self, timeout=None):
          """wait for me to finish, like Thread.join.

          Polls ``self.done`` every millisecond and gives up once more than
          ``timeout`` seconds have elapsed (never, if ``timeout`` is None).

          Reimplemented appropriately by subclasses."""
          start = time.time()
          now = start
          while not self.done:
              if timeout is not None and now - start > timeout:
                  break
              time.sleep(.001)
              now = time.time()
  class BackgroundDevice(Device):
      """Base class for launching Devices in background processes and threads.

      Subclasses set ``_launch_class`` to a callable that accepts ``target=``
      and returns an object with ``daemon``, ``start()`` and ``join()``.
      """

      # The launcher object created by start(); None until then.
      launcher = None
      # Factory for the launcher; filled in by concrete subclasses.
      _launch_class = None

      def start(self):
          """Create the launcher around ``self.run`` and start it."""
          self.launcher = launcher = self._launch_class(target=self.run)
          launcher.daemon = self.daemon
          return launcher.start()

      def join(self, timeout=None):
          """Delegate to the launcher's ``join`` with the given timeout."""
          launcher = self.launcher
          return launcher.join(timeout=timeout)
  class ThreadDevice(BackgroundDevice):
      """A Device that will be run in a background Thread.

      See Device for details.
      """

      # Launch ``run`` via threading.Thread.
      _launch_class = Thread
  class ProcessDevice(BackgroundDevice):
      """A Device that will be run in a background Process.

      See Device for details.
      """

      # Launch ``run`` via multiprocessing.Process.
      _launch_class = Process

      context_factory = Context
      """Callable that returns a context. Typically either Context.instance or Context,
      depending on whether the device should share the global instance or not.
      """
  # Names exported by ``from <module> import *``; BackgroundDevice is not listed.
  __all__ = [
      'Device',
      'ThreadDevice',
      'ProcessDevice',
  ]