12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152 |
- # -*- coding: utf8 -*-
- # Copyright (C) PyZMQ Developers
- # Distributed under the terms of the Modified BSD License.
- import os
- import platform
- import time
- import pytest
- import zmq
- from zmq.tests import (
- BaseZMQTestCase, skip_pypy
- )
- class TestDraftSockets(BaseZMQTestCase):
- def setUp(self):
- if not zmq.DRAFT_API:
- raise pytest.skip("draft api unavailable")
- super(TestDraftSockets, self).setUp()
-
- def test_client_server(self):
- client, server = self.create_bound_pair(zmq.CLIENT, zmq.SERVER)
- client.send(b'request')
- msg = self.recv(server, copy=False)
- assert msg.routing_id is not None
- server.send(b'reply', routing_id=msg.routing_id)
- reply = self.recv(client)
- assert reply == b'reply'
- def test_radio_dish(self):
- dish, radio = self.create_bound_pair(zmq.DISH, zmq.RADIO)
- dish.rcvtimeo = 250
- group = 'mygroup'
- dish.join(group)
- received_count = 0
- received = set()
- sent = set()
- for i in range(10):
- msg = str(i).encode('ascii')
- sent.add(msg)
- radio.send(msg, group=group)
- try:
- recvd = dish.recv()
- except zmq.Again:
- time.sleep(0.1)
- else:
- received.add(recvd)
- received_count += 1
- # assert that we got *something*
- assert len(received.intersection(sent)) >= 5
|