test_proxy_steerable.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. import time
  4. import struct
  5. import zmq
  6. from zmq import devices
  7. from zmq.tests import BaseZMQTestCase, SkipTest, PYPY
  8. if PYPY:
  9. # cleanup of shared Context doesn't work on PyPy
  10. devices.Device.context_factory = zmq.Context
  11. class TestProxySteerable(BaseZMQTestCase):
  12. def test_proxy_steerable(self):
  13. if zmq.zmq_version_info() < (4, 1):
  14. raise SkipTest("Steerable Proxies only in libzmq >= 4.1")
  15. dev = devices.ThreadProxySteerable(
  16. zmq.PULL,
  17. zmq.PUSH,
  18. zmq.PUSH,
  19. zmq.PAIR
  20. )
  21. iface = 'tcp://127.0.0.1'
  22. port = dev.bind_in_to_random_port(iface)
  23. port2 = dev.bind_out_to_random_port(iface)
  24. port3 = dev.bind_mon_to_random_port(iface)
  25. port4 = dev.bind_ctrl_to_random_port(iface)
  26. dev.start()
  27. time.sleep(0.25)
  28. msg = b'hello'
  29. push = self.context.socket(zmq.PUSH)
  30. push.connect("%s:%i" % (iface, port))
  31. pull = self.context.socket(zmq.PULL)
  32. pull.connect("%s:%i" % (iface, port2))
  33. mon = self.context.socket(zmq.PULL)
  34. mon.connect("%s:%i" % (iface, port3))
  35. ctrl = self.context.socket(zmq.PAIR)
  36. ctrl.connect("%s:%i" % (iface, port4))
  37. push.send(msg)
  38. self.sockets.extend([push, pull, mon, ctrl])
  39. self.assertEqual(msg, self.recv(pull))
  40. self.assertEqual(msg, self.recv(mon))
  41. ctrl.send(b'TERMINATE')
  42. dev.join()
  43. def test_proxy_steerable_bind_to_random_with_args(self):
  44. if zmq.zmq_version_info() < (4, 1):
  45. raise SkipTest("Steerable Proxies only in libzmq >= 4.1")
  46. dev = devices.ThreadProxySteerable(
  47. zmq.PULL,
  48. zmq.PUSH,
  49. zmq.PUSH,
  50. zmq.PAIR
  51. )
  52. iface = 'tcp://127.0.0.1'
  53. ports = []
  54. min, max = 5000, 5050
  55. ports.extend([
  56. dev.bind_in_to_random_port(iface, min_port=min, max_port=max),
  57. dev.bind_out_to_random_port(iface, min_port=min, max_port=max),
  58. dev.bind_mon_to_random_port(iface, min_port=min, max_port=max),
  59. dev.bind_ctrl_to_random_port(iface, min_port=min, max_port=max)
  60. ])
  61. for port in ports:
  62. if port < min or port > max:
  63. self.fail('Unexpected port number: %i' % port)
  64. def test_proxy_steerable_statistics(self):
  65. if zmq.zmq_version_info() < (4, 3):
  66. raise SkipTest("STATISTICS only in libzmq >= 4.3")
  67. dev = devices.ThreadProxySteerable(
  68. zmq.PULL,
  69. zmq.PUSH,
  70. zmq.PUSH,
  71. zmq.PAIR
  72. )
  73. iface = 'tcp://127.0.0.1'
  74. port = dev.bind_in_to_random_port(iface)
  75. port2 = dev.bind_out_to_random_port(iface)
  76. port3 = dev.bind_mon_to_random_port(iface)
  77. port4 = dev.bind_ctrl_to_random_port(iface)
  78. dev.start()
  79. time.sleep(0.25)
  80. msg = b'hello'
  81. push = self.context.socket(zmq.PUSH)
  82. push.connect("%s:%i" % (iface, port))
  83. pull = self.context.socket(zmq.PULL)
  84. pull.connect("%s:%i" % (iface, port2))
  85. mon = self.context.socket(zmq.PULL)
  86. mon.connect("%s:%i" % (iface, port3))
  87. ctrl = self.context.socket(zmq.PAIR)
  88. ctrl.connect("%s:%i" % (iface, port4))
  89. push.send(msg)
  90. self.sockets.extend([push, pull, mon, ctrl])
  91. self.assertEqual(msg, self.recv(pull))
  92. self.assertEqual(msg, self.recv(mon))
  93. ctrl.send(b'STATISTICS')
  94. stats = self.recv_multipart(ctrl)
  95. stats_int = [struct.unpack("=Q", x)[0] for x in stats]
  96. self.assertEqual(1, stats_int[0])
  97. self.assertEqual(len(msg), stats_int[1])
  98. self.assertEqual(1, stats_int[6])
  99. self.assertEqual(len(msg), stats_int[7])
  100. ctrl.send(b'TERMINATE')
  101. dev.join()