# sensor.py
from __future__ import absolute_import

import threading
import time

from kafka.errors import QuotaViolationError
from kafka.metrics import KafkaMetric
  6. class Sensor(object):
  7. """
  8. A sensor applies a continuous sequence of numerical values
  9. to a set of associated metrics. For example a sensor on
  10. message size would record a sequence of message sizes using
  11. the `record(double)` api and would maintain a set
  12. of metrics about request sizes such as the average or max.
  13. """
  14. def __init__(self, registry, name, parents, config,
  15. inactive_sensor_expiration_time_seconds):
  16. if not name:
  17. raise ValueError('name must be non-empty')
  18. self._lock = threading.RLock()
  19. self._registry = registry
  20. self._name = name
  21. self._parents = parents or []
  22. self._metrics = []
  23. self._stats = []
  24. self._config = config
  25. self._inactive_sensor_expiration_time_ms = (
  26. inactive_sensor_expiration_time_seconds * 1000)
  27. self._last_record_time = time.time() * 1000
  28. self._check_forest(set())
  29. def _check_forest(self, sensors):
  30. """Validate that this sensor doesn't end up referencing itself."""
  31. if self in sensors:
  32. raise ValueError('Circular dependency in sensors: %s is its own'
  33. 'parent.' % self.name)
  34. sensors.add(self)
  35. for parent in self._parents:
  36. parent._check_forest(sensors)
  37. @property
  38. def name(self):
  39. """
  40. The name this sensor is registered with.
  41. This name will be unique among all registered sensors.
  42. """
  43. return self._name
  44. @property
  45. def metrics(self):
  46. return tuple(self._metrics)
  47. def record(self, value=1.0, time_ms=None):
  48. """
  49. Record a value at a known time.
  50. Arguments:
  51. value (double): The value we are recording
  52. time_ms (int): A POSIX timestamp in milliseconds.
  53. Default: The time when record() is evaluated (now)
  54. Raises:
  55. QuotaViolationException: if recording this value moves a
  56. metric beyond its configured maximum or minimum bound
  57. """
  58. if time_ms is None:
  59. time_ms = time.time() * 1000
  60. self._last_record_time = time_ms
  61. with self._lock: # XXX high volume, might be performance issue
  62. # increment all the stats
  63. for stat in self._stats:
  64. stat.record(self._config, value, time_ms)
  65. self._check_quotas(time_ms)
  66. for parent in self._parents:
  67. parent.record(value, time_ms)
  68. def _check_quotas(self, time_ms):
  69. """
  70. Check if we have violated our quota for any metric that
  71. has a configured quota
  72. """
  73. for metric in self._metrics:
  74. if metric.config and metric.config.quota:
  75. value = metric.value(time_ms)
  76. if not metric.config.quota.is_acceptable(value):
  77. raise QuotaViolationError("'%s' violated quota. Actual: "
  78. "%d, Threshold: %d" %
  79. (metric.metric_name,
  80. value,
  81. metric.config.quota.bound))
  82. def add_compound(self, compound_stat, config=None):
  83. """
  84. Register a compound statistic with this sensor which
  85. yields multiple measurable quantities (like a histogram)
  86. Arguments:
  87. stat (AbstractCompoundStat): The stat to register
  88. config (MetricConfig): The configuration for this stat.
  89. If None then the stat will use the default configuration
  90. for this sensor.
  91. """
  92. if not compound_stat:
  93. raise ValueError('compound stat must be non-empty')
  94. self._stats.append(compound_stat)
  95. for named_measurable in compound_stat.stats():
  96. metric = KafkaMetric(named_measurable.name, named_measurable.stat,
  97. config or self._config)
  98. self._registry.register_metric(metric)
  99. self._metrics.append(metric)
  100. def add(self, metric_name, stat, config=None):
  101. """
  102. Register a metric with this sensor
  103. Arguments:
  104. metric_name (MetricName): The name of the metric
  105. stat (AbstractMeasurableStat): The statistic to keep
  106. config (MetricConfig): A special configuration for this metric.
  107. If None use the sensor default configuration.
  108. """
  109. with self._lock:
  110. metric = KafkaMetric(metric_name, stat, config or self._config)
  111. self._registry.register_metric(metric)
  112. self._metrics.append(metric)
  113. self._stats.append(stat)
  114. def has_expired(self):
  115. """
  116. Return True if the Sensor is eligible for removal due to inactivity.
  117. """
  118. return ((time.time() * 1000 - self._last_record_time) >
  119. self._inactive_sensor_expiration_time_ms)