test_pubsub.py 1.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. from random import Random
  4. import time
  5. from unittest import TestCase
  6. import zmq
  7. from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
  8. class TestPubSub(BaseZMQTestCase):
  9. pass
  10. # We are disabling this test while an issue is being resolved.
  11. def test_basic(self):
  12. s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
  13. s2.setsockopt(zmq.SUBSCRIBE, b'')
  14. time.sleep(0.1)
  15. msg1 = b'message'
  16. s1.send(msg1)
  17. msg2 = s2.recv() # This is blocking!
  18. self.assertEqual(msg1, msg2)
  19. def test_topic(self):
  20. s1, s2 = self.create_bound_pair(zmq.PUB, zmq.SUB)
  21. s2.setsockopt(zmq.SUBSCRIBE, b'x')
  22. time.sleep(0.1)
  23. msg1 = b'message'
  24. s1.send(msg1)
  25. self.assertRaisesErrno(zmq.EAGAIN, s2.recv, zmq.NOBLOCK)
  26. msg1 = b'xmessage'
  27. s1.send(msg1)
  28. msg2 = s2.recv()
  29. self.assertEqual(msg1, msg2)
  30. if have_gevent:
  31. class TestPubSubGreen(GreenTest, TestPubSub):
  32. pass