# sampled_stat.py

  1. from __future__ import absolute_import
  2. import abc
  3. from kafka.metrics.measurable_stat import AbstractMeasurableStat
  class AbstractSampledStat(AbstractMeasurableStat):
      """
      Base class for statistics computed over a rotating set of samples.

      A statistic of this kind tracks one scalar per sample. A sample stays
      current until its window closes, which happens as soon as *either*
      ``config.time_window_ms`` has elapsed since the sample was started
      *or* ``config.event_window`` events have been recorded into it. When
      that happens recording moves on to the next slot: a new sample is
      allocated while fewer than ``config.samples`` exist, otherwise an
      existing sample is reset and reused in round-robin order.

      Concrete statistics subclass this and supply ``update`` (how one
      recorded value is folded into a sample) and ``combine`` (how all
      samples are reduced to the reported measurement).
      """
      # Python 2 metaclass hook; Python 3 ignores this attribute.
      __metaclass__ = abc.ABCMeta

      def __init__(self, initial_value):
          """
          Arguments:
              initial_value: value handed to every sample created by
                  ``new_sample``; a sample returns to it on reset.
          """
          self._initial_value = initial_value
          self._samples = []
          # Index into self._samples of the sample currently recorded into.
          self._current = 0

      @abc.abstractmethod
      def update(self, sample, config, value, time_ms):
          """
          Fold one recorded value into ``sample``.

          Invoked by ``record`` with the current sample, the config, the
          value already converted to ``float`` and the record timestamp.
          """
          raise NotImplementedError

      @abc.abstractmethod
      def combine(self, samples, config, now):
          """
          Reduce ``samples`` to a single measurement.

          Invoked by ``measure`` with the full sample list; the result is
          converted to ``float`` by the caller.
          """
          raise NotImplementedError

      def record(self, config, value, time_ms):
          """
          Record ``value`` at ``time_ms`` into the current sample, moving on
          to the next sample first if the current one is already complete.
          """
          active = self.current(time_ms)
          window_done = active.is_complete(time_ms, config)
          if window_done:
              active = self._advance(config, time_ms)
          self.update(active, config, float(value), time_ms)
          active.event_count += 1

      def new_sample(self, time_ms):
          """Create a sample starting at ``time_ms`` with the initial value."""
          return self.Sample(self._initial_value, time_ms)

      def measure(self, config, now):
          """
          Reset expired samples, then return the combined measurement of
          all samples as a ``float``.
          """
          self.purge_obsolete_samples(config, now)
          combined = self.combine(self._samples, config, now)
          return float(combined)

      def current(self, time_ms):
          """
          Return the sample at the current index, creating the first sample
          (started at ``time_ms``) if none exist yet.
          """
          pool = self._samples
          if not pool:
              pool.append(self.new_sample(time_ms))
          return pool[self._current]

      def oldest(self, now):
          """
          Return the sample with the smallest ``last_window_ms``; the
          earliest one in the list wins ties. A first sample (started at
          ``now``) is created if none exist yet.
          """
          if not self._samples:
              self._samples.append(self.new_sample(now))
          return min(self._samples, key=lambda entry: entry.last_window_ms)

      def purge_obsolete_samples(self, config, now):
          """
          Reset every sample whose window started at least
          ``config.samples * config.time_window_ms`` before ``now``, so that
          windows which expired without receiving events are timed out.
          """
          cutoff_age = config.samples * config.time_window_ms
          for entry in self._samples:
              if now - entry.last_window_ms >= cutoff_age:
                  entry.reset(now)

      def _advance(self, config, time_ms):
          """
          Step the current index forward (wrapping at ``config.samples``)
          and return the sample at the new index, ready for recording.
          """
          next_index = (self._current + 1) % config.samples
          self._current = next_index

          # Slot already exists: wipe it and reuse it.
          if next_index < len(self._samples):
              recycled = self.current(time_ms)
              recycled.reset(time_ms)
              return recycled

          # Slot does not exist yet: grow the list with a new sample.
          fresh = self.new_sample(time_ms)
          self._samples.append(fresh)
          return fresh

      class Sample(object):
          """One recording window: a value, an event count and a start time."""

          def __init__(self, initial_value, now):
              """
              Arguments:
                  initial_value: starting value, restored by ``reset``.
                  now: timestamp stored as the start of this window.
              """
              self.initial_value = initial_value
              self.event_count = 0
              self.last_window_ms = now
              self.value = initial_value

          def reset(self, now):
              """Clear the event count and value and restart the window at ``now``."""
              self.event_count = 0
              self.last_window_ms = now
              self.value = self.initial_value

          def is_complete(self, time_ms, config):
              """
              Return True once the window has lasted at least
              ``config.time_window_ms`` or has received at least
              ``config.event_window`` events.
              """
              elapsed = time_ms - self.last_window_ms
              if elapsed >= config.time_window_ms:
                  return True
              return self.event_count >= config.event_window