tracker.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. """Tracker for zero-copy messages with 0MQ."""
  2. # Copyright (C) PyZMQ Developers
  3. # Distributed under the terms of the Modified BSD License.
  4. import time
  5. try:
  6. # below 3.3
  7. from threading import _Event as Event
  8. except (ImportError, AttributeError):
  9. # python throws ImportError, cython throws AttributeError
  10. from threading import Event
  11. from zmq.error import NotDone
  12. from zmq.backend import Frame
  13. class MessageTracker(object):
  14. """MessageTracker(*towatch)
  15. A class for tracking if 0MQ is done using one or more messages.
  16. When you send a 0MQ message, it is not sent immediately. The 0MQ IO thread
  17. sends the message at some later time. Often you want to know when 0MQ has
  18. actually sent the message though. This is complicated by the fact that
  19. a single 0MQ message can be sent multiple times using different sockets.
  20. This class allows you to track all of the 0MQ usages of a message.
  21. Parameters
  22. ----------
  23. towatch : Event, MessageTracker, Message instances.
  24. This objects to track. This class can track the low-level
  25. Events used by the Message class, other MessageTrackers or
  26. actual Messages.
  27. """
  28. events = None
  29. peers = None
  30. def __init__(self, *towatch):
  31. """MessageTracker(*towatch)
  32. Create a message tracker to track a set of mesages.
  33. Parameters
  34. ----------
  35. *towatch : tuple of Event, MessageTracker, Message instances.
  36. This list of objects to track. This class can track the low-level
  37. Events used by the Message class, other MessageTrackers or
  38. actual Messages.
  39. """
  40. self.events = set()
  41. self.peers = set()
  42. for obj in towatch:
  43. if isinstance(obj, Event):
  44. self.events.add(obj)
  45. elif isinstance(obj, MessageTracker):
  46. self.peers.add(obj)
  47. elif isinstance(obj, Frame):
  48. if not obj.tracker:
  49. raise ValueError("Not a tracked message")
  50. self.peers.add(obj.tracker)
  51. else:
  52. raise TypeError("Require Events or Message Frames, not %s"%type(obj))
  53. @property
  54. def done(self):
  55. """Is 0MQ completely done with the message(s) being tracked?"""
  56. for evt in self.events:
  57. if not evt.is_set():
  58. return False
  59. for pm in self.peers:
  60. if not pm.done:
  61. return False
  62. return True
  63. def wait(self, timeout=-1):
  64. """mt.wait(timeout=-1)
  65. Wait for 0MQ to be done with the message or until `timeout`.
  66. Parameters
  67. ----------
  68. timeout : float [default: -1, wait forever]
  69. Maximum time in (s) to wait before raising NotDone.
  70. Returns
  71. -------
  72. None
  73. if done before `timeout`
  74. Raises
  75. ------
  76. NotDone
  77. if `timeout` reached before I am done.
  78. """
  79. tic = time.time()
  80. if timeout is False or timeout < 0:
  81. remaining = 3600*24*7 # a week
  82. else:
  83. remaining = timeout
  84. done = False
  85. for evt in self.events:
  86. if remaining < 0:
  87. raise NotDone
  88. evt.wait(timeout=remaining)
  89. if not evt.is_set():
  90. raise NotDone
  91. toc = time.time()
  92. remaining -= (toc-tic)
  93. tic = toc
  94. for peer in self.peers:
  95. if remaining < 0:
  96. raise NotDone
  97. peer.wait(timeout=remaining)
  98. toc = time.time()
  99. remaining -= (toc-tic)
  100. tic = toc
  101. _FINISHED_TRACKER = MessageTracker()
  102. __all__ = ['MessageTracker', '_FINISHED_TRACKER']