test_monitor.py 3.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. # -*- coding: utf-8 -*-
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import errno
  5. import sys
  6. import time
  7. import struct
  8. from unittest import TestCase
  9. from pytest import mark
  10. import zmq
  11. from zmq.tests import BaseZMQTestCase, skip_pypy, require_zmq_4
  12. from zmq.utils.monitor import recv_monitor_message
  13. class TestSocketMonitor(BaseZMQTestCase):
  14. @require_zmq_4
  15. def test_monitor(self):
  16. """Test monitoring interface for sockets."""
  17. s_rep = self.context.socket(zmq.REP)
  18. s_req = self.context.socket(zmq.REQ)
  19. self.sockets.extend([s_rep, s_req])
  20. s_req.bind("tcp://127.0.0.1:6666")
  21. # try monitoring the REP socket
  22. s_rep.monitor("inproc://monitor.rep", zmq.EVENT_CONNECT_DELAYED | zmq.EVENT_CONNECTED | zmq.EVENT_MONITOR_STOPPED)
  23. # create listening socket for monitor
  24. s_event = self.context.socket(zmq.PAIR)
  25. self.sockets.append(s_event)
  26. s_event.connect("inproc://monitor.rep")
  27. s_event.linger = 0
  28. # test receive event for connect event
  29. s_rep.connect("tcp://127.0.0.1:6666")
  30. m = recv_monitor_message(s_event)
  31. if m['event'] == zmq.EVENT_CONNECT_DELAYED:
  32. self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
  33. # test receive event for connected event
  34. m = recv_monitor_message(s_event)
  35. self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
  36. self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6666")
  37. # test monitor can be disabled.
  38. s_rep.disable_monitor()
  39. m = recv_monitor_message(s_event)
  40. self.assertEqual(m['event'], zmq.EVENT_MONITOR_STOPPED)
  41. @require_zmq_4
  42. def test_monitor_repeat(self):
  43. s = self.socket(zmq.PULL)
  44. m = s.get_monitor_socket()
  45. self.sockets.append(m)
  46. m2 = s.get_monitor_socket()
  47. assert m is m2
  48. s.disable_monitor()
  49. evt = recv_monitor_message(m)
  50. self.assertEqual(evt['event'], zmq.EVENT_MONITOR_STOPPED)
  51. m.close()
  52. s.close()
  53. @require_zmq_4
  54. def test_monitor_connected(self):
  55. """Test connected monitoring socket."""
  56. s_rep = self.context.socket(zmq.REP)
  57. s_req = self.context.socket(zmq.REQ)
  58. self.sockets.extend([s_rep, s_req])
  59. s_req.bind("tcp://127.0.0.1:6667")
  60. # try monitoring the REP socket
  61. # create listening socket for monitor
  62. s_event = s_rep.get_monitor_socket()
  63. s_event.linger = 0
  64. self.sockets.append(s_event)
  65. # test receive event for connect event
  66. s_rep.connect("tcp://127.0.0.1:6667")
  67. m = recv_monitor_message(s_event)
  68. if m['event'] == zmq.EVENT_CONNECT_DELAYED:
  69. self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")
  70. # test receive event for connected event
  71. m = recv_monitor_message(s_event)
  72. self.assertEqual(m['event'], zmq.EVENT_CONNECTED)
  73. self.assertEqual(m['endpoint'], b"tcp://127.0.0.1:6667")