util.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. from __future__ import absolute_import
  2. import atexit
  3. import binascii
  4. import collections
  5. import struct
  6. from threading import Thread, Event
  7. import weakref
  8. from kafka.vendor import six
  9. from kafka.errors import BufferUnderflowError
  10. def crc32(data):
  11. crc = binascii.crc32(data)
  12. # py2 and py3 behave a little differently
  13. # CRC is encoded as a signed int in kafka protocol
  14. # so we'll convert the py3 unsigned result to signed
  15. if six.PY3 and crc >= 2**31:
  16. crc -= 2**32
  17. return crc
  18. def write_int_string(s):
  19. if s is not None and not isinstance(s, six.binary_type):
  20. raise TypeError('Expected "%s" to be bytes\n'
  21. 'data=%s' % (type(s), repr(s)))
  22. if s is None:
  23. return struct.pack('>i', -1)
  24. else:
  25. return struct.pack('>i%ds' % len(s), len(s), s)
  26. def read_short_string(data, cur):
  27. if len(data) < cur + 2:
  28. raise BufferUnderflowError("Not enough data left")
  29. (strlen,) = struct.unpack('>h', data[cur:cur + 2])
  30. if strlen == -1:
  31. return None, cur + 2
  32. cur += 2
  33. if len(data) < cur + strlen:
  34. raise BufferUnderflowError("Not enough data left")
  35. out = data[cur:cur + strlen]
  36. return out, cur + strlen
  37. def relative_unpack(fmt, data, cur):
  38. size = struct.calcsize(fmt)
  39. if len(data) < cur + size:
  40. raise BufferUnderflowError("Not enough data left")
  41. out = struct.unpack(fmt, data[cur:cur + size])
  42. return out, cur + size
  43. def group_by_topic_and_partition(tuples):
  44. out = collections.defaultdict(dict)
  45. for t in tuples:
  46. assert t.topic not in out or t.partition not in out[t.topic], \
  47. 'Duplicate {0}s for {1} {2}'.format(t.__class__.__name__,
  48. t.topic, t.partition)
  49. out[t.topic][t.partition] = t
  50. return out
  51. class ReentrantTimer(object):
  52. """
  53. A timer that can be restarted, unlike threading.Timer
  54. (although this uses threading.Timer)
  55. Arguments:
  56. t: timer interval in milliseconds
  57. fn: a callable to invoke
  58. args: tuple of args to be passed to function
  59. kwargs: keyword arguments to be passed to function
  60. """
  61. def __init__(self, t, fn, *args, **kwargs):
  62. if t <= 0:
  63. raise ValueError('Invalid timeout value')
  64. if not callable(fn):
  65. raise ValueError('fn must be callable')
  66. self.thread = None
  67. self.t = t / 1000.0
  68. self.fn = fn
  69. self.args = args
  70. self.kwargs = kwargs
  71. self.active = None
  72. def _timer(self, active):
  73. # python2.6 Event.wait() always returns None
  74. # python2.7 and greater returns the flag value (true/false)
  75. # we want the flag value, so add an 'or' here for python2.6
  76. # this is redundant for later python versions (FLAG OR FLAG == FLAG)
  77. while not (active.wait(self.t) or active.is_set()):
  78. self.fn(*self.args, **self.kwargs)
  79. def start(self):
  80. if self.thread is not None:
  81. self.stop()
  82. self.active = Event()
  83. self.thread = Thread(target=self._timer, args=(self.active,))
  84. self.thread.daemon = True # So the app exits when main thread exits
  85. self.thread.start()
  86. def stop(self):
  87. if self.thread is None:
  88. return
  89. self.active.set()
  90. self.thread.join(self.t + 1)
  91. # noinspection PyAttributeOutsideInit
  92. self.timer = None
  93. self.fn = None
  94. def __del__(self):
  95. self.stop()
  96. class WeakMethod(object):
  97. """
  98. Callable that weakly references a method and the object it is bound to. It
  99. is based on http://stackoverflow.com/a/24287465.
  100. Arguments:
  101. object_dot_method: A bound instance method (i.e. 'object.method').
  102. """
  103. def __init__(self, object_dot_method):
  104. try:
  105. self.target = weakref.ref(object_dot_method.__self__)
  106. except AttributeError:
  107. self.target = weakref.ref(object_dot_method.im_self)
  108. self._target_id = id(self.target())
  109. try:
  110. self.method = weakref.ref(object_dot_method.__func__)
  111. except AttributeError:
  112. self.method = weakref.ref(object_dot_method.im_func)
  113. self._method_id = id(self.method())
  114. def __call__(self, *args, **kwargs):
  115. """
  116. Calls the method on target with args and kwargs.
  117. """
  118. return self.method()(self.target(), *args, **kwargs)
  119. def __hash__(self):
  120. return hash(self.target) ^ hash(self.method)
  121. def __eq__(self, other):
  122. if not isinstance(other, WeakMethod):
  123. return False
  124. return self._target_id == other._target_id and self._method_id == other._method_id
  125. def try_method_on_system_exit(obj, method, *args, **kwargs):
  126. def wrapper(_obj, _meth, *args, **kwargs):
  127. try:
  128. getattr(_obj, _meth)(*args, **kwargs)
  129. except (ReferenceError, AttributeError):
  130. pass
  131. atexit.register(wrapper, weakref.proxy(obj), method, *args, **kwargs)