proxysteerabledevice.py 3.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. """Classes for running a steerable ZMQ proxy"""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import zmq
  5. from zmq.devices.proxydevice import Proxy, ThreadProxy, ProcessProxy
  6. class ProxySteerableBase(object):
  7. """Base class for overriding methods."""
  8. def __init__(self, in_type, out_type, mon_type=zmq.PUB, ctrl_type=None):
  9. super(ProxySteerableBase, self).__init__(
  10. in_type=in_type,
  11. out_type=out_type,
  12. mon_type=mon_type
  13. )
  14. self.ctrl_type = ctrl_type
  15. self._ctrl_binds = []
  16. self._ctrl_connects = []
  17. self._ctrl_sockopts = []
  18. def bind_ctrl(self, addr):
  19. """Enqueue ZMQ address for binding on ctrl_socket.
  20. See zmq.Socket.bind for details.
  21. """
  22. self._ctrl_binds.append(addr)
  23. def bind_ctrl_to_random_port(self, addr, *args, **kwargs):
  24. """Enqueue a random port on the given interface for binding on
  25. ctrl_socket.
  26. See zmq.Socket.bind_to_random_port for details.
  27. """
  28. port = self._reserve_random_port(addr, *args, **kwargs)
  29. self.bind_ctrl('%s:%i' % (addr, port))
  30. return port
  31. def connect_ctrl(self, addr):
  32. """Enqueue ZMQ address for connecting on ctrl_socket.
  33. See zmq.Socket.connect for details.
  34. """
  35. self._ctrl_connects.append(addr)
  36. def setsockopt_ctrl(self, opt, value):
  37. """Enqueue setsockopt(opt, value) for ctrl_socket
  38. See zmq.Socket.setsockopt for details.
  39. """
  40. self._ctrl_sockopts.append((opt, value))
  41. def _setup_sockets(self):
  42. ins, outs, mons = super(ProxySteerableBase, self)._setup_sockets()
  43. ctx = self._context
  44. ctrls = ctx.socket(self.ctrl_type)
  45. for opt, value in self._ctrl_sockopts:
  46. ctrls.setsockopt(opt, value)
  47. for iface in self._ctrl_binds:
  48. ctrls.bind(iface)
  49. for iface in self._ctrl_connects:
  50. ctrls.connect(iface)
  51. return ins, outs, mons, ctrls
  52. def run_device(self):
  53. ins, outs, mons, ctrls = self._setup_sockets()
  54. zmq.proxy_steerable(ins, outs, mons, ctrls)
  55. class ProxySteerable(ProxySteerableBase, Proxy):
  56. """Class for running a steerable proxy in the background.
  57. See zmq.devices.Proxy for most of the spec. If the control socket is not
  58. NULL, the proxy supports control flow, provided by the socket.
  59. If PAUSE is received on this socket, the proxy suspends its activities. If
  60. RESUME is received, it goes on. If TERMINATE is received, it terminates
  61. smoothly. If the control socket is NULL, the proxy behave exactly as if
  62. zmq.devices.Proxy had been used.
  63. This subclass adds a <method>_ctrl version of each <method>_{in|out}
  64. method, for configuring the control socket.
  65. .. versionadded:: libzmq-4.1
  66. .. versionadded:: 18.0
  67. """
  68. pass
  69. class ThreadProxySteerable(ProxySteerableBase, ThreadProxy):
  70. """ProxySteerable in a Thread. See ProxySteerable for details."""
  71. pass
  72. class ProcessProxySteerable(ProxySteerableBase, ProcessProxy):
  73. """ProxySteerable in a Process. See ProxySteerable for details."""
  74. pass
  75. __all__ = [
  76. 'ProxySteerable',
  77. 'ThreadProxySteerable',
  78. 'ProcessProxySteerable',
  79. ]