_poll.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980
  1. # coding: utf-8
  2. """zmq poll function"""
  3. # Copyright (C) PyZMQ Developers
  4. # Distributed under the terms of the Modified BSD License.
  5. try:
  6. from time import monotonic
  7. except ImportError:
  8. from time import clock as monotonic
  9. import warnings
  10. from ._cffi import C, ffi
  11. from zmq.error import InterruptedSystemCall, _check_rc
  12. def _make_zmq_pollitem(socket, flags):
  13. zmq_socket = socket._zmq_socket
  14. zmq_pollitem = ffi.new('zmq_pollitem_t*')
  15. zmq_pollitem.socket = zmq_socket
  16. zmq_pollitem.fd = 0
  17. zmq_pollitem.events = flags
  18. zmq_pollitem.revents = 0
  19. return zmq_pollitem[0]
  20. def _make_zmq_pollitem_fromfd(socket_fd, flags):
  21. zmq_pollitem = ffi.new('zmq_pollitem_t*')
  22. zmq_pollitem.socket = ffi.NULL
  23. zmq_pollitem.fd = socket_fd
  24. zmq_pollitem.events = flags
  25. zmq_pollitem.revents = 0
  26. return zmq_pollitem[0]
  27. def zmq_poll(sockets, timeout):
  28. cffi_pollitem_list = []
  29. low_level_to_socket_obj = {}
  30. from zmq import Socket
  31. for item in sockets:
  32. if isinstance(item[0], Socket):
  33. low_level_to_socket_obj[item[0]._zmq_socket] = item
  34. cffi_pollitem_list.append(_make_zmq_pollitem(item[0], item[1]))
  35. else:
  36. if not isinstance(item[0], int):
  37. # not an FD, get it from fileno()
  38. item = (item[0].fileno(), item[1])
  39. low_level_to_socket_obj[item[0]] = item
  40. cffi_pollitem_list.append(_make_zmq_pollitem_fromfd(item[0], item[1]))
  41. items = ffi.new('zmq_pollitem_t[]', cffi_pollitem_list)
  42. list_length = ffi.cast('int', len(cffi_pollitem_list))
  43. while True:
  44. c_timeout = ffi.cast('long', timeout)
  45. start = monotonic()
  46. rc = C.zmq_poll(items, list_length, c_timeout)
  47. try:
  48. _check_rc(rc)
  49. except InterruptedSystemCall:
  50. if timeout > 0:
  51. ms_passed = int(1000 * (monotonic() - start))
  52. if ms_passed < 0:
  53. # don't allow negative ms_passed,
  54. # which can happen on old Python versions without time.monotonic.
  55. warnings.warn(
  56. "Negative elapsed time for interrupted poll: %s."
  57. " Did the clock change?" % ms_passed,
  58. RuntimeWarning)
  59. ms_passed = 0
  60. timeout = max(0, timeout - ms_passed)
  61. continue
  62. else:
  63. break
  64. result = []
  65. for index in range(len(items)):
  66. if items[index].revents > 0:
  67. if not items[index].socket == ffi.NULL:
  68. result.append((low_level_to_socket_obj[items[index].socket][0],
  69. items[index].revents))
  70. else:
  71. result.append((items[index].fd, items[index].revents))
  72. return result
# Public API of this module: only zmq_poll is exported; the pollitem
# builders above are private helpers.
__all__ = ['zmq_poll']