registry.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import copy
  2. from threading import Lock
  3. from .metrics_core import Metric
  4. class CollectorRegistry(object):
  5. """Metric collector registry.
  6. Collectors must have a no-argument method 'collect' that returns a list of
  7. Metric objects. The returned metrics should be consistent with the Prometheus
  8. exposition formats.
  9. """
  10. def __init__(self, auto_describe=False, target_info=None):
  11. self._collector_to_names = {}
  12. self._names_to_collectors = {}
  13. self._auto_describe = auto_describe
  14. self._lock = Lock()
  15. self._target_info = {}
  16. self.set_target_info(target_info)
  17. def register(self, collector):
  18. """Add a collector to the registry."""
  19. with self._lock:
  20. names = self._get_names(collector)
  21. duplicates = set(self._names_to_collectors).intersection(names)
  22. if duplicates:
  23. raise ValueError(
  24. 'Duplicated timeseries in CollectorRegistry: {0}'.format(
  25. duplicates))
  26. for name in names:
  27. self._names_to_collectors[name] = collector
  28. self._collector_to_names[collector] = names
  29. def unregister(self, collector):
  30. """Remove a collector from the registry."""
  31. with self._lock:
  32. for name in self._collector_to_names[collector]:
  33. del self._names_to_collectors[name]
  34. del self._collector_to_names[collector]
  35. def _get_names(self, collector):
  36. """Get names of timeseries the collector produces."""
  37. desc_func = None
  38. # If there's a describe function, use it.
  39. try:
  40. desc_func = collector.describe
  41. except AttributeError:
  42. pass
  43. # Otherwise, if auto describe is enabled use the collect function.
  44. if not desc_func and self._auto_describe:
  45. desc_func = collector.collect
  46. if not desc_func:
  47. return []
  48. result = []
  49. type_suffixes = {
  50. 'counter': ['_total', '_created'],
  51. 'summary': ['', '_sum', '_count', '_created'],
  52. 'histogram': ['_bucket', '_sum', '_count', '_created'],
  53. 'gaugehistogram': ['_bucket', '_gsum', '_gcount'],
  54. 'info': ['_info'],
  55. }
  56. for metric in desc_func():
  57. for suffix in type_suffixes.get(metric.type, ['']):
  58. result.append(metric.name + suffix)
  59. return result
  60. def collect(self):
  61. """Yields metrics from the collectors in the registry."""
  62. collectors = None
  63. ti = None
  64. with self._lock:
  65. collectors = copy.copy(self._collector_to_names)
  66. if self._target_info:
  67. ti = self._target_info_metric()
  68. if ti:
  69. yield ti
  70. for collector in collectors:
  71. for metric in collector.collect():
  72. yield metric
  73. def restricted_registry(self, names):
  74. """Returns object that only collects some metrics.
  75. Returns an object which upon collect() will return
  76. only samples with the given names.
  77. Intended usage is:
  78. generate_latest(REGISTRY.restricted_registry(['a_timeseries']))
  79. Experimental."""
  80. names = set(names)
  81. collectors = set()
  82. metrics = []
  83. with self._lock:
  84. if 'target_info' in names and self._target_info:
  85. metrics.append(self._target_info_metric())
  86. names.remove('target_info')
  87. for name in names:
  88. if name in self._names_to_collectors:
  89. collectors.add(self._names_to_collectors[name])
  90. for collector in collectors:
  91. for metric in collector.collect():
  92. samples = [s for s in metric.samples if s[0] in names]
  93. if samples:
  94. m = Metric(metric.name, metric.documentation, metric.type)
  95. m.samples = samples
  96. metrics.append(m)
  97. class RestrictedRegistry(object):
  98. def collect(self):
  99. return metrics
  100. return RestrictedRegistry()
  101. def set_target_info(self, labels):
  102. with self._lock:
  103. if labels:
  104. if not self._target_info and 'target_info' in self._names_to_collectors:
  105. raise ValueError('CollectorRegistry already contains a target_info metric')
  106. self._names_to_collectors['target_info'] = None
  107. elif self._target_info:
  108. self._names_to_collectors.pop('target_info', None)
  109. self._target_info = labels
  110. def get_target_info(self):
  111. with self._lock:
  112. return self._target_info
  113. def _target_info_metric(self):
  114. m = Metric('target', 'Target metadata', 'info')
  115. m.add_sample('target_info', self._target_info, 1)
  116. return m
  117. def get_sample_value(self, name, labels=None):
  118. """Returns the sample value, or None if not found.
  119. This is inefficient, and intended only for use in unittests.
  120. """
  121. if labels is None:
  122. labels = {}
  123. for metric in self.collect():
  124. for s in metric.samples:
  125. if s.name == name and s.labels == labels:
  126. return s.value
  127. return None
  128. REGISTRY = CollectorRegistry(auto_describe=True)