test_cffi_backend.py 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. # -*- coding: utf8 -*-
  2. import sys
  3. import time
  4. from unittest import TestCase
  5. from zmq.tests import BaseZMQTestCase, SkipTest
  6. try:
  7. from zmq.backend.cffi import (
  8. zmq_version_info,
  9. PUSH, PULL, IDENTITY,
  10. REQ, REP, POLLIN, POLLOUT,
  11. )
  12. from zmq.backend.cffi._cffi import ffi, C
  13. have_ffi_backend = True
  14. except ImportError:
  15. have_ffi_backend = False
  16. class TestCFFIBackend(TestCase):
  17. def setUp(self):
  18. if not have_ffi_backend:
  19. raise SkipTest('CFFI not available')
  20. def test_zmq_version_info(self):
  21. version = zmq_version_info()
  22. assert version[0] in range(2,11)
  23. def test_zmq_ctx_new_destroy(self):
  24. ctx = C.zmq_ctx_new()
  25. assert ctx != ffi.NULL
  26. assert 0 == C.zmq_ctx_destroy(ctx)
  27. def test_zmq_socket_open_close(self):
  28. ctx = C.zmq_ctx_new()
  29. socket = C.zmq_socket(ctx, PUSH)
  30. assert ctx != ffi.NULL
  31. assert ffi.NULL != socket
  32. assert 0 == C.zmq_close(socket)
  33. assert 0 == C.zmq_ctx_destroy(ctx)
  34. def test_zmq_setsockopt(self):
  35. ctx = C.zmq_ctx_new()
  36. socket = C.zmq_socket(ctx, PUSH)
  37. identity = ffi.new('char[3]', b'zmq')
  38. ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
  39. assert ret == 0
  40. assert ctx != ffi.NULL
  41. assert ffi.NULL != socket
  42. assert 0 == C.zmq_close(socket)
  43. assert 0 == C.zmq_ctx_destroy(ctx)
  44. def test_zmq_getsockopt(self):
  45. ctx = C.zmq_ctx_new()
  46. socket = C.zmq_socket(ctx, PUSH)
  47. identity = ffi.new('char[]', b'zmq')
  48. ret = C.zmq_setsockopt(socket, IDENTITY, ffi.cast('void*', identity), 3)
  49. assert ret == 0
  50. option_len = ffi.new('size_t*', 3)
  51. option = ffi.new('char[3]')
  52. ret = C.zmq_getsockopt(socket,
  53. IDENTITY,
  54. ffi.cast('void*', option),
  55. option_len)
  56. assert ret == 0
  57. assert ffi.string(ffi.cast('char*', option))[0:1] == b"z"
  58. assert ffi.string(ffi.cast('char*', option))[1:2] == b"m"
  59. assert ffi.string(ffi.cast('char*', option))[2:3] == b"q"
  60. assert ctx != ffi.NULL
  61. assert ffi.NULL != socket
  62. assert 0 == C.zmq_close(socket)
  63. assert 0 == C.zmq_ctx_destroy(ctx)
  64. def test_zmq_bind(self):
  65. ctx = C.zmq_ctx_new()
  66. socket = C.zmq_socket(ctx, 8)
  67. assert 0 == C.zmq_bind(socket, b'tcp://*:4444')
  68. assert ctx != ffi.NULL
  69. assert ffi.NULL != socket
  70. assert 0 == C.zmq_close(socket)
  71. assert 0 == C.zmq_ctx_destroy(ctx)
  72. def test_zmq_bind_connect(self):
  73. ctx = C.zmq_ctx_new()
  74. socket1 = C.zmq_socket(ctx, PUSH)
  75. socket2 = C.zmq_socket(ctx, PULL)
  76. assert 0 == C.zmq_bind(socket1, b'tcp://*:4444')
  77. assert 0 == C.zmq_connect(socket2, b'tcp://127.0.0.1:4444')
  78. assert ctx != ffi.NULL
  79. assert ffi.NULL != socket1
  80. assert ffi.NULL != socket2
  81. assert 0 == C.zmq_close(socket1)
  82. assert 0 == C.zmq_close(socket2)
  83. assert 0 == C.zmq_ctx_destroy(ctx)
  84. def test_zmq_msg_init_close(self):
  85. zmq_msg = ffi.new('zmq_msg_t*')
  86. assert ffi.NULL != zmq_msg
  87. assert 0 == C.zmq_msg_init(zmq_msg)
  88. assert 0 == C.zmq_msg_close(zmq_msg)
  89. def test_zmq_msg_init_size(self):
  90. zmq_msg = ffi.new('zmq_msg_t*')
  91. assert ffi.NULL != zmq_msg
  92. assert 0 == C.zmq_msg_init_size(zmq_msg, 10)
  93. assert 0 == C.zmq_msg_close(zmq_msg)
  94. def test_zmq_msg_init_data(self):
  95. zmq_msg = ffi.new('zmq_msg_t*')
  96. message = ffi.new('char[5]', b'Hello')
  97. assert 0 == C.zmq_msg_init_data(zmq_msg,
  98. ffi.cast('void*', message),
  99. 5,
  100. ffi.NULL,
  101. ffi.NULL)
  102. assert ffi.NULL != zmq_msg
  103. assert 0 == C.zmq_msg_close(zmq_msg)
  104. def test_zmq_msg_data(self):
  105. zmq_msg = ffi.new('zmq_msg_t*')
  106. message = ffi.new('char[]', b'Hello')
  107. assert 0 == C.zmq_msg_init_data(zmq_msg,
  108. ffi.cast('void*', message),
  109. 5,
  110. ffi.NULL,
  111. ffi.NULL)
  112. data = C.zmq_msg_data(zmq_msg)
  113. assert ffi.NULL != zmq_msg
  114. assert ffi.string(ffi.cast("char*", data)) == b'Hello'
  115. assert 0 == C.zmq_msg_close(zmq_msg)
  116. def test_zmq_send(self):
  117. ctx = C.zmq_ctx_new()
  118. sender = C.zmq_socket(ctx, REQ)
  119. receiver = C.zmq_socket(ctx, REP)
  120. assert 0 == C.zmq_bind(receiver, b'tcp://*:7777')
  121. assert 0 == C.zmq_connect(sender, b'tcp://127.0.0.1:7777')
  122. time.sleep(0.1)
  123. zmq_msg = ffi.new('zmq_msg_t*')
  124. message = ffi.new('char[5]', b'Hello')
  125. C.zmq_msg_init_data(zmq_msg,
  126. ffi.cast('void*', message),
  127. ffi.cast('size_t', 5),
  128. ffi.NULL,
  129. ffi.NULL)
  130. assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
  131. assert 0 == C.zmq_msg_close(zmq_msg)
  132. assert C.zmq_close(sender) == 0
  133. assert C.zmq_close(receiver) == 0
  134. assert C.zmq_ctx_destroy(ctx) == 0
  135. def test_zmq_recv(self):
  136. ctx = C.zmq_ctx_new()
  137. sender = C.zmq_socket(ctx, REQ)
  138. receiver = C.zmq_socket(ctx, REP)
  139. assert 0 == C.zmq_bind(receiver, b'tcp://*:2222')
  140. assert 0 == C.zmq_connect(sender, b'tcp://127.0.0.1:2222')
  141. time.sleep(0.1)
  142. zmq_msg = ffi.new('zmq_msg_t*')
  143. message = ffi.new('char[5]', b'Hello')
  144. C.zmq_msg_init_data(zmq_msg,
  145. ffi.cast('void*', message),
  146. ffi.cast('size_t', 5),
  147. ffi.NULL,
  148. ffi.NULL)
  149. zmq_msg2 = ffi.new('zmq_msg_t*')
  150. C.zmq_msg_init(zmq_msg2)
  151. assert 5 == C.zmq_msg_send(zmq_msg, sender, 0)
  152. assert 5 == C.zmq_msg_recv(zmq_msg2, receiver, 0)
  153. assert 5 == C.zmq_msg_size(zmq_msg2)
  154. assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
  155. C.zmq_msg_size(zmq_msg2))[:]
  156. assert C.zmq_close(sender) == 0
  157. assert C.zmq_close(receiver) == 0
  158. assert C.zmq_ctx_destroy(ctx) == 0
  159. def test_zmq_poll(self):
  160. ctx = C.zmq_ctx_new()
  161. sender = C.zmq_socket(ctx, REQ)
  162. receiver = C.zmq_socket(ctx, REP)
  163. r1 = C.zmq_bind(receiver, b'tcp://*:3333')
  164. r2 = C.zmq_connect(sender, b'tcp://127.0.0.1:3333')
  165. zmq_msg = ffi.new('zmq_msg_t*')
  166. message = ffi.new('char[5]', b'Hello')
  167. C.zmq_msg_init_data(zmq_msg,
  168. ffi.cast('void*', message),
  169. ffi.cast('size_t', 5),
  170. ffi.NULL,
  171. ffi.NULL)
  172. receiver_pollitem = ffi.new('zmq_pollitem_t*')
  173. receiver_pollitem.socket = receiver
  174. receiver_pollitem.fd = 0
  175. receiver_pollitem.events = POLLIN | POLLOUT
  176. receiver_pollitem.revents = 0
  177. ret = C.zmq_poll(ffi.NULL, 0, 0)
  178. assert ret == 0
  179. ret = C.zmq_poll(receiver_pollitem, 1, 0)
  180. assert ret == 0
  181. ret = C.zmq_msg_send(zmq_msg, sender, 0)
  182. print(ffi.string(C.zmq_strerror(C.zmq_errno())))
  183. assert ret == 5
  184. time.sleep(0.2)
  185. ret = C.zmq_poll(receiver_pollitem, 1, 0)
  186. assert ret == 1
  187. assert int(receiver_pollitem.revents) & POLLIN
  188. assert not int(receiver_pollitem.revents) & POLLOUT
  189. zmq_msg2 = ffi.new('zmq_msg_t*')
  190. C.zmq_msg_init(zmq_msg2)
  191. ret_recv = C.zmq_msg_recv(zmq_msg2, receiver, 0)
  192. assert ret_recv == 5
  193. assert 5 == C.zmq_msg_size(zmq_msg2)
  194. assert b"Hello" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
  195. C.zmq_msg_size(zmq_msg2))[:]
  196. sender_pollitem = ffi.new('zmq_pollitem_t*')
  197. sender_pollitem.socket = sender
  198. sender_pollitem.fd = 0
  199. sender_pollitem.events = POLLIN | POLLOUT
  200. sender_pollitem.revents = 0
  201. ret = C.zmq_poll(sender_pollitem, 1, 0)
  202. assert ret == 0
  203. zmq_msg_again = ffi.new('zmq_msg_t*')
  204. message_again = ffi.new('char[11]', b'Hello Again')
  205. C.zmq_msg_init_data(zmq_msg_again,
  206. ffi.cast('void*', message_again),
  207. ffi.cast('size_t', 11),
  208. ffi.NULL,
  209. ffi.NULL)
  210. assert 11 == C.zmq_msg_send(zmq_msg_again, receiver, 0)
  211. time.sleep(0.2)
  212. assert 0 <= C.zmq_poll(sender_pollitem, 1, 0)
  213. assert int(sender_pollitem.revents) & POLLIN
  214. assert 11 == C.zmq_msg_recv(zmq_msg2, sender, 0)
  215. assert 11 == C.zmq_msg_size(zmq_msg2)
  216. assert b"Hello Again" == ffi.buffer(C.zmq_msg_data(zmq_msg2),
  217. int(C.zmq_msg_size(zmq_msg2)))[:]
  218. assert 0 == C.zmq_close(sender)
  219. assert 0 == C.zmq_close(receiver)
  220. assert 0 == C.zmq_ctx_destroy(ctx)
  221. assert 0 == C.zmq_msg_close(zmq_msg)
  222. assert 0 == C.zmq_msg_close(zmq_msg2)
  223. assert 0 == C.zmq_msg_close(zmq_msg_again)