socket.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. # coding: utf-8
  2. """zmq Socket class"""
  3. # Copyright (C) PyZMQ Developers
  4. # Distributed under the terms of the Modified BSD License.
  5. import errno as errno_mod
  6. from ._cffi import (C, ffi, new_uint64_pointer, new_int64_pointer,
  7. new_int_pointer, new_binary_data, value_uint64_pointer,
  8. value_int64_pointer, value_int_pointer, value_binary_data,
  9. IPC_PATH_MAX_LEN)
  10. from .message import Frame
  11. from .constants import RCVMORE
  12. from .utils import _retry_sys_call
  13. import zmq
  14. from zmq.error import ZMQError, _check_rc, _check_version
  15. from zmq.utils.strtypes import unicode
  16. def new_pointer_from_opt(option, length=0):
  17. from zmq.sugar.constants import (
  18. int64_sockopts, bytes_sockopts,
  19. )
  20. if option in int64_sockopts:
  21. return new_int64_pointer()
  22. elif option in bytes_sockopts:
  23. return new_binary_data(length)
  24. else:
  25. # default
  26. return new_int_pointer()
  27. def value_from_opt_pointer(option, opt_pointer, length=0):
  28. from zmq.sugar.constants import (
  29. int64_sockopts, bytes_sockopts,
  30. )
  31. if option in int64_sockopts:
  32. return int(opt_pointer[0])
  33. elif option in bytes_sockopts:
  34. return ffi.buffer(opt_pointer, length)[:]
  35. else:
  36. return int(opt_pointer[0])
  37. def initialize_opt_pointer(option, value, length=0):
  38. from zmq.sugar.constants import (
  39. int64_sockopts, bytes_sockopts,
  40. )
  41. if option in int64_sockopts:
  42. return value_int64_pointer(value)
  43. elif option in bytes_sockopts:
  44. return value_binary_data(value, length)
  45. else:
  46. return value_int_pointer(value)
  47. class Socket(object):
  48. context = None
  49. socket_type = None
  50. _zmq_socket = None
  51. _closed = None
  52. _ref = None
  53. _shadow = False
  54. copy_threshold = 0
  55. def __init__(self, context=None, socket_type=None, shadow=None):
  56. self.context = context
  57. if shadow is not None:
  58. if isinstance(shadow, Socket):
  59. shadow = shadow.underlying
  60. self._zmq_socket = ffi.cast("void *", shadow)
  61. self._shadow = True
  62. else:
  63. self._shadow = False
  64. self._zmq_socket = C.zmq_socket(context._zmq_ctx, socket_type)
  65. if self._zmq_socket == ffi.NULL:
  66. raise ZMQError()
  67. self._closed = False
  68. @property
  69. def underlying(self):
  70. """The address of the underlying libzmq socket"""
  71. return int(ffi.cast('size_t', self._zmq_socket))
  72. def _check_closed_deep(self):
  73. """thorough check of whether the socket has been closed,
  74. even if by another entity (e.g. ctx.destroy).
  75. Only used by the `closed` property.
  76. returns True if closed, False otherwise
  77. """
  78. if self._closed:
  79. return True
  80. try:
  81. self.get(zmq.TYPE)
  82. except ZMQError as e:
  83. if e.errno == zmq.ENOTSOCK:
  84. self._closed = True
  85. return True
  86. else:
  87. raise
  88. return False
  89. @property
  90. def closed(self):
  91. return self._check_closed_deep()
  92. def close(self, linger=None):
  93. rc = 0
  94. if not self._closed and hasattr(self, '_zmq_socket'):
  95. if self._zmq_socket is not None:
  96. if linger is not None:
  97. self.set(zmq.LINGER, linger)
  98. rc = C.zmq_close(self._zmq_socket)
  99. self._closed = True
  100. if rc < 0:
  101. _check_rc(rc)
  102. def bind(self, address):
  103. if isinstance(address, unicode):
  104. address = address.encode('utf8')
  105. rc = C.zmq_bind(self._zmq_socket, address)
  106. if rc < 0:
  107. if IPC_PATH_MAX_LEN and C.zmq_errno() == errno_mod.ENAMETOOLONG:
  108. # py3compat: address is bytes, but msg wants str
  109. if str is unicode:
  110. address = address.decode('utf-8', 'replace')
  111. path = address.split('://', 1)[-1]
  112. msg = ('ipc path "{0}" is longer than {1} '
  113. 'characters (sizeof(sockaddr_un.sun_path)).'
  114. .format(path, IPC_PATH_MAX_LEN))
  115. raise ZMQError(C.zmq_errno(), msg=msg)
  116. elif C.zmq_errno() == errno_mod.ENOENT:
  117. # py3compat: address is bytes, but msg wants str
  118. if str is unicode:
  119. address = address.decode('utf-8', 'replace')
  120. path = address.split('://', 1)[-1]
  121. msg = ('No such file or directory for ipc path "{0}".'.format(
  122. path))
  123. raise ZMQError(C.zmq_errno(), msg=msg)
  124. else:
  125. _check_rc(rc)
  126. def unbind(self, address):
  127. _check_version((3,2), "unbind")
  128. if isinstance(address, unicode):
  129. address = address.encode('utf8')
  130. rc = C.zmq_unbind(self._zmq_socket, address)
  131. _check_rc(rc)
  132. def connect(self, address):
  133. if isinstance(address, unicode):
  134. address = address.encode('utf8')
  135. rc = C.zmq_connect(self._zmq_socket, address)
  136. _check_rc(rc)
  137. def disconnect(self, address):
  138. _check_version((3,2), "disconnect")
  139. if isinstance(address, unicode):
  140. address = address.encode('utf8')
  141. rc = C.zmq_disconnect(self._zmq_socket, address)
  142. _check_rc(rc)
  143. def set(self, option, value):
  144. length = None
  145. if isinstance(value, unicode):
  146. raise TypeError("unicode not allowed, use bytes")
  147. if isinstance(value, bytes):
  148. if option not in zmq.constants.bytes_sockopts:
  149. raise TypeError("not a bytes sockopt: %s" % option)
  150. length = len(value)
  151. c_data = initialize_opt_pointer(option, value, length)
  152. c_value_pointer = c_data[0]
  153. c_sizet = c_data[1]
  154. _retry_sys_call(C.zmq_setsockopt,
  155. self._zmq_socket,
  156. option,
  157. ffi.cast('void*', c_value_pointer),
  158. c_sizet)
  159. def get(self, option):
  160. c_data = new_pointer_from_opt(option, length=255)
  161. c_value_pointer = c_data[0]
  162. c_sizet_pointer = c_data[1]
  163. _retry_sys_call(C.zmq_getsockopt,
  164. self._zmq_socket,
  165. option,
  166. c_value_pointer,
  167. c_sizet_pointer)
  168. sz = c_sizet_pointer[0]
  169. v = value_from_opt_pointer(option, c_value_pointer, sz)
  170. if option != zmq.IDENTITY and option in zmq.constants.bytes_sockopts and v.endswith(b'\0'):
  171. v = v[:-1]
  172. return v
  173. def send(self, message, flags=0, copy=False, track=False):
  174. if isinstance(message, unicode):
  175. raise TypeError("Message must be in bytes, not an unicode Object")
  176. if isinstance(message, Frame):
  177. message = message.bytes
  178. zmq_msg = ffi.new('zmq_msg_t*')
  179. if not isinstance(message, bytes):
  180. # cast any bufferable data to bytes via memoryview
  181. message = memoryview(message).tobytes()
  182. c_message = ffi.new('char[]', message)
  183. rc = C.zmq_msg_init_size(zmq_msg, len(message))
  184. _check_rc(rc)
  185. C.memcpy(C.zmq_msg_data(zmq_msg), c_message, len(message))
  186. _retry_sys_call(C.zmq_msg_send, zmq_msg, self._zmq_socket, flags)
  187. rc2 = C.zmq_msg_close(zmq_msg)
  188. _check_rc(rc2)
  189. if track:
  190. return zmq.MessageTracker()
  191. def recv(self, flags=0, copy=True, track=False):
  192. zmq_msg = ffi.new('zmq_msg_t*')
  193. C.zmq_msg_init(zmq_msg)
  194. try:
  195. _retry_sys_call(C.zmq_msg_recv, zmq_msg, self._zmq_socket, flags)
  196. except Exception:
  197. C.zmq_msg_close(zmq_msg)
  198. raise
  199. _buffer = ffi.buffer(C.zmq_msg_data(zmq_msg), C.zmq_msg_size(zmq_msg))
  200. value = _buffer[:]
  201. rc = C.zmq_msg_close(zmq_msg)
  202. _check_rc(rc)
  203. frame = Frame(value, track=track)
  204. frame.more = self.getsockopt(RCVMORE)
  205. if copy:
  206. return frame.bytes
  207. else:
  208. return frame
  209. def monitor(self, addr, events=-1):
  210. """s.monitor(addr, flags)
  211. Start publishing socket events on inproc.
  212. See libzmq docs for zmq_monitor for details.
  213. Note: requires libzmq >= 3.2
  214. Parameters
  215. ----------
  216. addr : str
  217. The inproc url used for monitoring. Passing None as
  218. the addr will cause an existing socket monitor to be
  219. deregistered.
  220. events : int [default: zmq.EVENT_ALL]
  221. The zmq event bitmask for which events will be sent to the monitor.
  222. """
  223. _check_version((3,2), "monitor")
  224. if events < 0:
  225. events = zmq.EVENT_ALL
  226. if addr is None:
  227. addr = ffi.NULL
  228. if isinstance(addr, unicode):
  229. addr = addr.encode('utf8')
  230. rc = C.zmq_socket_monitor(self._zmq_socket, addr, events)
  231. __all__ = ['Socket', 'IPC_PATH_MAX_LEN']