socket.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774
  1. # coding: utf-8
  2. """0MQ Socket pure Python methods."""
  3. # Copyright (C) PyZMQ Developers
  4. # Distributed under the terms of the Modified BSD License.
  5. import errno
  6. import random
  7. import sys
  8. import warnings
  9. import zmq
  10. from zmq.backend import Socket as SocketBase
  11. from .poll import Poller
  12. from . import constants
  13. from .attrsettr import AttributeSetter
  14. from zmq.error import ZMQError, ZMQBindError
  15. from zmq.utils import jsonapi
  16. from zmq.utils.strtypes import bytes, unicode, basestring
  17. from .constants import (
  18. SNDMORE, ENOTSUP, POLLIN,
  19. int64_sockopt_names,
  20. int_sockopt_names,
  21. bytes_sockopt_names,
  22. fd_sockopt_names,
  23. )
  24. try:
  25. import cPickle
  26. pickle = cPickle
  27. except:
  28. cPickle = None
  29. import pickle
  30. try:
  31. DEFAULT_PROTOCOL = pickle.DEFAULT_PROTOCOL
  32. except AttributeError:
  33. DEFAULT_PROTOCOL = pickle.HIGHEST_PROTOCOL
class Socket(SocketBase, AttributeSetter):
    """The ZMQ socket object

    To create a Socket, first create a Context::

        ctx = zmq.Context.instance()

    then call ``ctx.socket(socket_type)``::

        s = ctx.socket(zmq.ROUTER)
    """
    # Class-level default; __init__ sets it per instance and __del__ reads it
    # to decide whether to close the socket.
    _shadow = False
    # Socket returned by get_monitor_socket(), or None when there is none.
    _monitor_socket = None
  43. def __init__(self, *a, **kw):
  44. super(Socket, self).__init__(*a, **kw)
  45. if 'shadow' in kw:
  46. self._shadow = True
  47. else:
  48. self._shadow = False
  49. def __del__(self):
  50. if not self._shadow:
  51. self.close()
  52. # socket as context manager:
  53. def __enter__(self):
  54. """Sockets are context managers
  55. .. versionadded:: 14.4
  56. """
  57. return self
  58. def __exit__(self, *args, **kwargs):
  59. self.close()
  60. #-------------------------------------------------------------------------
  61. # Socket creation
  62. #-------------------------------------------------------------------------
  63. def __copy__(self, memo=None):
  64. """Copying a Socket creates a shadow copy"""
  65. return self.__class__.shadow(self.underlying)
  66. __deepcopy__ = __copy__
  67. @classmethod
  68. def shadow(cls, address):
  69. """Shadow an existing libzmq socket
  70. address is the integer address of the libzmq socket
  71. or an FFI pointer to it.
  72. .. versionadded:: 14.1
  73. """
  74. from zmq.utils.interop import cast_int_addr
  75. address = cast_int_addr(address)
  76. return cls(shadow=address)
  77. def close(self, linger=None):
  78. """
  79. Close the socket.
  80. If linger is specified, LINGER sockopt will be set prior to closing.
  81. Note: closing a zmq Socket may not close the underlying sockets
  82. if there are undelivered messages.
  83. Only after all messages are delivered or discarded by reaching the socket's LINGER timeout
  84. (default: forever)
  85. will the underlying sockets be closed.
  86. This can be called to close the socket by hand. If this is not
  87. called, the socket will automatically be closed when it is
  88. garbage collected.
  89. """
  90. if self.context:
  91. self.context._rm_socket(self)
  92. super(Socket, self).close(linger=linger)
  93. #-------------------------------------------------------------------------
  94. # Deprecated aliases
  95. #-------------------------------------------------------------------------
  96. @property
  97. def socket_type(self):
  98. warnings.warn("Socket.socket_type is deprecated, use Socket.type",
  99. DeprecationWarning
  100. )
  101. return self.type
  102. #-------------------------------------------------------------------------
  103. # Hooks for sockopt completion
  104. #-------------------------------------------------------------------------
  105. def __dir__(self):
  106. keys = dir(self.__class__)
  107. for collection in (
  108. bytes_sockopt_names,
  109. int_sockopt_names,
  110. int64_sockopt_names,
  111. fd_sockopt_names,
  112. ):
  113. keys.extend(collection)
  114. return keys
  115. #-------------------------------------------------------------------------
  116. # Getting/Setting options
  117. #-------------------------------------------------------------------------
    # setsockopt/getsockopt are aliases of the base class's set/get.
    setsockopt = SocketBase.set
    getsockopt = SocketBase.get
  120. def __setattr__(self, key, value):
  121. """Override to allow setting zmq.[UN]SUBSCRIBE even though we have a subscribe method"""
  122. if key in self.__dict__:
  123. object.__setattr__(self, key, value)
  124. return
  125. _key = key.lower()
  126. if _key in ('subscribe', 'unsubscribe'):
  127. if isinstance(value, unicode):
  128. value = value.encode('utf8')
  129. if _key == 'subscribe':
  130. self.set(zmq.SUBSCRIBE, value)
  131. else:
  132. self.set(zmq.UNSUBSCRIBE, value)
  133. return
  134. super(Socket, self).__setattr__(key, value)
  135. def fileno(self):
  136. """Return edge-triggered file descriptor for this socket.
  137. This is a read-only edge-triggered file descriptor for both read and write events on this socket.
  138. It is important that all available events be consumed when an event is detected,
  139. otherwise the read event will not trigger again.
  140. .. versionadded:: 17.0
  141. """
  142. return self.FD
  143. def subscribe(self, topic):
  144. """Subscribe to a topic
  145. Only for SUB sockets.
  146. .. versionadded:: 15.3
  147. """
  148. if isinstance(topic, unicode):
  149. topic = topic.encode('utf8')
  150. self.set(zmq.SUBSCRIBE, topic)
  151. def unsubscribe(self, topic):
  152. """Unsubscribe from a topic
  153. Only for SUB sockets.
  154. .. versionadded:: 15.3
  155. """
  156. if isinstance(topic, unicode):
  157. topic = topic.encode('utf8')
  158. self.set(zmq.UNSUBSCRIBE, topic)
  159. def set_string(self, option, optval, encoding='utf-8'):
  160. """Set socket options with a unicode object.
  161. This is simply a wrapper for setsockopt to protect from encoding ambiguity.
  162. See the 0MQ documentation for details on specific options.
  163. Parameters
  164. ----------
  165. option : int
  166. The name of the option to set. Can be any of: SUBSCRIBE,
  167. UNSUBSCRIBE, IDENTITY
  168. optval : unicode string (unicode on py2, str on py3)
  169. The value of the option to set.
  170. encoding : str
  171. The encoding to be used, default is utf8
  172. """
  173. if not isinstance(optval, unicode):
  174. raise TypeError("unicode strings only")
  175. return self.set(option, optval.encode(encoding))
  176. setsockopt_unicode = setsockopt_string = set_string
  177. def get_string(self, option, encoding='utf-8'):
  178. """Get the value of a socket option.
  179. See the 0MQ documentation for details on specific options.
  180. Parameters
  181. ----------
  182. option : int
  183. The option to retrieve.
  184. Returns
  185. -------
  186. optval : unicode string (unicode on py2, str on py3)
  187. The value of the option as a unicode string.
  188. """
  189. if option not in constants.bytes_sockopts:
  190. raise TypeError("option %i will not return a string to be decoded"%option)
  191. return self.getsockopt(option).decode(encoding)
  192. getsockopt_unicode = getsockopt_string = get_string
  193. def bind_to_random_port(self, addr, min_port=49152, max_port=65536, max_tries=100):
  194. """Bind this socket to a random port in a range.
  195. If the port range is unspecified, the system will choose the port.
  196. Parameters
  197. ----------
  198. addr : str
  199. The address string without the port to pass to ``Socket.bind()``.
  200. min_port : int, optional
  201. The minimum port in the range of ports to try (inclusive).
  202. max_port : int, optional
  203. The maximum port in the range of ports to try (exclusive).
  204. max_tries : int, optional
  205. The maximum number of bind attempts to make.
  206. Returns
  207. -------
  208. port : int
  209. The port the socket was bound to.
  210. Raises
  211. ------
  212. ZMQBindError
  213. if `max_tries` reached before successful bind
  214. """
  215. if hasattr(constants, 'LAST_ENDPOINT') and min_port == 49152 and max_port == 65536:
  216. # if LAST_ENDPOINT is supported, and min_port / max_port weren't specified,
  217. # we can bind to port 0 and let the OS do the work
  218. self.bind("%s:*" % addr)
  219. url = self.last_endpoint.decode('ascii', 'replace')
  220. _, port_s = url.rsplit(':', 1)
  221. return int(port_s)
  222. for i in range(max_tries):
  223. try:
  224. port = random.randrange(min_port, max_port)
  225. self.bind('%s:%s' % (addr, port))
  226. except ZMQError as exception:
  227. en = exception.errno
  228. if en == zmq.EADDRINUSE:
  229. continue
  230. elif sys.platform == 'win32' and en == errno.EACCES:
  231. continue
  232. else:
  233. raise
  234. else:
  235. return port
  236. raise ZMQBindError("Could not bind socket to random port.")
  237. def get_hwm(self):
  238. """Get the High Water Mark.
  239. On libzmq ≥ 3, this gets SNDHWM if available, otherwise RCVHWM
  240. """
  241. major = zmq.zmq_version_info()[0]
  242. if major >= 3:
  243. # return sndhwm, fallback on rcvhwm
  244. try:
  245. return self.getsockopt(zmq.SNDHWM)
  246. except zmq.ZMQError:
  247. pass
  248. return self.getsockopt(zmq.RCVHWM)
  249. else:
  250. return self.getsockopt(zmq.HWM)
  251. def set_hwm(self, value):
  252. """Set the High Water Mark.
  253. On libzmq ≥ 3, this sets both SNDHWM and RCVHWM
  254. .. warning::
  255. New values only take effect for subsequent socket
  256. bind/connects.
  257. """
  258. major = zmq.zmq_version_info()[0]
  259. if major >= 3:
  260. raised = None
  261. try:
  262. self.sndhwm = value
  263. except Exception as e:
  264. raised = e
  265. try:
  266. self.rcvhwm = value
  267. except Exception as e:
  268. raised = e
  269. if raised:
  270. raise raised
  271. else:
  272. return self.setsockopt(zmq.HWM, value)
    # Expose get_hwm/set_hwm as a single read/write attribute.
    hwm = property(get_hwm, set_hwm,
        """Property for High Water Mark.
        Setting hwm sets both SNDHWM and RCVHWM as appropriate.
        It gets SNDHWM if available, otherwise RCVHWM.
        """
    )
  279. #-------------------------------------------------------------------------
  280. # Sending and receiving messages
  281. #-------------------------------------------------------------------------
  282. def send(self, data, flags=0, copy=True, track=False, routing_id=None, group=None):
  283. """Send a single zmq message frame on this socket.
  284. This queues the message to be sent by the IO thread at a later time.
  285. With flags=NOBLOCK, this raises :class:`ZMQError` if the queue is full;
  286. otherwise, this waits until space is available.
  287. See :class:`Poller` for more general non-blocking I/O.
  288. Parameters
  289. ----------
  290. data : bytes, Frame, memoryview
  291. The content of the message. This can be any object that provides
  292. the Python buffer API (i.e. `memoryview(data)` can be called).
  293. flags : int
  294. 0, NOBLOCK, SNDMORE, or NOBLOCK|SNDMORE.
  295. copy : bool
  296. Should the message be sent in a copying or non-copying manner.
  297. track : bool
  298. Should the message be tracked for notification that ZMQ has
  299. finished with it? (ignored if copy=True)
  300. routing_id : int
  301. For use with SERVER sockets
  302. group : str
  303. For use with RADIO sockets
  304. Returns
  305. -------
  306. None : if `copy` or not track
  307. None if message was sent, raises an exception otherwise.
  308. MessageTracker : if track and not copy
  309. a MessageTracker object, whose `pending` property will
  310. be True until the send is completed.
  311. Raises
  312. ------
  313. TypeError
  314. If a unicode object is passed
  315. ValueError
  316. If `track=True`, but an untracked Frame is passed.
  317. ZMQError
  318. If the send does not succeed for any reason (including
  319. if NOBLOCK is set and the outgoing queue is full).
  320. .. versionchanged:: 17.0
  321. DRAFT support for routing_id and group arguments.
  322. """
  323. if routing_id is not None:
  324. if not isinstance(data, zmq.Frame):
  325. data = zmq.Frame(data, track=track, copy=copy or None,
  326. copy_threshold=self.copy_threshold)
  327. data.routing_id = routing_id
  328. if group is not None:
  329. if not isinstance(data, zmq.Frame):
  330. data = zmq.Frame(data, track=track, copy=copy or None,
  331. copy_threshold=self.copy_threshold)
  332. data.group = group
  333. return super(Socket, self).send(data, flags=flags, copy=copy, track=track)
  334. def send_multipart(self, msg_parts, flags=0, copy=True, track=False, **kwargs):
  335. """Send a sequence of buffers as a multipart message.
  336. The zmq.SNDMORE flag is added to all msg parts before the last.
  337. Parameters
  338. ----------
  339. msg_parts : iterable
  340. A sequence of objects to send as a multipart message. Each element
  341. can be any sendable object (Frame, bytes, buffer-providers)
  342. flags : int, optional
  343. Any valid flags for :func:`Socket.send`.
  344. SNDMORE is added automatically for frames before the last.
  345. copy : bool, optional
  346. Should the frame(s) be sent in a copying or non-copying manner.
  347. If copy=False, frames smaller than self.copy_threshold bytes
  348. will be copied anyway.
  349. track : bool, optional
  350. Should the frame(s) be tracked for notification that ZMQ has
  351. finished with it (ignored if copy=True).
  352. Returns
  353. -------
  354. None : if copy or not track
  355. MessageTracker : if track and not copy
  356. a MessageTracker object, whose `pending` property will
  357. be True until the last send is completed.
  358. """
  359. # typecheck parts before sending:
  360. for i,msg in enumerate(msg_parts):
  361. if isinstance(msg, (zmq.Frame, bytes, memoryview)):
  362. continue
  363. try:
  364. memoryview(msg)
  365. except Exception:
  366. rmsg = repr(msg)
  367. if len(rmsg) > 32:
  368. rmsg = rmsg[:32] + '...'
  369. raise TypeError(
  370. "Frame %i (%s) does not support the buffer interface." % (
  371. i, rmsg,
  372. ))
  373. for msg in msg_parts[:-1]:
  374. self.send(msg, SNDMORE|flags, copy=copy, track=track)
  375. # Send the last part without the extra SNDMORE flag.
  376. return self.send(msg_parts[-1], flags, copy=copy, track=track)
  377. def recv_multipart(self, flags=0, copy=True, track=False):
  378. """Receive a multipart message as a list of bytes or Frame objects
  379. Parameters
  380. ----------
  381. flags : int, optional
  382. Any valid flags for :func:`Socket.recv`.
  383. copy : bool, optional
  384. Should the message frame(s) be received in a copying or non-copying manner?
  385. If False a Frame object is returned for each part, if True a copy of
  386. the bytes is made for each frame.
  387. track : bool, optional
  388. Should the message frame(s) be tracked for notification that ZMQ has
  389. finished with it? (ignored if copy=True)
  390. Returns
  391. -------
  392. msg_parts : list
  393. A list of frames in the multipart message; either Frames or bytes,
  394. depending on `copy`.
  395. Raises
  396. ------
  397. ZMQError
  398. for any of the reasons :func:`~Socket.recv` might fail
  399. """
  400. parts = [self.recv(flags, copy=copy, track=track)]
  401. # have first part already, only loop while more to receive
  402. while self.getsockopt(zmq.RCVMORE):
  403. part = self.recv(flags, copy=copy, track=track)
  404. parts.append(part)
  405. return parts
  406. def _deserialize(self, recvd, load):
  407. """Deserialize a received message
  408. Override in subclass (e.g. Futures) if recvd is not the raw bytes.
  409. The default implementation expects bytes and returns the deserialized message immediately.
  410. Parameters
  411. ----------
  412. load: callable
  413. Callable that deserializes bytes
  414. recvd:
  415. The object returned by self.recv
  416. """
  417. return load(recvd)
  418. def send_serialized(self, msg, serialize, flags=0, copy=True, **kwargs):
  419. """Send a message with a custom serialization function.
  420. .. versionadded:: 17
  421. Parameters
  422. ----------
  423. msg : The message to be sent. Can be any object serializable by `serialize`.
  424. serialize : callable
  425. The serialization function to use.
  426. serialize(msg) should return an iterable of sendable message frames
  427. (e.g. bytes objects), which will be passed to send_multipart.
  428. flags : int, optional
  429. Any valid flags for :func:`Socket.send`.
  430. copy : bool, optional
  431. Whether to copy the frames.
  432. """
  433. frames = serialize(msg)
  434. return self.send_multipart(frames, flags=flags, copy=copy, **kwargs)
  435. def recv_serialized(self, deserialize, flags=0, copy=True):
  436. """Receive a message with a custom deserialization function.
  437. .. versionadded:: 17
  438. Parameters
  439. ----------
  440. deserialize : callable
  441. The deserialization function to use.
  442. deserialize will be called with one argument: the list of frames
  443. returned by recv_multipart() and can return any object.
  444. flags : int, optional
  445. Any valid flags for :func:`Socket.recv`.
  446. copy : bool, optional
  447. Whether to recv bytes or Frame objects.
  448. Returns
  449. -------
  450. obj : object
  451. The object returned by the deserialization function.
  452. Raises
  453. ------
  454. ZMQError
  455. for any of the reasons :func:`~Socket.recv` might fail
  456. """
  457. frames = self.recv_multipart(flags=flags, copy=copy)
  458. return self._deserialize(frames, deserialize)
  459. def send_string(self, u, flags=0, copy=True, encoding='utf-8', **kwargs):
  460. """Send a Python unicode string as a message with an encoding.
  461. 0MQ communicates with raw bytes, so you must encode/decode
  462. text (unicode on py2, str on py3) around 0MQ.
  463. Parameters
  464. ----------
  465. u : Python unicode string (unicode on py2, str on py3)
  466. The unicode string to send.
  467. flags : int, optional
  468. Any valid flags for :func:`Socket.send`.
  469. encoding : str [default: 'utf-8']
  470. The encoding to be used
  471. """
  472. if not isinstance(u, basestring):
  473. raise TypeError("unicode/str objects only")
  474. return self.send(u.encode(encoding), flags=flags, copy=copy, **kwargs)
  475. send_unicode = send_string
  476. def recv_string(self, flags=0, encoding='utf-8'):
  477. """Receive a unicode string, as sent by send_string.
  478. Parameters
  479. ----------
  480. flags : int
  481. Any valid flags for :func:`Socket.recv`.
  482. encoding : str [default: 'utf-8']
  483. The encoding to be used
  484. Returns
  485. -------
  486. s : unicode string (unicode on py2, str on py3)
  487. The Python unicode string that arrives as encoded bytes.
  488. Raises
  489. ------
  490. ZMQError
  491. for any of the reasons :func:`~Socket.recv` might fail
  492. """
  493. msg = self.recv(flags=flags)
  494. return self._deserialize(msg, lambda buf: buf.decode(encoding))
  495. recv_unicode = recv_string
  496. def send_pyobj(self, obj, flags=0, protocol=DEFAULT_PROTOCOL, **kwargs):
  497. """Send a Python object as a message using pickle to serialize.
  498. Parameters
  499. ----------
  500. obj : Python object
  501. The Python object to send.
  502. flags : int
  503. Any valid flags for :func:`Socket.send`.
  504. protocol : int
  505. The pickle protocol number to use. The default is pickle.DEFAULT_PROTOCOL
  506. where defined, and pickle.HIGHEST_PROTOCOL elsewhere.
  507. """
  508. msg = pickle.dumps(obj, protocol)
  509. return self.send(msg, flags=flags, **kwargs)
  510. def recv_pyobj(self, flags=0):
  511. """Receive a Python object as a message using pickle to serialize.
  512. Parameters
  513. ----------
  514. flags : int
  515. Any valid flags for :func:`Socket.recv`.
  516. Returns
  517. -------
  518. obj : Python object
  519. The Python object that arrives as a message.
  520. Raises
  521. ------
  522. ZMQError
  523. for any of the reasons :func:`~Socket.recv` might fail
  524. """
  525. msg = self.recv(flags)
  526. return self._deserialize(msg, pickle.loads)
  527. def send_json(self, obj, flags=0, **kwargs):
  528. """Send a Python object as a message using json to serialize.
  529. Keyword arguments are passed on to json.dumps
  530. Parameters
  531. ----------
  532. obj : Python object
  533. The Python object to send
  534. flags : int
  535. Any valid flags for :func:`Socket.send`
  536. """
  537. send_kwargs = {}
  538. for key in ('routing_id', 'group'):
  539. if key in kwargs:
  540. send_kwargs[key] = kwargs.pop(key)
  541. msg = jsonapi.dumps(obj, **kwargs)
  542. return self.send(msg, flags=flags, **send_kwargs)
  543. def recv_json(self, flags=0, **kwargs):
  544. """Receive a Python object as a message using json to serialize.
  545. Keyword arguments are passed on to json.loads
  546. Parameters
  547. ----------
  548. flags : int
  549. Any valid flags for :func:`Socket.recv`.
  550. Returns
  551. -------
  552. obj : Python object
  553. The Python object that arrives as a message.
  554. Raises
  555. ------
  556. ZMQError
  557. for any of the reasons :func:`~Socket.recv` might fail
  558. """
  559. msg = self.recv(flags)
  560. return self._deserialize(msg, lambda buf: jsonapi.loads(buf, **kwargs))
  561. _poller_class = Poller
  562. def poll(self, timeout=None, flags=POLLIN):
  563. """Poll the socket for events.
  564. See :class:`Poller` to wait for multiple sockets at once.
  565. Parameters
  566. ----------
  567. timeout : int [default: None]
  568. The timeout (in milliseconds) to wait for an event. If unspecified
  569. (or specified None), will wait forever for an event.
  570. flags : int [default: POLLIN]
  571. POLLIN, POLLOUT, or POLLIN|POLLOUT. The event flags to poll for.
  572. Returns
  573. -------
  574. event_mask : int
  575. The poll event mask (POLLIN, POLLOUT),
  576. 0 if the timeout was reached without an event.
  577. """
  578. if self.closed:
  579. raise ZMQError(ENOTSUP)
  580. p = self._poller_class()
  581. p.register(self, flags)
  582. evts = dict(p.poll(timeout))
  583. # return 0 if no events, otherwise return event bitfield
  584. return evts.get(self, 0)
  585. def get_monitor_socket(self, events=None, addr=None):
  586. """Return a connected PAIR socket ready to receive the event notifications.
  587. .. versionadded:: libzmq-4.0
  588. .. versionadded:: 14.0
  589. Parameters
  590. ----------
  591. events : int [default: ZMQ_EVENT_ALL]
  592. The bitmask defining which events are wanted.
  593. addr : string [default: None]
  594. The optional endpoint for the monitoring sockets.
  595. Returns
  596. -------
  597. socket : (PAIR)
  598. The socket is already connected and ready to receive messages.
  599. """
  600. # safe-guard, method only available on libzmq >= 4
  601. if zmq.zmq_version_info() < (4,):
  602. raise NotImplementedError("get_monitor_socket requires libzmq >= 4, have %s" % zmq.zmq_version())
  603. # if already monitoring, return existing socket
  604. if self._monitor_socket:
  605. if self._monitor_socket.closed:
  606. self._monitor_socket = None
  607. else:
  608. return self._monitor_socket
  609. if addr is None:
  610. # create endpoint name from internal fd
  611. addr = "inproc://monitor.s-%d" % self.FD
  612. if events is None:
  613. # use all events
  614. events = zmq.EVENT_ALL
  615. # attach monitoring socket
  616. self.monitor(addr, events)
  617. # create new PAIR socket and connect it
  618. self._monitor_socket = self.context.socket(zmq.PAIR)
  619. self._monitor_socket.connect(addr)
  620. return self._monitor_socket
  621. def disable_monitor(self):
  622. """Shutdown the PAIR socket (created using get_monitor_socket)
  623. that is serving socket events.
  624. .. versionadded:: 14.4
  625. """
  626. self._monitor_socket = None
  627. self.monitor(None, 0)
# Names exported by ``from <module> import *``.
__all__ = ['Socket']