test_socket.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615
  1. # -*- coding: utf8 -*-
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import copy
  5. import errno
  6. import json
  7. import os
  8. import platform
  9. import time
  10. import warnings
  11. import socket
  12. import sys
  13. try:
  14. from unittest import mock
  15. except ImportError:
  16. mock = None
  17. import pytest
  18. from pytest import mark
  19. import zmq
  20. from zmq.tests import (
  21. BaseZMQTestCase, SkipTest, have_gevent, GreenTest, skip_pypy
  22. )
  23. from zmq.utils.strtypes import unicode
  24. pypy = platform.python_implementation().lower() == 'pypy'
  25. windows = platform.platform().lower().startswith('windows')
  26. on_travis = bool(os.environ.get('TRAVIS_PYTHON_VERSION'))
  27. # polling on windows is slow
  28. POLL_TIMEOUT = 1000 if windows else 100
  29. class TestSocket(BaseZMQTestCase):
  30. def test_create(self):
  31. ctx = self.Context()
  32. s = ctx.socket(zmq.PUB)
  33. # Superluminal protocol not yet implemented
  34. self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.bind, 'ftl://a')
  35. self.assertRaisesErrno(zmq.EPROTONOSUPPORT, s.connect, 'ftl://a')
  36. self.assertRaisesErrno(zmq.EINVAL, s.bind, 'tcp://')
  37. s.close()
  38. del ctx
  39. def test_context_manager(self):
  40. url = 'inproc://a'
  41. with self.Context() as ctx:
  42. with ctx.socket(zmq.PUSH) as a:
  43. a.bind(url)
  44. with ctx.socket(zmq.PULL) as b:
  45. b.connect(url)
  46. msg = b'hi'
  47. a.send(msg)
  48. rcvd = self.recv(b)
  49. self.assertEqual(rcvd, msg)
  50. self.assertEqual(b.closed, True)
  51. self.assertEqual(a.closed, True)
  52. self.assertEqual(ctx.closed, True)
  53. def test_dir(self):
  54. ctx = self.Context()
  55. s = ctx.socket(zmq.PUB)
  56. self.assertTrue('send' in dir(s))
  57. self.assertTrue('IDENTITY' in dir(s))
  58. self.assertTrue('AFFINITY' in dir(s))
  59. self.assertTrue('FD' in dir(s))
  60. s.close()
  61. ctx.term()
  62. @mark.skipif(mock is None, reason="requires unittest.mock")
  63. def test_mockable(self):
  64. s = self.socket(zmq.SUB)
  65. m = mock.Mock(spec=s)
  66. s.close()
  67. def test_bind_unicode(self):
  68. s = self.socket(zmq.PUB)
  69. p = s.bind_to_random_port(unicode("tcp://*"))
  70. def test_connect_unicode(self):
  71. s = self.socket(zmq.PUB)
  72. s.connect(unicode("tcp://127.0.0.1:5555"))
  73. def test_bind_to_random_port(self):
  74. # Check that bind_to_random_port do not hide useful exception
  75. ctx = self.Context()
  76. c = ctx.socket(zmq.PUB)
  77. # Invalid format
  78. try:
  79. c.bind_to_random_port('tcp:*')
  80. except zmq.ZMQError as e:
  81. self.assertEqual(e.errno, zmq.EINVAL)
  82. # Invalid protocol
  83. try:
  84. c.bind_to_random_port('rand://*')
  85. except zmq.ZMQError as e:
  86. self.assertEqual(e.errno, zmq.EPROTONOSUPPORT)
  87. def test_identity(self):
  88. s = self.context.socket(zmq.PULL)
  89. self.sockets.append(s)
  90. ident = b'identity\0\0'
  91. s.identity = ident
  92. self.assertEqual(s.get(zmq.IDENTITY), ident)
  93. def test_unicode_sockopts(self):
  94. """test setting/getting sockopts with unicode strings"""
  95. topic = "tést"
  96. if str is not unicode:
  97. topic = topic.decode('utf8')
  98. p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
  99. self.assertEqual(s.send_unicode, s.send_unicode)
  100. self.assertEqual(p.recv_unicode, p.recv_unicode)
  101. self.assertRaises(TypeError, s.setsockopt, zmq.SUBSCRIBE, topic)
  102. self.assertRaises(TypeError, s.setsockopt, zmq.IDENTITY, topic)
  103. s.setsockopt_unicode(zmq.IDENTITY, topic, 'utf16')
  104. self.assertRaises(TypeError, s.setsockopt, zmq.AFFINITY, topic)
  105. s.setsockopt_unicode(zmq.SUBSCRIBE, topic)
  106. self.assertRaises(TypeError, s.getsockopt_unicode, zmq.AFFINITY)
  107. self.assertRaisesErrno(zmq.EINVAL, s.getsockopt_unicode, zmq.SUBSCRIBE)
  108. identb = s.getsockopt(zmq.IDENTITY)
  109. identu = identb.decode('utf16')
  110. identu2 = s.getsockopt_unicode(zmq.IDENTITY, 'utf16')
  111. self.assertEqual(identu, identu2)
  112. time.sleep(0.1) # wait for connection/subscription
  113. p.send_unicode(topic,zmq.SNDMORE)
  114. p.send_unicode(topic*2, encoding='latin-1')
  115. self.assertEqual(topic, s.recv_unicode())
  116. self.assertEqual(topic*2, s.recv_unicode(encoding='latin-1'))
  117. def test_int_sockopts(self):
  118. "test integer sockopts"
  119. v = zmq.zmq_version_info()
  120. if v < (3,0):
  121. default_hwm = 0
  122. else:
  123. default_hwm = 1000
  124. p,s = self.create_bound_pair(zmq.PUB, zmq.SUB)
  125. p.setsockopt(zmq.LINGER, 0)
  126. self.assertEqual(p.getsockopt(zmq.LINGER), 0)
  127. p.setsockopt(zmq.LINGER, -1)
  128. self.assertEqual(p.getsockopt(zmq.LINGER), -1)
  129. self.assertEqual(p.hwm, default_hwm)
  130. p.hwm = 11
  131. self.assertEqual(p.hwm, 11)
  132. # p.setsockopt(zmq.EVENTS, zmq.POLLIN)
  133. self.assertEqual(p.getsockopt(zmq.EVENTS), zmq.POLLOUT)
  134. self.assertRaisesErrno(zmq.EINVAL, p.setsockopt,zmq.EVENTS, 2**7-1)
  135. self.assertEqual(p.getsockopt(zmq.TYPE), p.socket_type)
  136. self.assertEqual(p.getsockopt(zmq.TYPE), zmq.PUB)
  137. self.assertEqual(s.getsockopt(zmq.TYPE), s.socket_type)
  138. self.assertEqual(s.getsockopt(zmq.TYPE), zmq.SUB)
  139. # check for overflow / wrong type:
  140. errors = []
  141. backref = {}
  142. constants = zmq.constants
  143. for name in constants.__all__:
  144. value = getattr(constants, name)
  145. if isinstance(value, int):
  146. backref[value] = name
  147. for opt in zmq.constants.int_sockopts.union(zmq.constants.int64_sockopts):
  148. sopt = backref[opt]
  149. if sopt.startswith((
  150. 'ROUTER', 'XPUB', 'TCP', 'FAIL',
  151. 'REQ_', 'CURVE_', 'PROBE_ROUTER',
  152. 'IPC_FILTER', 'GSSAPI', 'STREAM_',
  153. 'VMCI_BUFFER_SIZE', 'VMCI_BUFFER_MIN_SIZE',
  154. 'VMCI_BUFFER_MAX_SIZE', 'VMCI_CONNECT_TIMEOUT',
  155. )):
  156. # some sockopts are write-only
  157. continue
  158. try:
  159. n = p.getsockopt(opt)
  160. except zmq.ZMQError as e:
  161. errors.append("getsockopt(zmq.%s) raised '%s'."%(sopt, e))
  162. else:
  163. if n > 2**31:
  164. errors.append("getsockopt(zmq.%s) returned a ridiculous value."
  165. " It is probably the wrong type."%sopt)
  166. if errors:
  167. self.fail('\n'.join([''] + errors))
  168. def test_bad_sockopts(self):
  169. """Test that appropriate errors are raised on bad socket options"""
  170. s = self.context.socket(zmq.PUB)
  171. self.sockets.append(s)
  172. s.setsockopt(zmq.LINGER, 0)
  173. # unrecognized int sockopts pass through to libzmq, and should raise EINVAL
  174. self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, 9999, 5)
  175. self.assertRaisesErrno(zmq.EINVAL, s.getsockopt, 9999)
  176. # but only int sockopts are allowed through this way, otherwise raise a TypeError
  177. self.assertRaises(TypeError, s.setsockopt, 9999, b"5")
  178. # some sockopts are valid in general, but not on every socket:
  179. self.assertRaisesErrno(zmq.EINVAL, s.setsockopt, zmq.SUBSCRIBE, b'hi')
  180. def test_sockopt_roundtrip(self):
  181. "test set/getsockopt roundtrip."
  182. p = self.context.socket(zmq.PUB)
  183. self.sockets.append(p)
  184. p.setsockopt(zmq.LINGER, 11)
  185. self.assertEqual(p.getsockopt(zmq.LINGER), 11)
  186. def test_send_unicode(self):
  187. "test sending unicode objects"
  188. a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  189. self.sockets.extend([a,b])
  190. u = "çπ§"
  191. if str is not unicode:
  192. u = u.decode('utf8')
  193. self.assertRaises(TypeError, a.send, u,copy=False)
  194. self.assertRaises(TypeError, a.send, u,copy=True)
  195. a.send_unicode(u)
  196. s = b.recv()
  197. self.assertEqual(s,u.encode('utf8'))
  198. self.assertEqual(s.decode('utf8'),u)
  199. a.send_unicode(u,encoding='utf16')
  200. s = b.recv_unicode(encoding='utf16')
  201. self.assertEqual(s,u)
  202. def test_send_multipart_check_type(self):
  203. "check type on all frames in send_multipart"
  204. a,b = self.create_bound_pair(zmq.PAIR, zmq.PAIR)
  205. self.sockets.extend([a,b])
  206. self.assertRaises(TypeError, a.send_multipart, [b'a', 5])
  207. a.send_multipart([b'b'])
  208. rcvd = self.recv_multipart(b)
  209. self.assertEqual(rcvd, [b'b'])
  210. @skip_pypy
  211. def test_tracker(self):
  212. "test the MessageTracker object for tracking when zmq is done with a buffer"
  213. addr = 'tcp://127.0.0.1'
  214. # get a port:
  215. sock = socket.socket()
  216. sock.bind(('127.0.0.1', 0))
  217. port = sock.getsockname()[1]
  218. iface = "%s:%i" % (addr, port)
  219. sock.close()
  220. time.sleep(0.1)
  221. a = self.context.socket(zmq.PUSH)
  222. b = self.context.socket(zmq.PULL)
  223. self.sockets.extend([a,b])
  224. a.connect(iface)
  225. time.sleep(0.1)
  226. p1 = a.send(b'something', copy=False, track=True)
  227. assert isinstance(p1, zmq.MessageTracker)
  228. assert p1 is zmq._FINISHED_TRACKER
  229. # small message, should start done
  230. assert p1.done
  231. # disable zero-copy threshold
  232. a.copy_threshold = 0
  233. p2 = a.send_multipart([b'something', b'else'], copy=False, track=True)
  234. assert isinstance(p2, zmq.MessageTracker)
  235. assert not p2.done
  236. b.bind(iface)
  237. msg = self.recv_multipart(b)
  238. for i in range(10):
  239. if p1.done:
  240. break
  241. time.sleep(0.1)
  242. self.assertEqual(p1.done, True)
  243. self.assertEqual(msg, [b'something'])
  244. msg = self.recv_multipart(b)
  245. for i in range(10):
  246. if p2.done:
  247. break
  248. time.sleep(0.1)
  249. self.assertEqual(p2.done, True)
  250. self.assertEqual(msg, [b'something', b'else'])
  251. m = zmq.Frame(b"again", copy=False, track=True)
  252. self.assertEqual(m.tracker.done, False)
  253. p1 = a.send(m, copy=False)
  254. p2 = a.send(m, copy=False)
  255. self.assertEqual(m.tracker.done, False)
  256. self.assertEqual(p1.done, False)
  257. self.assertEqual(p2.done, False)
  258. msg = self.recv_multipart(b)
  259. self.assertEqual(m.tracker.done, False)
  260. self.assertEqual(msg, [b'again'])
  261. msg = self.recv_multipart(b)
  262. self.assertEqual(m.tracker.done, False)
  263. self.assertEqual(msg, [b'again'])
  264. self.assertEqual(p1.done, False)
  265. self.assertEqual(p2.done, False)
  266. pm = m.tracker
  267. del m
  268. for i in range(10):
  269. if p1.done:
  270. break
  271. time.sleep(0.1)
  272. self.assertEqual(p1.done, True)
  273. self.assertEqual(p2.done, True)
  274. m = zmq.Frame(b'something', track=False)
  275. self.assertRaises(ValueError, a.send, m, copy=False, track=True)
  276. def test_close(self):
  277. ctx = self.Context()
  278. s = ctx.socket(zmq.PUB)
  279. s.close()
  280. self.assertRaisesErrno(zmq.ENOTSOCK, s.bind, b'')
  281. self.assertRaisesErrno(zmq.ENOTSOCK, s.connect, b'')
  282. self.assertRaisesErrno(zmq.ENOTSOCK, s.setsockopt, zmq.SUBSCRIBE, b'')
  283. self.assertRaisesErrno(zmq.ENOTSOCK, s.send, b'asdf')
  284. self.assertRaisesErrno(zmq.ENOTSOCK, s.recv)
  285. del ctx
  286. def test_attr(self):
  287. """set setting/getting sockopts as attributes"""
  288. s = self.context.socket(zmq.DEALER)
  289. self.sockets.append(s)
  290. linger = 10
  291. s.linger = linger
  292. self.assertEqual(linger, s.linger)
  293. self.assertEqual(linger, s.getsockopt(zmq.LINGER))
  294. self.assertEqual(s.fd, s.getsockopt(zmq.FD))
  295. def test_bad_attr(self):
  296. s = self.context.socket(zmq.DEALER)
  297. self.sockets.append(s)
  298. try:
  299. s.apple='foo'
  300. except AttributeError:
  301. pass
  302. else:
  303. self.fail("bad setattr should have raised AttributeError")
  304. try:
  305. s.apple
  306. except AttributeError:
  307. pass
  308. else:
  309. self.fail("bad getattr should have raised AttributeError")
  310. def test_subclass(self):
  311. """subclasses can assign attributes"""
  312. class S(zmq.Socket):
  313. a = None
  314. def __init__(self, *a, **kw):
  315. self.a=-1
  316. super(S, self).__init__(*a, **kw)
  317. s = S(self.context, zmq.REP)
  318. self.sockets.append(s)
  319. self.assertEqual(s.a, -1)
  320. s.a=1
  321. self.assertEqual(s.a, 1)
  322. a=s.a
  323. self.assertEqual(a, 1)
  324. def test_recv_multipart(self):
  325. a,b = self.create_bound_pair()
  326. msg = b'hi'
  327. for i in range(3):
  328. a.send(msg)
  329. time.sleep(0.1)
  330. for i in range(3):
  331. self.assertEqual(self.recv_multipart(b), [msg])
  332. def test_close_after_destroy(self):
  333. """s.close() after ctx.destroy() should be fine"""
  334. ctx = self.Context()
  335. s = ctx.socket(zmq.REP)
  336. ctx.destroy()
  337. # reaper is not instantaneous
  338. time.sleep(1e-2)
  339. s.close()
  340. self.assertTrue(s.closed)
  341. def test_poll(self):
  342. a,b = self.create_bound_pair()
  343. tic = time.time()
  344. evt = a.poll(POLL_TIMEOUT)
  345. self.assertEqual(evt, 0)
  346. evt = a.poll(POLL_TIMEOUT, zmq.POLLOUT)
  347. self.assertEqual(evt, zmq.POLLOUT)
  348. msg = b'hi'
  349. a.send(msg)
  350. evt = b.poll(POLL_TIMEOUT)
  351. self.assertEqual(evt, zmq.POLLIN)
  352. msg2 = self.recv(b)
  353. evt = b.poll(POLL_TIMEOUT)
  354. self.assertEqual(evt, 0)
  355. self.assertEqual(msg2, msg)
  356. def test_ipc_path_max_length(self):
  357. """IPC_PATH_MAX_LEN is a sensible value"""
  358. if zmq.IPC_PATH_MAX_LEN == 0:
  359. raise SkipTest("IPC_PATH_MAX_LEN undefined")
  360. msg = "Surprising value for IPC_PATH_MAX_LEN: %s" % zmq.IPC_PATH_MAX_LEN
  361. self.assertTrue(zmq.IPC_PATH_MAX_LEN > 30, msg)
  362. self.assertTrue(zmq.IPC_PATH_MAX_LEN < 1025, msg)
  363. def test_ipc_path_max_length_msg(self):
  364. if zmq.IPC_PATH_MAX_LEN == 0:
  365. raise SkipTest("IPC_PATH_MAX_LEN undefined")
  366. s = self.context.socket(zmq.PUB)
  367. self.sockets.append(s)
  368. try:
  369. s.bind('ipc://{0}'.format('a' * (zmq.IPC_PATH_MAX_LEN + 1)))
  370. except zmq.ZMQError as e:
  371. self.assertTrue(str(zmq.IPC_PATH_MAX_LEN) in e.strerror)
  372. @mark.skipif(windows, reason="ipc not supported on Windows.")
  373. def test_ipc_path_no_such_file_or_directory_message(self):
  374. """Display the ipc path in case of an ENOENT exception"""
  375. s = self.context.socket(zmq.PUB)
  376. self.sockets.append(s)
  377. invalid_path = '/foo/bar'
  378. with pytest.raises(zmq.ZMQError) as error:
  379. s.bind('ipc://{0}'.format(invalid_path))
  380. assert error.value.errno == errno.ENOENT
  381. error_message = str(error.value)
  382. assert invalid_path in error_message
  383. assert "no such file or directory" in error_message.lower()
  384. def test_hwm(self):
  385. zmq3 = zmq.zmq_version_info()[0] >= 3
  386. for stype in (zmq.PUB, zmq.ROUTER, zmq.SUB, zmq.REQ, zmq.DEALER):
  387. s = self.context.socket(stype)
  388. s.hwm = 100
  389. self.assertEqual(s.hwm, 100)
  390. if zmq3:
  391. try:
  392. self.assertEqual(s.sndhwm, 100)
  393. except AttributeError:
  394. pass
  395. try:
  396. self.assertEqual(s.rcvhwm, 100)
  397. except AttributeError:
  398. pass
  399. s.close()
  400. def test_copy(self):
  401. s = self.socket(zmq.PUB)
  402. scopy = copy.copy(s)
  403. sdcopy = copy.deepcopy(s)
  404. self.assert_(scopy._shadow)
  405. self.assert_(sdcopy._shadow)
  406. self.assertEqual(s.underlying, scopy.underlying)
  407. self.assertEqual(s.underlying, sdcopy.underlying)
  408. s.close()
  409. def test_send_buffer(self):
  410. a, b = self.create_bound_pair(zmq.PUSH, zmq.PULL)
  411. for buffer_type in (memoryview, bytearray):
  412. rawbytes = str(buffer_type).encode('ascii')
  413. msg = buffer_type(rawbytes)
  414. a.send(msg)
  415. recvd = b.recv()
  416. assert recvd == rawbytes
  417. def test_shadow(self):
  418. p = self.socket(zmq.PUSH)
  419. p.bind("tcp://127.0.0.1:5555")
  420. p2 = zmq.Socket.shadow(p.underlying)
  421. self.assertEqual(p.underlying, p2.underlying)
  422. s = self.socket(zmq.PULL)
  423. s2 = zmq.Socket.shadow(s.underlying)
  424. self.assertNotEqual(s.underlying, p.underlying)
  425. self.assertEqual(s.underlying, s2.underlying)
  426. s2.connect("tcp://127.0.0.1:5555")
  427. sent = b'hi'
  428. p2.send(sent)
  429. rcvd = self.recv(s2)
  430. self.assertEqual(rcvd, sent)
  431. def test_shadow_pyczmq(self):
  432. try:
  433. from pyczmq import zctx, zsocket
  434. except Exception:
  435. raise SkipTest("Requires pyczmq")
  436. ctx = zctx.new()
  437. ca = zsocket.new(ctx, zmq.PUSH)
  438. cb = zsocket.new(ctx, zmq.PULL)
  439. a = zmq.Socket.shadow(ca)
  440. b = zmq.Socket.shadow(cb)
  441. a.bind("inproc://a")
  442. b.connect("inproc://a")
  443. a.send(b'hi')
  444. rcvd = self.recv(b)
  445. self.assertEqual(rcvd, b'hi')
  446. def test_subscribe_method(self):
  447. pub, sub = self.create_bound_pair(zmq.PUB, zmq.SUB)
  448. sub.subscribe('prefix')
  449. sub.subscribe = 'c'
  450. p = zmq.Poller()
  451. p.register(sub, zmq.POLLIN)
  452. # wait for subscription handshake
  453. for i in range(100):
  454. pub.send(b'canary')
  455. events = p.poll(250)
  456. if events:
  457. break
  458. self.recv(sub)
  459. pub.send(b'prefixmessage')
  460. msg = self.recv(sub)
  461. self.assertEqual(msg, b'prefixmessage')
  462. sub.unsubscribe('prefix')
  463. pub.send(b'prefixmessage')
  464. events = p.poll(1000)
  465. self.assertEqual(events, [])
  466. # Travis can't handle how much memory PyPy uses on this test
  467. @mark.skipif(
  468. (
  469. pypy and on_travis
  470. ) or (
  471. sys.maxsize < 2**32
  472. ) or (
  473. windows
  474. ),
  475. reason="only run on 64b and not on Travis."
  476. )
  477. @mark.large
  478. def test_large_send(self):
  479. c = os.urandom(1)
  480. N = 2**31 + 1
  481. try:
  482. buf = c * N
  483. except MemoryError as e:
  484. raise SkipTest("Not enough memory: %s" % e)
  485. a, b = self.create_bound_pair()
  486. try:
  487. a.send(buf, copy=False)
  488. rcvd = b.recv(copy=False)
  489. except MemoryError as e:
  490. raise SkipTest("Not enough memory: %s" % e)
  491. # sample the front and back of the received message
  492. # without checking the whole content
  493. # Python 2: items in memoryview are bytes
  494. # Python 3: items im memoryview are int
  495. byte = c if sys.version_info < (3,) else ord(c)
  496. view = memoryview(rcvd)
  497. assert len(view) == N
  498. assert view[0] == byte
  499. assert view[-1] == byte
  500. def test_custom_serialize(self):
  501. a, b = self.create_bound_pair(zmq.DEALER, zmq.ROUTER)
  502. def serialize(msg):
  503. frames = []
  504. frames.extend(msg.get('identities', []))
  505. content = json.dumps(msg['content']).encode('utf8')
  506. frames.append(content)
  507. return frames
  508. def deserialize(frames):
  509. identities = frames[:-1]
  510. content = json.loads(frames[-1].decode('utf8'))
  511. return {
  512. 'identities': identities,
  513. 'content': content,
  514. }
  515. msg = {
  516. 'content': {
  517. 'a': 5,
  518. 'b': 'bee',
  519. }
  520. }
  521. a.send_serialized(msg, serialize)
  522. recvd = b.recv_serialized(deserialize)
  523. assert recvd['content'] == msg['content']
  524. assert recvd['identities']
  525. # bounce back, tests identities
  526. b.send_serialized(recvd, serialize)
  527. r2 = a.recv_serialized(deserialize)
  528. assert r2['content'] == msg['content']
  529. assert not r2['identities']
  530. if have_gevent and not windows:
  531. import gevent
  532. class TestSocketGreen(GreenTest, TestSocket):
  533. test_bad_attr = GreenTest.skip_green
  534. test_close_after_destroy = GreenTest.skip_green
  535. def test_timeout(self):
  536. a,b = self.create_bound_pair()
  537. g = gevent.spawn_later(0.5, lambda: a.send(b'hi'))
  538. timeout = gevent.Timeout(0.1)
  539. timeout.start()
  540. self.assertRaises(gevent.Timeout, b.recv)
  541. g.kill()
  542. @mark.skipif(not hasattr(zmq, 'RCVTIMEO'), reason="requires RCVTIMEO")
  543. def test_warn_set_timeo(self):
  544. s = self.context.socket(zmq.REQ)
  545. with warnings.catch_warnings(record=True) as w:
  546. s.rcvtimeo = 5
  547. s.close()
  548. self.assertEqual(len(w), 1)
  549. self.assertEqual(w[0].category, UserWarning)
  550. @mark.skipif(not hasattr(zmq, 'SNDTIMEO'), reason="requires SNDTIMEO")
  551. def test_warn_get_timeo(self):
  552. s = self.context.socket(zmq.REQ)
  553. with warnings.catch_warnings(record=True) as w:
  554. s.sndtimeo
  555. s.close()
  556. self.assertEqual(len(w), 1)
  557. self.assertEqual(w[0].category, UserWarning)