123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232 |
- from __future__ import unicode_literals
- import re
- from .metrics_core import Metric
- from .samples import Sample
- try:
- import StringIO
- except ImportError:
- # Python 3
- import io as StringIO
def text_string_to_metric_families(text):
    """Parse Prometheus text format from a unicode string.

    The string is wrapped in an in-memory text stream which is then handed
    to text_fd_to_metric_families; every metric family that function
    produces is yielded unchanged.

    See text_fd_to_metric_families.
    """
    stream = StringIO.StringIO(text)
    for family in text_fd_to_metric_families(stream):
        yield family
- ESCAPE_SEQUENCES = {
- '\\\\': '\\',
- '\\n': '\n',
- '\\"': '"',
- }
- def replace_escape_sequence(match):
- return ESCAPE_SEQUENCES[match.group(0)]
- HELP_ESCAPING_RE = re.compile(r'\\[\\n]')
- ESCAPING_RE = re.compile(r'\\[\\n"]')
- def _replace_help_escaping(s):
- return HELP_ESCAPING_RE.sub(replace_escape_sequence, s)
- def _replace_escaping(s):
- return ESCAPING_RE.sub(replace_escape_sequence, s)
- def _is_character_escaped(s, charpos):
- num_bslashes = 0
- while (charpos > num_bslashes and
- s[charpos - 1 - num_bslashes] == '\\'):
- num_bslashes += 1
- return num_bslashes % 2 == 1
def _parse_labels(labels_string):
    """Parse the text found between the braces of a sample into a dict.

    labels_string is a sequence of name="value" pairs separated by commas.
    Label names are stripped of surrounding whitespace; label values have
    their escape sequences replaced when the input contains a backslash.

    Returns a dict mapping label names to label values (empty when the
    input contains no "=" at all).

    Raises ValueError("Invalid labels: ...") when an expected "=" or
    double quote cannot be found.
    """
    labels = {}
    # Return if we don't have valid labels
    if "=" not in labels_string:
        return labels

    # Unescaping is only needed if a backslash appears anywhere in the input.
    escaping = "\\" in labels_string

    # Remaining, not yet consumed part of the labels
    sub_labels = labels_string
    try:
        # Process one label at a time
        while sub_labels:
            # The label name is before the equal
            value_start = sub_labels.index("=")
            label_name = sub_labels[:value_start]
            sub_labels = sub_labels[value_start + 1:].lstrip()
            # Find the first quote after the equal
            quote_start = sub_labels.index('"') + 1
            value_substr = sub_labels[quote_start:]

            # Find the first unescaped quote, which closes the value
            i = 0
            while i < len(value_substr):
                i = value_substr.index('"', i)
                if not _is_character_escaped(value_substr, i):
                    break
                i += 1

            # i is an offset into value_substr, so it has to be shifted by
            # quote_start to index sub_labels. (Using i + 1 is only correct
            # when the opening quote sits at position 0 and truncates the
            # value otherwise.)
            quote_end = quote_start + i
            label_value = sub_labels[quote_start:quote_end]
            # Replace escaping if needed
            if escaping:
                label_value = _replace_escaping(label_value)
            labels[label_name.strip()] = label_value

            # Remove the processed label from the sub-slice for next iteration
            sub_labels = sub_labels[quote_end + 1:]
            # find() returns -1 when there is no comma, so next_comma is 0
            # and nothing more is dropped.
            next_comma = sub_labels.find(",") + 1
            sub_labels = sub_labels[next_comma:].lstrip()

        return labels

    except ValueError:
        raise ValueError("Invalid labels: %s" % labels_string)
- # If we have multiple values only consider the first
- def _parse_value_and_timestamp(s):
- s = s.lstrip()
- separator = " "
- if separator not in s:
- separator = "\t"
- values = [value.strip() for value in s.split(separator) if value.strip()]
- if not values:
- return float(s), None
- value = float(values[0])
- timestamp = (float(values[-1])/1000) if len(values) > 1 else None
- return value, timestamp
def _parse_sample(text):
    """Parse a single sample line into a Sample.

    A line with labels looks like ``name{labels} value [timestamp]``; a
    line without labels looks like ``name value [timestamp]`` with a space
    or a tab after the name.

    Any ValueError raised while handling the labelled form (no braces
    found, invalid labels, unparsable value) makes the line be re-parsed
    as the unlabelled form. Raises ValueError when that fails as well.
    """
    # Detect the labels in the text
    try:
        label_start, label_end = text.index("{"), text.rindex("}")
        # The name is before the labels
        name = text[:label_start].strip()
        # We ignore the starting curly brace
        label = text[label_start + 1:label_end]
        # The value is after the closing curly brace. Only the brace itself
        # is skipped: _parse_value_and_timestamp strips leading whitespace,
        # and unconditionally skipping one more character would eat the
        # first digit of a value that directly follows the brace.
        value, timestamp = _parse_value_and_timestamp(text[label_end + 1:])
        return Sample(name, _parse_labels(label), value, timestamp)

    # We don't have labels
    except ValueError:
        # Detect what separator is used
        separator = " "
        if separator not in text:
            separator = "\t"
        name_end = text.index(separator)
        name = text[:name_end]
        # The value is after the name
        value, timestamp = _parse_value_and_timestamp(text[name_end:])
        return Sample(name, {}, value, timestamp)
def text_fd_to_metric_families(fd):
    """Parse Prometheus text format from a file descriptor.

    This is a laxer parser than the main Go parser,
    so successful parsing does not imply that the parsed
    text meets the specification.

    fd is iterated line by line. ``# HELP`` and ``# TYPE`` lines open or
    update the current metric family; sample lines whose name belongs to
    the current family are collected into it, and any other sample is
    yielded immediately as an untyped family of its own. ``# HELP`` lines
    without a metric name and ``# TYPE`` lines without a type are ignored
    like any other comment.

    Yields Metric's.
    """
    name = ''
    documentation = ''
    typ = 'untyped'
    samples = []
    allowed_names = []

    # Suffixes a sample name may carry for each declared family type.
    type_suffixes = {
        'counter': [''],
        'gauge': [''],
        'summary': ['_count', '_sum', ''],
        'histogram': ['_count', '_sum', '_bucket'],
    }

    def build_metric(name, documentation, typ, samples):
        # Munge counters into OpenMetrics representation
        # used internally: the family name has no '_total' suffix while
        # its samples do.
        if typ == 'counter':
            if name.endswith('_total'):
                name = name[:-6]
            else:
                # s[0] is the sample name; the remaining fields are kept.
                samples = [Sample(s[0] + '_total', *s[1:]) for s in samples]
        metric = Metric(name, documentation, typ)
        metric.samples = samples
        return metric

    for line in fd:
        line = line.strip()

        if line.startswith('#'):
            # At most: '#', keyword, metric name, rest of the line
            parts = line.split(None, 3)
            if len(parts) >= 3 and parts[1] == 'HELP':
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    typ = 'untyped'
                    samples = []
                    allowed_names = [parts[2]]
                if len(parts) == 4:
                    documentation = _replace_help_escaping(parts[3])
                else:
                    documentation = ''
            elif len(parts) == 4 and parts[1] == 'TYPE':
                if parts[2] != name:
                    if name != '':
                        yield build_metric(name, documentation, typ, samples)
                    # New metric
                    name = parts[2]
                    documentation = ''
                    samples = []
                typ = parts[3]
                # Unknown types only accept the bare metric name.
                allowed_names = [
                    name + suffix for suffix in type_suffixes.get(typ, [''])
                ]
            # Other comment tokens, and HELP/TYPE lines that lack their
            # required fields, are ignored.
        elif line == '':
            # Ignore blank lines
            pass
        else:
            sample = _parse_sample(line)
            if sample.name not in allowed_names:
                if name != '':
                    yield build_metric(name, documentation, typ, samples)
                # New metric, yield immediately as untyped singleton
                name = ''
                documentation = ''
                typ = 'untyped'
                samples = []
                allowed_names = []
                yield build_metric(sample[0], documentation, typ, [sample])
            else:
                samples.append(sample)

    # Flush the family that was still being collected at end of input.
    if name != '':
        yield build_metric(name, documentation, typ, samples)
|