test_device.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. import time
  4. import zmq
  5. from zmq import devices
  6. from zmq.tests import BaseZMQTestCase, SkipTest, have_gevent, GreenTest, PYPY
  7. from zmq.utils.strtypes import (bytes,unicode,basestring)
  8. if PYPY:
  9. # cleanup of shared Context doesn't work on PyPy
  10. devices.Device.context_factory = zmq.Context
  11. class TestDevice(BaseZMQTestCase):
  12. def test_device_types(self):
  13. for devtype in (zmq.STREAMER, zmq.FORWARDER, zmq.QUEUE):
  14. dev = devices.Device(devtype, zmq.PAIR, zmq.PAIR)
  15. self.assertEqual(dev.device_type, devtype)
  16. del dev
  17. def test_device_attributes(self):
  18. dev = devices.Device(zmq.QUEUE, zmq.SUB, zmq.PUB)
  19. self.assertEqual(dev.in_type, zmq.SUB)
  20. self.assertEqual(dev.out_type, zmq.PUB)
  21. self.assertEqual(dev.device_type, zmq.QUEUE)
  22. self.assertEqual(dev.daemon, True)
  23. del dev
  24. def test_single_socket_forwarder_connect(self):
  25. if zmq.zmq_version() in ('4.1.1', '4.0.6'):
  26. raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
  27. dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
  28. req = self.context.socket(zmq.REQ)
  29. port = req.bind_to_random_port('tcp://127.0.0.1')
  30. dev.connect_in('tcp://127.0.0.1:%i'%port)
  31. dev.start()
  32. time.sleep(.25)
  33. msg = b'hello'
  34. req.send(msg)
  35. self.assertEqual(msg, self.recv(req))
  36. del dev
  37. req.close()
  38. dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
  39. req = self.context.socket(zmq.REQ)
  40. port = req.bind_to_random_port('tcp://127.0.0.1')
  41. dev.connect_out('tcp://127.0.0.1:%i'%port)
  42. dev.start()
  43. time.sleep(.25)
  44. msg = b'hello again'
  45. req.send(msg)
  46. self.assertEqual(msg, self.recv(req))
  47. del dev
  48. req.close()
  49. def test_single_socket_forwarder_bind(self):
  50. if zmq.zmq_version() in ('4.1.1', '4.0.6'):
  51. raise SkipTest("libzmq-%s broke single-socket devices" % zmq.zmq_version())
  52. dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
  53. port = dev.bind_in_to_random_port('tcp://127.0.0.1')
  54. req = self.context.socket(zmq.REQ)
  55. req.connect('tcp://127.0.0.1:%i'%port)
  56. dev.start()
  57. time.sleep(.25)
  58. msg = b'hello'
  59. req.send(msg)
  60. self.assertEqual(msg, self.recv(req))
  61. del dev
  62. req.close()
  63. dev = devices.ThreadDevice(zmq.QUEUE, zmq.REP, -1)
  64. port = dev.bind_in_to_random_port('tcp://127.0.0.1')
  65. req = self.context.socket(zmq.REQ)
  66. req.connect('tcp://127.0.0.1:%i'%port)
  67. dev.start()
  68. time.sleep(.25)
  69. msg = b'hello again'
  70. req.send(msg)
  71. self.assertEqual(msg, self.recv(req))
  72. del dev
  73. req.close()
  74. def test_device_bind_to_random_with_args(self):
  75. dev = devices.ThreadDevice(zmq.PULL, zmq.PUSH, -1)
  76. iface = 'tcp://127.0.0.1'
  77. ports = []
  78. min, max = 5000, 5050
  79. ports.extend([
  80. dev.bind_in_to_random_port(iface, min_port=min, max_port=max),
  81. dev.bind_out_to_random_port(iface, min_port=min, max_port=max)
  82. ])
  83. for port in ports:
  84. if port < min or port > max:
  85. self.fail('Unexpected port number: %i' % port)
  86. def test_device_bind_to_random_binderror(self):
  87. dev = devices.ThreadDevice(zmq.PULL, zmq.PUSH, -1)
  88. iface = 'tcp://127.0.0.1'
  89. try:
  90. for i in range(11):
  91. dev.bind_in_to_random_port(
  92. iface, min_port=10000, max_port=10010
  93. )
  94. except zmq.ZMQBindError as e:
  95. return
  96. else:
  97. self.fail('Should have failed')
  98. def test_proxy(self):
  99. if zmq.zmq_version_info() < (3,2):
  100. raise SkipTest("Proxies only in libzmq >= 3")
  101. dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
  102. iface = 'tcp://127.0.0.1'
  103. port = dev.bind_in_to_random_port(iface)
  104. port2 = dev.bind_out_to_random_port(iface)
  105. port3 = dev.bind_mon_to_random_port(iface)
  106. dev.start()
  107. time.sleep(0.25)
  108. msg = b'hello'
  109. push = self.context.socket(zmq.PUSH)
  110. push.connect("%s:%i" % (iface, port))
  111. pull = self.context.socket(zmq.PULL)
  112. pull.connect("%s:%i" % (iface, port2))
  113. mon = self.context.socket(zmq.PULL)
  114. mon.connect("%s:%i" % (iface, port3))
  115. push.send(msg)
  116. self.sockets.extend([push, pull, mon])
  117. self.assertEqual(msg, self.recv(pull))
  118. self.assertEqual(msg, self.recv(mon))
  119. def test_proxy_bind_to_random_with_args(self):
  120. if zmq.zmq_version_info() < (3, 2):
  121. raise SkipTest("Proxies only in libzmq >= 3")
  122. dev = devices.ThreadProxy(zmq.PULL, zmq.PUSH, zmq.PUSH)
  123. iface = 'tcp://127.0.0.1'
  124. ports = []
  125. min, max = 5000, 5050
  126. ports.extend([
  127. dev.bind_in_to_random_port(iface, min_port=min, max_port=max),
  128. dev.bind_out_to_random_port(iface, min_port=min, max_port=max),
  129. dev.bind_mon_to_random_port(iface, min_port=min, max_port=max)
  130. ])
  131. for port in ports:
  132. if port < min or port > max:
  133. self.fail('Unexpected port number: %i' % port)
  134. if have_gevent:
  135. import gevent
  136. import zmq.green
  137. class TestDeviceGreen(GreenTest, BaseZMQTestCase):
  138. def test_green_device(self):
  139. rep = self.context.socket(zmq.REP)
  140. req = self.context.socket(zmq.REQ)
  141. self.sockets.extend([req, rep])
  142. port = rep.bind_to_random_port('tcp://127.0.0.1')
  143. g = gevent.spawn(zmq.green.device, zmq.QUEUE, rep, rep)
  144. req.connect('tcp://127.0.0.1:%i' % port)
  145. req.send(b'hi')
  146. timeout = gevent.Timeout(3)
  147. timeout.start()
  148. receiver = gevent.spawn(req.recv)
  149. self.assertEqual(receiver.get(2), b'hi')
  150. timeout.cancel()
  151. g.kill(block=True)