1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374 |
- from __future__ import absolute_import
- from kafka.metrics import AnonMeasurable, NamedMeasurable
- from kafka.metrics.compound_stat import AbstractCompoundStat
- from kafka.metrics.stats import Histogram
- from kafka.metrics.stats.sampled_stat import AbstractSampledStat
- class BucketSizing(object):
- CONSTANT = 0
- LINEAR = 1
- class Percentiles(AbstractSampledStat, AbstractCompoundStat):
- """A compound stat that reports one or more percentiles"""
- def __init__(self, size_in_bytes, bucketing, max_val, min_val=0.0,
- percentiles=None):
- super(Percentiles, self).__init__(0.0)
- self._percentiles = percentiles or []
- self._buckets = int(size_in_bytes / 4)
- if bucketing == BucketSizing.CONSTANT:
- self._bin_scheme = Histogram.ConstantBinScheme(self._buckets,
- min_val, max_val)
- elif bucketing == BucketSizing.LINEAR:
- if min_val != 0.0:
- raise ValueError('Linear bucket sizing requires min_val'
- ' to be 0.0.')
- self.bin_scheme = Histogram.LinearBinScheme(self._buckets, max_val)
- else:
- ValueError('Unknown bucket type: %s' % bucketing)
- def stats(self):
- measurables = []
- def make_measure_fn(pct):
- return lambda config, now: self.value(config, now,
- pct / 100.0)
- for percentile in self._percentiles:
- measure_fn = make_measure_fn(percentile.percentile)
- stat = NamedMeasurable(percentile.name, AnonMeasurable(measure_fn))
- measurables.append(stat)
- return measurables
- def value(self, config, now, quantile):
- self.purge_obsolete_samples(config, now)
- count = sum(sample.event_count for sample in self._samples)
- if count == 0.0:
- return float('NaN')
- sum_val = 0.0
- quant = float(quantile)
- for b in range(self._buckets):
- for sample in self._samples:
- assert type(sample) is self.HistogramSample
- hist = sample.histogram.counts
- sum_val += hist[b]
- if sum_val / count > quant:
- return self._bin_scheme.from_bin(b)
- return float('inf')
- def combine(self, samples, config, now):
- return self.value(config, now, 0.5)
- def new_sample(self, time_ms):
- return Percentiles.HistogramSample(self._bin_scheme, time_ms)
- def update(self, sample, config, value, time_ms):
- assert type(sample) is self.HistogramSample
- sample.histogram.record(value)
- class HistogramSample(AbstractSampledStat.Sample):
- def __init__(self, scheme, now):
- super(Percentiles.HistogramSample, self).__init__(0.0, now)
- self.histogram = Histogram(scheme)
|