proxydevice.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. """Proxy classes and functions."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import zmq
  5. from zmq.devices.basedevice import Device, ThreadDevice, ProcessDevice
  6. class ProxyBase(object):
  7. """Base class for overriding methods."""
  8. def __init__(self, in_type, out_type, mon_type=zmq.PUB):
  9. Device.__init__(self, in_type=in_type, out_type=out_type)
  10. self.mon_type = mon_type
  11. self._mon_binds = []
  12. self._mon_connects = []
  13. self._mon_sockopts = []
  14. def bind_mon(self, addr):
  15. """Enqueue ZMQ address for binding on mon_socket.
  16. See zmq.Socket.bind for details.
  17. """
  18. self._mon_binds.append(addr)
  19. def bind_mon_to_random_port(self, addr, *args, **kwargs):
  20. """Enqueue a random port on the given interface for binding on
  21. mon_socket.
  22. See zmq.Socket.bind_to_random_port for details.
  23. .. versionadded:: 18.0
  24. """
  25. port = self._reserve_random_port(addr, *args, **kwargs)
  26. self.bind_mon('%s:%i' % (addr, port))
  27. return port
  28. def connect_mon(self, addr):
  29. """Enqueue ZMQ address for connecting on mon_socket.
  30. See zmq.Socket.connect for details.
  31. """
  32. self._mon_connects.append(addr)
  33. def setsockopt_mon(self, opt, value):
  34. """Enqueue setsockopt(opt, value) for mon_socket
  35. See zmq.Socket.setsockopt for details.
  36. """
  37. self._mon_sockopts.append((opt, value))
  38. def _setup_sockets(self):
  39. ins,outs = Device._setup_sockets(self)
  40. ctx = self._context
  41. mons = ctx.socket(self.mon_type)
  42. # set sockopts (must be done first, in case of zmq.IDENTITY)
  43. for opt,value in self._mon_sockopts:
  44. mons.setsockopt(opt, value)
  45. for iface in self._mon_binds:
  46. mons.bind(iface)
  47. for iface in self._mon_connects:
  48. mons.connect(iface)
  49. return ins,outs,mons
  50. def run_device(self):
  51. ins,outs,mons = self._setup_sockets()
  52. zmq.proxy(ins, outs, mons)
  53. class Proxy(ProxyBase, Device):
  54. """Threadsafe Proxy object.
  55. See zmq.devices.Device for most of the spec. This subclass adds a
  56. <method>_mon version of each <method>_{in|out} method, for configuring the
  57. monitor socket.
  58. A Proxy is a 3-socket ZMQ Device that functions just like a
  59. QUEUE, except each message is also sent out on the monitor socket.
  60. A PUB socket is the most logical choice for the mon_socket, but it is not required.
  61. """
  62. pass
  63. class ThreadProxy(ProxyBase, ThreadDevice):
  64. """Proxy in a Thread. See Proxy for more."""
  65. pass
  66. class ProcessProxy(ProxyBase, ProcessDevice):
  67. """Proxy in a Process. See Proxy for more."""
  68. pass
  69. __all__ = [
  70. 'Proxy',
  71. 'ThreadProxy',
  72. 'ProcessProxy',
  73. ]