test_reqrep.py 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162
  1. # Copyright (C) PyZMQ Developers
  2. # Distributed under the terms of the Modified BSD License.
  3. from unittest import TestCase
  4. import zmq
  5. from zmq.tests import BaseZMQTestCase, have_gevent, GreenTest
  6. class TestReqRep(BaseZMQTestCase):
  7. def test_basic(self):
  8. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  9. msg1 = b'message 1'
  10. msg2 = self.ping_pong(s1, s2, msg1)
  11. self.assertEqual(msg1, msg2)
  12. def test_multiple(self):
  13. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  14. for i in range(10):
  15. msg1 = i*b' '
  16. msg2 = self.ping_pong(s1, s2, msg1)
  17. self.assertEqual(msg1, msg2)
  18. def test_bad_send_recv(self):
  19. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  20. if zmq.zmq_version() != '2.1.8':
  21. # this doesn't work on 2.1.8
  22. for copy in (True,False):
  23. self.assertRaisesErrno(zmq.EFSM, s1.recv, copy=copy)
  24. self.assertRaisesErrno(zmq.EFSM, s2.send, b'asdf', copy=copy)
  25. # I have to have this or we die on an Abort trap.
  26. msg1 = b'asdf'
  27. msg2 = self.ping_pong(s1, s2, msg1)
  28. self.assertEqual(msg1, msg2)
  29. def test_json(self):
  30. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  31. o = dict(a=10,b=list(range(10)))
  32. o2 = self.ping_pong_json(s1, s2, o)
  33. def test_pyobj(self):
  34. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  35. o = dict(a=10,b=range(10))
  36. o2 = self.ping_pong_pyobj(s1, s2, o)
  37. def test_large_msg(self):
  38. s1, s2 = self.create_bound_pair(zmq.REQ, zmq.REP)
  39. msg1 = 10000*b'X'
  40. for i in range(10):
  41. msg2 = self.ping_pong(s1, s2, msg1)
  42. self.assertEqual(msg1, msg2)
  43. if have_gevent:
  44. class TestReqRepGreen(GreenTest, TestReqRep):
  45. pass