multiprocess.py

from __future__ import unicode_literals

from collections import defaultdict
import glob
import json
import os

from .metrics_core import Metric
from .mmap_dict import MmapedDict
from .samples import Sample
from .utils import floatToGoString

try:  # Python 3
    FileNotFoundError
except NameError:  # Python 2 (FileNotFoundError is new in Python 3.3)
    FileNotFoundError = IOError

MP_METRIC_HELP = 'Multiprocess metric'


class MultiProcessCollector(object):
    """Collector for files for multi-process mode."""

    def __init__(self, registry, path=None):
        if path is None:
            path = os.environ.get('prometheus_multiproc_dir')
        if not path or not os.path.isdir(path):
            raise ValueError('env prometheus_multiproc_dir is not set or not a directory')
        self._path = path
        if registry:
            registry.register(self)

    @staticmethod
    def merge(files, accumulate=True):
        """Merge metrics from given mmap files.

        By default, histograms are accumulated, as per prometheus wire format.
        But if writing the merged data back to mmap files, use
        accumulate=False to avoid compound accumulation.
        """
        metrics = MultiProcessCollector._read_metrics(files)
        return MultiProcessCollector._accumulate_metrics(metrics, accumulate)
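
    # A minimal sketch of calling merge() directly (hypothetical path; useful
    # e.g. when compacting the files of dead processes offline):
    #
    #     files = glob.glob(os.path.join('/tmp/prometheus', '*.db'))
    #     metrics = MultiProcessCollector.merge(files, accumulate=False)
    #
    # With accumulate=False the merged histograms keep raw per-bucket values,
    # so the result can be written back to mmap files without double-counting.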

    @staticmethod
    def _read_metrics(files):
        metrics = {}
        key_cache = {}

        def _parse_key(key):
            val = key_cache.get(key)
            if not val:
                metric_name, name, labels = json.loads(key)
                labels_key = tuple(sorted(labels.items()))
                val = key_cache[key] = (metric_name, name, labels, labels_key)
            return val
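
        # Illustrative key shape, per the json.loads unpacking above: a JSON
        # array of [metric_name, sample_name, labels], e.g.
        #     '["my_gauge", "my_gauge", {"method": "GET"}]'
        # The parsed tuple is cached because the same key repeats across files.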

        for f in files:
            parts = os.path.basename(f).split('_')
            typ = parts[0]
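            # Filenames encode metadata: '{type}_{pid}.db' for counters,
            # summaries and histograms, and 'gauge_{mode}_{pid}.db' for
            # gauges, where mode is the gauge's multiprocess mode
            # (all/liveall/min/max/livesum).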
            try:
                file_values = MmapedDict.read_all_values_from_file(f)
            except FileNotFoundError:
                if typ == 'gauge' and parts[1] in ('liveall', 'livesum'):
                    # Those files can disappear between the glob of collect
                    # and now (via a mark_process_dead call) so don't fail if
                    # the file is missing.
                    continue
                raise
            for key, value, pos in file_values:
                metric_name, name, labels, labels_key = _parse_key(key)
                metric = metrics.get(metric_name)
                if metric is None:
                    metric = Metric(metric_name, MP_METRIC_HELP, typ)
                    metrics[metric_name] = metric

                if typ == 'gauge':
                    pid = parts[2][:-3]  # strip the '.db' suffix
                    metric._multiprocess_mode = parts[1]
                    metric.add_sample(name, labels_key + (('pid', pid),), value)
                else:
                    # Duplicate samples and label ordering are fixed later,
                    # in _accumulate_metrics.
                    metric.add_sample(name, labels_key, value)
        return metrics

    @staticmethod
    def _accumulate_metrics(metrics, accumulate):
        for metric in metrics.values():
            samples = defaultdict(float)
            buckets = defaultdict(lambda: defaultdict(float))
            samples_setdefault = samples.setdefault
            for s in metric.samples:
                name, labels, value, timestamp, exemplar = s
                if metric.type == 'gauge':
                    without_pid_key = (name, tuple(l for l in labels if l[0] != 'pid'))
                    if metric._multiprocess_mode == 'min':
                        current = samples_setdefault(without_pid_key, value)
                        if value < current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'max':
                        current = samples_setdefault(without_pid_key, value)
                        if value > current:
                            samples[without_pid_key] = value
                    elif metric._multiprocess_mode == 'livesum':
                        samples[without_pid_key] += value
                    else:  # all/liveall
                        samples[(name, labels)] = value
                elif metric.type == 'histogram':
                    # A for loop with early exit is faster than a genexpr
                    # or a listcomp that ends up building unnecessary things.
                    for l in labels:
                        if l[0] == 'le':
                            # A _bucket sample: group it under its labels
                            # without 'le' so buckets can be accumulated.
                            bucket_value = float(l[1])
                            without_le = tuple(x for x in labels if x[0] != 'le')
                            buckets[without_le][bucket_value] += value
                            break
                    else:  # did not find the `le` label
                        # A _sum/_count sample.
                        samples[(name, labels)] += value
                else:
                    # Counter and Summary.
                    samples[(name, labels)] += value
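
            # Worked example for accumulate=True: per-process bucket
            # increments {0.1: 2, 0.5: 3, +Inf: 1} become the cumulative
            # wire-format buckets {0.1: 2, 0.5: 5, +Inf: 6}, and _count
            # is set to the +Inf total, 6.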
            # Accumulate bucket values.
            if metric.type == 'histogram':
                for labels, values in buckets.items():
                    acc = 0.0
                    for bucket, value in sorted(values.items()):
                        sample_key = (
                            metric.name + '_bucket',
                            labels + (('le', floatToGoString(bucket)),),
                        )
                        if accumulate:
                            acc += value
                            samples[sample_key] = acc
                        else:
                            samples[sample_key] = value
                    if accumulate:
                        samples[(metric.name + '_count', labels)] = acc

            # Convert to correct sample format.
            metric.samples = [Sample(name_, dict(labels), value)
                              for (name_, labels), value in samples.items()]
        return metrics.values()

    def collect(self):
        files = glob.glob(os.path.join(self._path, '*.db'))
        return self.merge(files, accumulate=True)


def mark_process_dead(pid, path=None):
    """Do bookkeeping for when one process dies in a multi-process setup."""
    if path is None:
        path = os.environ.get('prometheus_multiproc_dir')
    for f in glob.glob(os.path.join(path, 'gauge_livesum_{0}.db'.format(pid))):
        os.remove(f)
    for f in glob.glob(os.path.join(path, 'gauge_liveall_{0}.db'.format(pid))):
        os.remove(f)
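

# Usage sketch (assumes a gunicorn deployment, as in the prometheus_client
# docs): have the master call mark_process_dead when a worker exits so the
# liveall/livesum gauge files of dead processes are removed.
#
#     # gunicorn config file
#     from prometheus_client import multiprocess
#
#     def child_exit(server, worker):
#         multiprocess.mark_process_dead(worker.pid)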