test_draft.py 1.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152
  1. # -*- coding: utf8 -*-
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import os
  5. import platform
  6. import time
  7. import pytest
  8. import zmq
  9. from zmq.tests import (
  10. BaseZMQTestCase, skip_pypy
  11. )
  12. class TestDraftSockets(BaseZMQTestCase):
  13. def setUp(self):
  14. if not zmq.DRAFT_API:
  15. raise pytest.skip("draft api unavailable")
  16. super(TestDraftSockets, self).setUp()
  17. def test_client_server(self):
  18. client, server = self.create_bound_pair(zmq.CLIENT, zmq.SERVER)
  19. client.send(b'request')
  20. msg = self.recv(server, copy=False)
  21. assert msg.routing_id is not None
  22. server.send(b'reply', routing_id=msg.routing_id)
  23. reply = self.recv(client)
  24. assert reply == b'reply'
  25. def test_radio_dish(self):
  26. dish, radio = self.create_bound_pair(zmq.DISH, zmq.RADIO)
  27. dish.rcvtimeo = 250
  28. group = 'mygroup'
  29. dish.join(group)
  30. received_count = 0
  31. received = set()
  32. sent = set()
  33. for i in range(10):
  34. msg = str(i).encode('ascii')
  35. sent.add(msg)
  36. radio.send(msg, group=group)
  37. try:
  38. recvd = dish.recv()
  39. except zmq.Again:
  40. time.sleep(0.1)
  41. else:
  42. received.add(recvd)
  43. received_count += 1
  44. # assert that we got *something*
  45. assert len(received.intersection(sent)) >= 5