123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134 |
- from __future__ import absolute_import
- import threading
- import time
- from kafka.errors import QuotaViolationError
- from kafka.metrics import KafkaMetric
- class Sensor(object):
- """
- A sensor applies a continuous sequence of numerical values
- to a set of associated metrics. For example a sensor on
- message size would record a sequence of message sizes using
- the `record(double)` api and would maintain a set
- of metrics about request sizes such as the average or max.
- """
- def __init__(self, registry, name, parents, config,
- inactive_sensor_expiration_time_seconds):
- if not name:
- raise ValueError('name must be non-empty')
- self._lock = threading.RLock()
- self._registry = registry
- self._name = name
- self._parents = parents or []
- self._metrics = []
- self._stats = []
- self._config = config
- self._inactive_sensor_expiration_time_ms = (
- inactive_sensor_expiration_time_seconds * 1000)
- self._last_record_time = time.time() * 1000
- self._check_forest(set())
- def _check_forest(self, sensors):
- """Validate that this sensor doesn't end up referencing itself."""
- if self in sensors:
- raise ValueError('Circular dependency in sensors: %s is its own'
- 'parent.' % self.name)
- sensors.add(self)
- for parent in self._parents:
- parent._check_forest(sensors)
- @property
- def name(self):
- """
- The name this sensor is registered with.
- This name will be unique among all registered sensors.
- """
- return self._name
- @property
- def metrics(self):
- return tuple(self._metrics)
- def record(self, value=1.0, time_ms=None):
- """
- Record a value at a known time.
- Arguments:
- value (double): The value we are recording
- time_ms (int): A POSIX timestamp in milliseconds.
- Default: The time when record() is evaluated (now)
- Raises:
- QuotaViolationException: if recording this value moves a
- metric beyond its configured maximum or minimum bound
- """
- if time_ms is None:
- time_ms = time.time() * 1000
- self._last_record_time = time_ms
- with self._lock: # XXX high volume, might be performance issue
- # increment all the stats
- for stat in self._stats:
- stat.record(self._config, value, time_ms)
- self._check_quotas(time_ms)
- for parent in self._parents:
- parent.record(value, time_ms)
- def _check_quotas(self, time_ms):
- """
- Check if we have violated our quota for any metric that
- has a configured quota
- """
- for metric in self._metrics:
- if metric.config and metric.config.quota:
- value = metric.value(time_ms)
- if not metric.config.quota.is_acceptable(value):
- raise QuotaViolationError("'%s' violated quota. Actual: "
- "%d, Threshold: %d" %
- (metric.metric_name,
- value,
- metric.config.quota.bound))
- def add_compound(self, compound_stat, config=None):
- """
- Register a compound statistic with this sensor which
- yields multiple measurable quantities (like a histogram)
- Arguments:
- stat (AbstractCompoundStat): The stat to register
- config (MetricConfig): The configuration for this stat.
- If None then the stat will use the default configuration
- for this sensor.
- """
- if not compound_stat:
- raise ValueError('compound stat must be non-empty')
- self._stats.append(compound_stat)
- for named_measurable in compound_stat.stats():
- metric = KafkaMetric(named_measurable.name, named_measurable.stat,
- config or self._config)
- self._registry.register_metric(metric)
- self._metrics.append(metric)
- def add(self, metric_name, stat, config=None):
- """
- Register a metric with this sensor
- Arguments:
- metric_name (MetricName): The name of the metric
- stat (AbstractMeasurableStat): The statistic to keep
- config (MetricConfig): A special configuration for this metric.
- If None use the sensor default configuration.
- """
- with self._lock:
- metric = KafkaMetric(metric_name, stat, config or self._config)
- self._registry.register_metric(metric)
- self._metrics.append(metric)
- self._stats.append(stat)
- def has_expired(self):
- """
- Return True if the Sensor is eligible for removal due to inactivity.
- """
- return ((time.time() * 1000 - self._last_record_time) >
- self._inactive_sensor_expiration_time_ms)
|