monitoredqueue.pxd 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. """MonitoredQueue class declarations.
  2. Authors
  3. -------
  4. * MinRK
  5. * Brian Granger
  6. """
  7. #
  8. # Copyright (c) 2010 Min Ragan-Kelley, Brian Granger
  9. #
  10. # This file is part of pyzmq, but is derived and adapted from zmq_queue.cpp
  11. # originally from libzmq-2.1.6, used under LGPLv3
  12. #
  13. # pyzmq is free software; you can redistribute it and/or modify it under
  14. # the terms of the Lesser GNU General Public License as published by
  15. # the Free Software Foundation; either version 3 of the License, or
  16. # (at your option) any later version.
  17. #
  18. # pyzmq is distributed in the hope that it will be useful,
  19. # but WITHOUT ANY WARRANTY; without even the implied warranty of
  20. # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  21. # Lesser GNU General Public License for more details.
  22. #
  23. # You should have received a copy of the Lesser GNU General Public License
  24. # along with this program. If not, see <http://www.gnu.org/licenses/>.
  25. #
  26. #-----------------------------------------------------------------------------
  27. # Imports
  28. #-----------------------------------------------------------------------------
  29. from zmq.backend.cython.libzmq cimport *
  30. #-----------------------------------------------------------------------------
  31. # MonitoredQueue C functions
  32. #-----------------------------------------------------------------------------
  33. cdef inline int _relay(void *insocket_, void *outsocket_, void *sidesocket_,
  34. zmq_msg_t msg, zmq_msg_t side_msg, zmq_msg_t id_msg,
  35. bint swap_ids) nogil:
  36. cdef int rc
  37. cdef int64_t flag_2
  38. cdef int flag_3
  39. cdef int flags
  40. cdef bint more
  41. cdef size_t flagsz
  42. cdef void * flag_ptr
  43. if ZMQ_VERSION_MAJOR < 3:
  44. flagsz = sizeof (int64_t)
  45. flag_ptr = &flag_2
  46. else:
  47. flagsz = sizeof (int)
  48. flag_ptr = &flag_3
  49. if swap_ids:# both router, must send second identity first
  50. # recv two ids into msg, id_msg
  51. rc = zmq_msg_recv(&msg, insocket_, 0)
  52. if rc < 0: return rc
  53. rc = zmq_msg_recv(&id_msg, insocket_, 0)
  54. if rc < 0: return rc
  55. # send second id (id_msg) first
  56. #!!!! always send a copy before the original !!!!
  57. rc = zmq_msg_copy(&side_msg, &id_msg)
  58. if rc < 0: return rc
  59. rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
  60. if rc < 0: return rc
  61. rc = zmq_msg_send(&id_msg, sidesocket_, ZMQ_SNDMORE)
  62. if rc < 0: return rc
  63. # send first id (msg) second
  64. rc = zmq_msg_copy(&side_msg, &msg)
  65. if rc < 0: return rc
  66. rc = zmq_msg_send(&side_msg, outsocket_, ZMQ_SNDMORE)
  67. if rc < 0: return rc
  68. rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
  69. if rc < 0: return rc
  70. while (True):
  71. rc = zmq_msg_recv(&msg, insocket_, 0)
  72. if rc < 0: return rc
  73. # assert (rc == 0)
  74. rc = zmq_getsockopt (insocket_, ZMQ_RCVMORE, flag_ptr, &flagsz)
  75. if rc < 0: return rc
  76. flags = 0
  77. if ZMQ_VERSION_MAJOR < 3:
  78. if flag_2:
  79. flags |= ZMQ_SNDMORE
  80. else:
  81. if flag_3:
  82. flags |= ZMQ_SNDMORE
  83. # LABEL has been removed:
  84. # rc = zmq_getsockopt (insocket_, ZMQ_RCVLABEL, flag_ptr, &flagsz)
  85. # if flag_3:
  86. # flags |= ZMQ_SNDLABEL
  87. # assert (rc == 0)
  88. rc = zmq_msg_copy(&side_msg, &msg)
  89. if rc < 0: return rc
  90. if flags:
  91. rc = zmq_msg_send(&side_msg, outsocket_, flags)
  92. if rc < 0: return rc
  93. # only SNDMORE for side-socket
  94. rc = zmq_msg_send(&msg, sidesocket_, ZMQ_SNDMORE)
  95. if rc < 0: return rc
  96. else:
  97. rc = zmq_msg_send(&side_msg, outsocket_, 0)
  98. if rc < 0: return rc
  99. rc = zmq_msg_send(&msg, sidesocket_, 0)
  100. if rc < 0: return rc
  101. break
  102. return rc
  103. # the MonitoredQueue C function, adapted from zmq::queue.cpp :
  104. cdef inline int c_monitored_queue (void *insocket_, void *outsocket_,
  105. void *sidesocket_, zmq_msg_t *in_msg_ptr,
  106. zmq_msg_t *out_msg_ptr, int swap_ids) nogil:
  107. """The actual C function for a monitored queue device.
  108. See ``monitored_queue()`` for details.
  109. """
  110. cdef zmq_msg_t msg
  111. cdef int rc = zmq_msg_init (&msg)
  112. cdef zmq_msg_t id_msg
  113. rc = zmq_msg_init (&id_msg)
  114. if rc < 0: return rc
  115. cdef zmq_msg_t side_msg
  116. rc = zmq_msg_init (&side_msg)
  117. if rc < 0: return rc
  118. cdef zmq_pollitem_t items [2]
  119. items [0].socket = insocket_
  120. items [0].fd = 0
  121. items [0].events = ZMQ_POLLIN
  122. items [0].revents = 0
  123. items [1].socket = outsocket_
  124. items [1].fd = 0
  125. items [1].events = ZMQ_POLLIN
  126. items [1].revents = 0
  127. # I don't think sidesocket should be polled?
  128. # items [2].socket = sidesocket_
  129. # items [2].fd = 0
  130. # items [2].events = ZMQ_POLLIN
  131. # items [2].revents = 0
  132. while (True):
  133. # // Wait while there are either requests or replies to process.
  134. rc = zmq_poll (&items [0], 2, -1)
  135. if rc < 0: return rc
  136. # // The algorithm below asumes ratio of request and replies processed
  137. # // under full load to be 1:1. Although processing requests replies
  138. # // first is tempting it is suspectible to DoS attacks (overloading
  139. # // the system with unsolicited replies).
  140. #
  141. # // Process a request.
  142. if (items [0].revents & ZMQ_POLLIN):
  143. # send in_prefix to side socket
  144. rc = zmq_msg_copy(&side_msg, in_msg_ptr)
  145. if rc < 0: return rc
  146. rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
  147. if rc < 0: return rc
  148. # relay the rest of the message
  149. rc = _relay(insocket_, outsocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
  150. if rc < 0: return rc
  151. if (items [1].revents & ZMQ_POLLIN):
  152. # send out_prefix to side socket
  153. rc = zmq_msg_copy(&side_msg, out_msg_ptr)
  154. if rc < 0: return rc
  155. rc = zmq_msg_send(&side_msg, sidesocket_, ZMQ_SNDMORE)
  156. if rc < 0: return rc
  157. # relay the rest of the message
  158. rc = _relay(outsocket_, insocket_, sidesocket_, msg, side_msg, id_msg, swap_ids)
  159. if rc < 0: return rc
  160. return rc