monitoredqueue.py 1.0 KB

12345678910111213141516171819202122232425262728293031323334353637
"""Pure-Python implementation of the monitored_queue function.

For use when the Cython extension is unavailable (e.g. PyPy).

Authors
-------
* MinRK
"""
  7. # Copyright (C) PyZMQ Developers
  8. # Distributed under the terms of the Modified BSD License.
  9. import zmq
  10. def _relay(ins, outs, sides, prefix, swap_ids):
  11. msg = ins.recv_multipart()
  12. if swap_ids:
  13. msg[:2] = msg[:2][::-1]
  14. outs.send_multipart(msg)
  15. sides.send_multipart([prefix] + msg)
  16. def monitored_queue(in_socket, out_socket, mon_socket,
  17. in_prefix=b'in', out_prefix=b'out'):
  18. swap_ids = in_socket.type == zmq.ROUTER and out_socket.type == zmq.ROUTER
  19. poller = zmq.Poller()
  20. poller.register(in_socket, zmq.POLLIN)
  21. poller.register(out_socket, zmq.POLLIN)
  22. while True:
  23. events = dict(poller.poll())
  24. if in_socket in events:
  25. _relay(in_socket, out_socket, mon_socket, in_prefix, swap_ids)
  26. if out_socket in events:
  27. _relay(out_socket, in_socket, mon_socket, out_prefix, swap_ids)
# Public API of this module: only monitored_queue is exported by
# ``from ... import *``; _relay is an internal helper.
__all__ = ['monitored_queue']