test_poll.py 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. import os
  4. import sys
  5. import time
  6. from pytest import mark
  7. import zmq
  8. from zmq.tests import PollZMQTestCase, have_gevent, GreenTest
  9. def wait():
  10. time.sleep(.25)
  11. class TestPoll(PollZMQTestCase):
  12. Poller = zmq.Poller
  13. def test_pair(self):
  14. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  15. # Sleep to allow sockets to connect.
  16. wait()
  17. poller = self.Poller()
  18. poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
  19. poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
  20. # Poll result should contain both sockets
  21. socks = dict(poller.poll())
  22. # Now make sure that both are send ready.
  23. self.assertEqual(socks[s1], zmq.POLLOUT)
  24. self.assertEqual(socks[s2], zmq.POLLOUT)
  25. # Now do a send on both, wait and test for zmq.POLLOUT|zmq.POLLIN
  26. s1.send(b'msg1')
  27. s2.send(b'msg2')
  28. wait()
  29. socks = dict(poller.poll())
  30. self.assertEqual(socks[s1], zmq.POLLOUT|zmq.POLLIN)
  31. self.assertEqual(socks[s2], zmq.POLLOUT|zmq.POLLIN)
  32. # Make sure that both are in POLLOUT after recv.
  33. s1.recv()
  34. s2.recv()
  35. socks = dict(poller.poll())
  36. self.assertEqual(socks[s1], zmq.POLLOUT)
  37. self.assertEqual(socks[s2], zmq.POLLOUT)
  38. poller.unregister(s1)
  39. poller.unregister(s2)
  40. def test_reqrep(self):
  41. s1, s2 = self.create_bound_pair(zmq.REP, zmq.REQ)
  42. # Sleep to allow sockets to connect.
  43. wait()
  44. poller = self.Poller()
  45. poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
  46. poller.register(s2, zmq.POLLIN|zmq.POLLOUT)
  47. # Make sure that s1 is in state 0 and s2 is in POLLOUT
  48. socks = dict(poller.poll())
  49. self.assertEqual(s1 in socks, 0)
  50. self.assertEqual(socks[s2], zmq.POLLOUT)
  51. # Make sure that s2 goes immediately into state 0 after send.
  52. s2.send(b'msg1')
  53. socks = dict(poller.poll())
  54. self.assertEqual(s2 in socks, 0)
  55. # Make sure that s1 goes into POLLIN state after a time.sleep().
  56. time.sleep(0.5)
  57. socks = dict(poller.poll())
  58. self.assertEqual(socks[s1], zmq.POLLIN)
  59. # Make sure that s1 goes into POLLOUT after recv.
  60. s1.recv()
  61. socks = dict(poller.poll())
  62. self.assertEqual(socks[s1], zmq.POLLOUT)
  63. # Make sure s1 goes into state 0 after send.
  64. s1.send(b'msg2')
  65. socks = dict(poller.poll())
  66. self.assertEqual(s1 in socks, 0)
  67. # Wait and then see that s2 is in POLLIN.
  68. time.sleep(0.5)
  69. socks = dict(poller.poll())
  70. self.assertEqual(socks[s2], zmq.POLLIN)
  71. # Make sure that s2 is in POLLOUT after recv.
  72. s2.recv()
  73. socks = dict(poller.poll())
  74. self.assertEqual(socks[s2], zmq.POLLOUT)
  75. poller.unregister(s1)
  76. poller.unregister(s2)
  77. def test_no_events(self):
  78. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  79. poller = self.Poller()
  80. poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
  81. poller.register(s2, 0)
  82. self.assertTrue(s1 in poller)
  83. self.assertFalse(s2 in poller)
  84. poller.register(s1, 0)
  85. self.assertFalse(s1 in poller)
  86. def test_pubsub(self):
  87. s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
  88. s2.setsockopt(zmq.SUBSCRIBE, b'')
  89. # Sleep to allow sockets to connect.
  90. wait()
  91. poller = self.Poller()
  92. poller.register(s1, zmq.POLLIN|zmq.POLLOUT)
  93. poller.register(s2, zmq.POLLIN)
  94. # Now make sure that both are send ready.
  95. socks = dict(poller.poll())
  96. self.assertEqual(socks[s1], zmq.POLLOUT)
  97. self.assertEqual(s2 in socks, 0)
  98. # Make sure that s1 stays in POLLOUT after a send.
  99. s1.send(b'msg1')
  100. socks = dict(poller.poll())
  101. self.assertEqual(socks[s1], zmq.POLLOUT)
  102. # Make sure that s2 is POLLIN after waiting.
  103. wait()
  104. socks = dict(poller.poll())
  105. self.assertEqual(socks[s2], zmq.POLLIN)
  106. # Make sure that s2 goes into 0 after recv.
  107. s2.recv()
  108. socks = dict(poller.poll())
  109. self.assertEqual(s2 in socks, 0)
  110. poller.unregister(s1)
  111. poller.unregister(s2)
  112. @mark.skipif(sys.platform.startswith('win'), reason='Windows')
  113. def test_raw(self):
  114. r, w = os.pipe()
  115. r = os.fdopen(r, 'rb')
  116. w = os.fdopen(w, 'wb')
  117. p = self.Poller()
  118. p.register(r, zmq.POLLIN)
  119. socks = dict(p.poll(1))
  120. assert socks == {}
  121. w.write(b'x')
  122. w.flush()
  123. socks = dict(p.poll(1))
  124. assert socks == {r.fileno(): zmq.POLLIN}
  125. w.close()
  126. r.close()
  127. def test_timeout(self):
  128. """make sure Poller.poll timeout has the right units (milliseconds)."""
  129. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  130. poller = self.Poller()
  131. poller.register(s1, zmq.POLLIN)
  132. tic = time.time()
  133. evt = poller.poll(.005)
  134. toc = time.time()
  135. self.assertTrue(toc-tic < 0.1)
  136. tic = time.time()
  137. evt = poller.poll(5)
  138. toc = time.time()
  139. self.assertTrue(toc-tic < 0.1)
  140. self.assertTrue(toc-tic > .001)
  141. tic = time.time()
  142. evt = poller.poll(500)
  143. toc = time.time()
  144. self.assertTrue(toc-tic < 1)
  145. self.assertTrue(toc-tic > 0.1)
  146. class TestSelect(PollZMQTestCase):
  147. def test_pair(self):
  148. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  149. # Sleep to allow sockets to connect.
  150. wait()
  151. rlist, wlist, xlist = zmq.select([s1, s2], [s1, s2], [s1, s2])
  152. self.assert_(s1 in wlist)
  153. self.assert_(s2 in wlist)
  154. self.assert_(s1 not in rlist)
  155. self.assert_(s2 not in rlist)
  156. def test_timeout(self):
  157. """make sure select timeout has the right units (seconds)."""
  158. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  159. tic = time.time()
  160. r,w,x = zmq.select([s1,s2],[],[],.005)
  161. toc = time.time()
  162. self.assertTrue(toc-tic < 1)
  163. self.assertTrue(toc-tic > 0.001)
  164. tic = time.time()
  165. r,w,x = zmq.select([s1,s2],[],[],.25)
  166. toc = time.time()
  167. self.assertTrue(toc-tic < 1)
  168. self.assertTrue(toc-tic > 0.1)
  169. if have_gevent:
  170. import gevent
  171. from zmq import green as gzmq
  172. class TestPollGreen(GreenTest, TestPoll):
  173. Poller = gzmq.Poller
  174. def test_wakeup(self):
  175. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  176. poller = self.Poller()
  177. poller.register(s2, zmq.POLLIN)
  178. tic = time.time()
  179. r = gevent.spawn(lambda: poller.poll(10000))
  180. s = gevent.spawn(lambda: s1.send(b'msg1'))
  181. r.join()
  182. toc = time.time()
  183. self.assertTrue(toc-tic < 1)
  184. def test_socket_poll(self):
  185. s1, s2 = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  186. tic = time.time()
  187. r = gevent.spawn(lambda: s2.poll(10000))
  188. s = gevent.spawn(lambda: s1.send(b'msg1'))
  189. r.join()
  190. toc = time.time()
  191. self.assertTrue(toc-tic < 1)