percentiles.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374
  1. from __future__ import absolute_import
  2. from kafka.metrics import AnonMeasurable, NamedMeasurable
  3. from kafka.metrics.compound_stat import AbstractCompoundStat
  4. from kafka.metrics.stats import Histogram
  5. from kafka.metrics.stats.sampled_stat import AbstractSampledStat
  class BucketSizing(object):
      """Identifiers for the histogram bucket layouts used by Percentiles.

      Percentiles.__init__ maps CONSTANT to Histogram.ConstantBinScheme and
      LINEAR to Histogram.LinearBinScheme.
      """

      CONSTANT, LINEAR = 0, 1
  class Percentiles(AbstractSampledStat, AbstractCompoundStat):
      """A compound stat that reports one or more percentiles.

      Recorded values are counted into histogram buckets (one histogram per
      sample). A percentile is read back by walking the buckets in order
      until the cumulative share of recorded events exceeds the requested
      quantile.
      """

      def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
                   percentiles=None):
          """Choose the bucket layout and the percentiles to report.

          Arguments:
              size_in_bytes: Size budget; the number of buckets is
                  ``int(size_in_bytes / 4)``.
              bucketing: ``BucketSizing.CONSTANT`` or
                  ``BucketSizing.LINEAR``.
              max_val: Upper bound handed to the bin scheme.
              min_val: Lower bound handed to the constant bin scheme. Must
                  be 0.0 when linear bucketing is requested.
              percentiles: Optional iterable of objects exposing ``name``
                  and ``percentile`` attributes. Defaults to an empty list.

          Raises:
              ValueError: If ``bucketing`` is not a known BucketSizing
                  value, or if linear bucketing is requested with a
                  non-zero ``min_val``.
          """
          super(Percentiles, self).__init__(0.0)
          self._percentiles = percentiles or []
          # NOTE(review): the divisor presumably reflects 4 bytes per
          # bucket counter -- confirm against Histogram.
          self._buckets = int(size_in_bytes / 4)
          if bucketing == BucketSizing.CONSTANT:
              self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
                                                             min_val, max_val)
          elif bucketing == BucketSizing.LINEAR:
              if min_val != 0.0:
                  raise ValueError('Linear bucket sizing requires min_val'
                                   ' to be 0.0.')
              # Must be stored as ``_bin_scheme``: new_sample() and value()
              # read that attribute, not ``bin_scheme``.
              self._bin_scheme = Histogram.LinearBinScheme(self._buckets,
                                                           max_val)
          else:
              # Fail fast instead of leaving the instance without a scheme.
              raise ValueError('Unknown bucket type: %s' % bucketing)

      def stats(self):
          """Return one NamedMeasurable per configured percentile.

          Each measurable, when evaluated with ``(config, now)``, calls
          ``self.value`` with the percentile divided by 100.
          """
          measurables = []

          def make_measure_fn(pct):
              # A factory gives every lambda its own ``pct`` binding, so the
              # closures do not all see the last loop value.
              return lambda config, now: self.value(config, now,
                                                    pct / 100.0)

          for percentile in self._percentiles:
              measure_fn = make_measure_fn(percentile.percentile)
              stat = NamedMeasurable(percentile.name,
                                     AnonMeasurable(measure_fn))
              measurables.append(stat)
          return measurables

      def value(self, config, now, quantile):
          """Return the bucket value at which ``quantile`` is exceeded.

          Obsolete samples are purged first. Buckets are then visited in
          ascending index order, summing counts across all samples.

          Arguments:
              config: Passed through to ``purge_obsolete_samples``.
              now: Passed through to ``purge_obsolete_samples``.
              quantile: Fraction of events, e.g. 0.5; converted with
                  ``float()``.

          Returns:
              ``float('NaN')`` when no events are recorded; otherwise
              ``self._bin_scheme.from_bin(b)`` for the first bucket ``b``
              at which the cumulative share of events exceeds ``quantile``;
              ``float('inf')`` if that never happens.
          """
          self.purge_obsolete_samples(config, now)
          count = sum(sample.event_count for sample in self._samples)
          if count == 0.0:
              return float('NaN')
          sum_val = 0.0
          quant = float(quantile)
          for b in range(self._buckets):
              for sample in self._samples:
                  assert type(sample) is self.HistogramSample
                  hist = sample.histogram.counts
                  sum_val += hist[b]
                  if sum_val / count > quant:
                      return self._bin_scheme.from_bin(b)
          return float('inf')

      def combine(self, samples, config, now):
          """Return the 0.5 quantile; the ``samples`` argument is unused."""
          return self.value(config, now, 0.5)

      def new_sample(self, time_ms):
          """Create an empty HistogramSample that uses this stat's scheme."""
          return Percentiles.HistogramSample(self._bin_scheme, time_ms)

      def update(self, sample, config, value, time_ms):
          """Record ``value`` into the histogram of ``sample``."""
          assert type(sample) is self.HistogramSample
          sample.histogram.record(value)

      class HistogramSample(AbstractSampledStat.Sample):
          """A sample that carries a Histogram built from a bin scheme."""

          def __init__(self, scheme, now):
              super(Percentiles.HistogramSample, self).__init__(0.0, now)
              self.histogram = Histogram(scheme)