poll.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. import zmq
  2. import gevent
  3. from gevent import select
  4. from zmq import Poller as _original_Poller
  5. class _Poller(_original_Poller):
  6. """Replacement for :class:`zmq.Poller`
  7. Ensures that the greened Poller below is used in calls to
  8. :meth:`zmq.Poller.poll`.
  9. """
  10. _gevent_bug_timeout = 1.33 # minimum poll interval, for working around gevent bug
  11. def _get_descriptors(self):
  12. """Returns three elements tuple with socket descriptors ready
  13. for gevent.select.select
  14. """
  15. rlist = []
  16. wlist = []
  17. xlist = []
  18. for socket, flags in self.sockets:
  19. if isinstance(socket, zmq.Socket):
  20. rlist.append(socket.getsockopt(zmq.FD))
  21. continue
  22. elif isinstance(socket, int):
  23. fd = socket
  24. elif hasattr(socket, 'fileno'):
  25. try:
  26. fd = int(socket.fileno())
  27. except:
  28. raise ValueError('fileno() must return an valid integer fd')
  29. else:
  30. raise TypeError('Socket must be a 0MQ socket, an integer fd '
  31. 'or have a fileno() method: %r' % socket)
  32. if flags & zmq.POLLIN:
  33. rlist.append(fd)
  34. if flags & zmq.POLLOUT:
  35. wlist.append(fd)
  36. if flags & zmq.POLLERR:
  37. xlist.append(fd)
  38. return (rlist, wlist, xlist)
  39. def poll(self, timeout=-1):
  40. """Overridden method to ensure that the green version of
  41. Poller is used.
  42. Behaves the same as :meth:`zmq.core.Poller.poll`
  43. """
  44. if timeout is None:
  45. timeout = -1
  46. if timeout < 0:
  47. timeout = -1
  48. rlist = None
  49. wlist = None
  50. xlist = None
  51. if timeout > 0:
  52. tout = gevent.Timeout.start_new(timeout/1000.0)
  53. else:
  54. tout = None
  55. try:
  56. # Loop until timeout or events available
  57. rlist, wlist, xlist = self._get_descriptors()
  58. while True:
  59. events = super(_Poller, self).poll(0)
  60. if events or timeout == 0:
  61. return events
  62. # wait for activity on sockets in a green way
  63. # set a minimum poll frequency,
  64. # because gevent < 1.0 cannot be trusted to catch edge-triggered FD events
  65. _bug_timeout = gevent.Timeout.start_new(self._gevent_bug_timeout)
  66. try:
  67. select.select(rlist, wlist, xlist)
  68. except gevent.Timeout as t:
  69. if t is not _bug_timeout:
  70. raise
  71. finally:
  72. _bug_timeout.cancel()
  73. except gevent.Timeout as t:
  74. if t is not tout:
  75. raise
  76. return []
  77. finally:
  78. if timeout > 0:
  79. tout.cancel()