test_monqueue.py 8.0 KB


  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. import time
  4. from unittest import TestCase
  5. import zmq
  6. from zmq import devices
  7. from zmq.tests import BaseZMQTestCase, SkipTest, PYPY
  8. from zmq.utils.strtypes import unicode
  9. if PYPY or zmq.zmq_version_info() >= (4,1):
  10. # cleanup of shared Context doesn't work on PyPy
  11. # there also seems to be a bug in cleanup in libzmq-4.1 (zeromq/libzmq#1052)
  12. devices.Device.context_factory = zmq.Context
  13. class TestMonitoredQueue(BaseZMQTestCase):
  14. sockets = []
  15. def build_device(self, mon_sub=b"", in_prefix=b'in', out_prefix=b'out'):
  16. self.device = devices.ThreadMonitoredQueue(zmq.PAIR, zmq.PAIR, zmq.PUB,
  17. in_prefix, out_prefix)
  18. alice = self.context.socket(zmq.PAIR)
  19. bob = self.context.socket(zmq.PAIR)
  20. mon = self.context.socket(zmq.SUB)
  21. aport = alice.bind_to_random_port('tcp://127.0.0.1')
  22. bport = bob.bind_to_random_port('tcp://127.0.0.1')
  23. mport = mon.bind_to_random_port('tcp://127.0.0.1')
  24. mon.setsockopt(zmq.SUBSCRIBE, mon_sub)
  25. self.device.connect_in("tcp://127.0.0.1:%i"%aport)
  26. self.device.connect_out("tcp://127.0.0.1:%i"%bport)
  27. self.device.connect_mon("tcp://127.0.0.1:%i"%mport)
  28. self.device.start()
  29. time.sleep(.2)
  30. try:
  31. # this is currenlty necessary to ensure no dropped monitor messages
  32. # see LIBZMQ-248 for more info
  33. mon.recv_multipart(zmq.NOBLOCK)
  34. except zmq.ZMQError:
  35. pass
  36. self.sockets.extend([alice, bob, mon])
  37. return alice, bob, mon
  38. def teardown_device(self):
  39. for socket in self.sockets:
  40. socket.close()
  41. del socket
  42. del self.device
  43. def test_reply(self):
  44. alice, bob, mon = self.build_device()
  45. alices = b"hello bob".split()
  46. alice.send_multipart(alices)
  47. bobs = self.recv_multipart(bob)
  48. self.assertEqual(alices, bobs)
  49. bobs = b"hello alice".split()
  50. bob.send_multipart(bobs)
  51. alices = self.recv_multipart(alice)
  52. self.assertEqual(alices, bobs)
  53. self.teardown_device()
  54. def test_queue(self):
  55. alice, bob, mon = self.build_device()
  56. alices = b"hello bob".split()
  57. alice.send_multipart(alices)
  58. alices2 = b"hello again".split()
  59. alice.send_multipart(alices2)
  60. alices3 = b"hello again and again".split()
  61. alice.send_multipart(alices3)
  62. bobs = self.recv_multipart(bob)
  63. self.assertEqual(alices, bobs)
  64. bobs = self.recv_multipart(bob)
  65. self.assertEqual(alices2, bobs)
  66. bobs = self.recv_multipart(bob)
  67. self.assertEqual(alices3, bobs)
  68. bobs = b"hello alice".split()
  69. bob.send_multipart(bobs)
  70. alices = self.recv_multipart(alice)
  71. self.assertEqual(alices, bobs)
  72. self.teardown_device()
  73. def test_monitor(self):
  74. alice, bob, mon = self.build_device()
  75. alices = b"hello bob".split()
  76. alice.send_multipart(alices)
  77. alices2 = b"hello again".split()
  78. alice.send_multipart(alices2)
  79. alices3 = b"hello again and again".split()
  80. alice.send_multipart(alices3)
  81. bobs = self.recv_multipart(bob)
  82. self.assertEqual(alices, bobs)
  83. mons = self.recv_multipart(mon)
  84. self.assertEqual([b'in']+bobs, mons)
  85. bobs = self.recv_multipart(bob)
  86. self.assertEqual(alices2, bobs)
  87. bobs = self.recv_multipart(bob)
  88. self.assertEqual(alices3, bobs)
  89. mons = self.recv_multipart(mon)
  90. self.assertEqual([b'in']+alices2, mons)
  91. bobs = b"hello alice".split()
  92. bob.send_multipart(bobs)
  93. alices = self.recv_multipart(alice)
  94. self.assertEqual(alices, bobs)
  95. mons = self.recv_multipart(mon)
  96. self.assertEqual([b'in']+alices3, mons)
  97. mons = self.recv_multipart(mon)
  98. self.assertEqual([b'out']+bobs, mons)
  99. self.teardown_device()
  100. def test_prefix(self):
  101. alice, bob, mon = self.build_device(b"", b'foo', b'bar')
  102. alices = b"hello bob".split()
  103. alice.send_multipart(alices)
  104. alices2 = b"hello again".split()
  105. alice.send_multipart(alices2)
  106. alices3 = b"hello again and again".split()
  107. alice.send_multipart(alices3)
  108. bobs = self.recv_multipart(bob)
  109. self.assertEqual(alices, bobs)
  110. mons = self.recv_multipart(mon)
  111. self.assertEqual([b'foo']+bobs, mons)
  112. bobs = self.recv_multipart(bob)
  113. self.assertEqual(alices2, bobs)
  114. bobs = self.recv_multipart(bob)
  115. self.assertEqual(alices3, bobs)
  116. mons = self.recv_multipart(mon)
  117. self.assertEqual([b'foo']+alices2, mons)
  118. bobs = b"hello alice".split()
  119. bob.send_multipart(bobs)
  120. alices = self.recv_multipart(alice)
  121. self.assertEqual(alices, bobs)
  122. mons = self.recv_multipart(mon)
  123. self.assertEqual([b'foo']+alices3, mons)
  124. mons = self.recv_multipart(mon)
  125. self.assertEqual([b'bar']+bobs, mons)
  126. self.teardown_device()
  127. def test_monitor_subscribe(self):
  128. alice, bob, mon = self.build_device(b"out")
  129. alices = b"hello bob".split()
  130. alice.send_multipart(alices)
  131. alices2 = b"hello again".split()
  132. alice.send_multipart(alices2)
  133. alices3 = b"hello again and again".split()
  134. alice.send_multipart(alices3)
  135. bobs = self.recv_multipart(bob)
  136. self.assertEqual(alices, bobs)
  137. bobs = self.recv_multipart(bob)
  138. self.assertEqual(alices2, bobs)
  139. bobs = self.recv_multipart(bob)
  140. self.assertEqual(alices3, bobs)
  141. bobs = b"hello alice".split()
  142. bob.send_multipart(bobs)
  143. alices = self.recv_multipart(alice)
  144. self.assertEqual(alices, bobs)
  145. mons = self.recv_multipart(mon)
  146. self.assertEqual([b'out']+bobs, mons)
  147. self.teardown_device()
  148. def test_router_router(self):
  149. """test router-router MQ devices"""
  150. dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.ROUTER, zmq.PUB, b'in', b'out')
  151. self.device = dev
  152. dev.setsockopt_in(zmq.LINGER, 0)
  153. dev.setsockopt_out(zmq.LINGER, 0)
  154. dev.setsockopt_mon(zmq.LINGER, 0)
  155. porta = dev.bind_in_to_random_port('tcp://127.0.0.1')
  156. portb = dev.bind_out_to_random_port('tcp://127.0.0.1')
  157. a = self.context.socket(zmq.DEALER)
  158. a.identity = b'a'
  159. b = self.context.socket(zmq.DEALER)
  160. b.identity = b'b'
  161. self.sockets.extend([a, b])
  162. a.connect('tcp://127.0.0.1:%i'%porta)
  163. b.connect('tcp://127.0.0.1:%i'%portb)
  164. dev.start()
  165. time.sleep(1)
  166. if zmq.zmq_version_info() >= (3,1,0):
  167. # flush erroneous poll state, due to LIBZMQ-280
  168. ping_msg = [ b'ping', b'pong' ]
  169. for s in (a,b):
  170. s.send_multipart(ping_msg)
  171. try:
  172. s.recv(zmq.NOBLOCK)
  173. except zmq.ZMQError:
  174. pass
  175. msg = [ b'hello', b'there' ]
  176. a.send_multipart([b'b']+msg)
  177. bmsg = self.recv_multipart(b)
  178. self.assertEqual(bmsg, [b'a']+msg)
  179. b.send_multipart(bmsg)
  180. amsg = self.recv_multipart(a)
  181. self.assertEqual(amsg, [b'b']+msg)
  182. self.teardown_device()
  183. def test_default_mq_args(self):
  184. self.device = dev = devices.ThreadMonitoredQueue(zmq.ROUTER, zmq.DEALER, zmq.PUB)
  185. dev.setsockopt_in(zmq.LINGER, 0)
  186. dev.setsockopt_out(zmq.LINGER, 0)
  187. dev.setsockopt_mon(zmq.LINGER, 0)
  188. # this will raise if default args are wrong
  189. dev.start()
  190. self.teardown_device()
  191. def test_mq_check_prefix(self):
  192. ins = self.context.socket(zmq.ROUTER)
  193. outs = self.context.socket(zmq.DEALER)
  194. mons = self.context.socket(zmq.PUB)
  195. self.sockets.extend([ins, outs, mons])
  196. ins = unicode('in')
  197. outs = unicode('out')
  198. self.assertRaises(TypeError, devices.monitoredqueue, ins, outs, mons)