123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- from __future__ import unicode_literals
- import base64
- from contextlib import closing
- import os
- import socket
- import sys
- import threading
- from wsgiref.simple_server import make_server, WSGIServer, WSGIRequestHandler
- from .openmetrics import exposition as openmetrics
- from .registry import REGISTRY
- from .utils import floatToGoString
- try:
- from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
- from SocketServer import ThreadingMixIn
- from urllib2 import build_opener, Request, HTTPHandler
- from urllib import quote_plus
- from urlparse import parse_qs, urlparse
- except ImportError:
- # Python 3
- from http.server import BaseHTTPRequestHandler, HTTPServer
- from socketserver import ThreadingMixIn
- from urllib.request import build_opener, Request, HTTPHandler
- from urllib.parse import quote_plus, parse_qs, urlparse
# Content type of the latest text format.
CONTENT_TYPE_LATEST = str('text/plain; version=0.0.4; charset=utf-8')

# Interpreter-version switches.
PYTHON26_OR_OLDER = not sys.version_info >= (2, 7)
# NOTE(review): sys.version_info is a five-element tuple, so 3.7.5 itself
# already compares greater than (3, 7, 5) here.
PYTHON376_OR_NEWER = (3, 7, 5) < sys.version_info
def _bake_output(registry, accept_header, params):
    """Render ``registry`` into the pieces of an HTTP response.

    ``accept_header`` selects the encoder (via ``choose_encoder``) and
    ``params`` is the parsed query string; when it carries ``name[]`` the
    registry is first narrowed with ``restricted_registry``.

    Returns a ``(status, (header_name, header_value), body)`` triple.
    """
    encoder, content_type = choose_encoder(accept_header)
    source = registry
    if 'name[]' in params:
        source = registry.restricted_registry(params['name[]'])
    body = encoder(source)
    content_header = (str('Content-Type'), content_type)
    return str('200 OK'), content_header, body
def make_wsgi_app(registry=REGISTRY):
    """Build a WSGI application that exposes the metrics of ``registry``."""

    def prometheus_app(environ, start_response):
        # The Accept header picks the exposition format; the query string
        # may restrict which metric names are returned.
        accept = environ.get('HTTP_ACCEPT')
        query = parse_qs(environ.get('QUERY_STRING', ''))
        status, header, body = _bake_output(registry, accept, query)
        start_response(status, [header])
        return [body]

    return prometheus_app
- class _SilentHandler(WSGIRequestHandler):
- """WSGI handler that does not log requests."""
- def log_message(self, format, *args):
- """Log nothing."""
class ThreadingWSGIServer(ThreadingMixIn, WSGIServer):
    """WSGI server that handles each request in its own thread."""

    # Worker threads are "fire and forget". From Python 3.7 onwards
    # ``ThreadingMixIn`` collects every non-daemon thread in a list so it can
    # join them when the server closes; marking the workers as daemons
    # prevents that list from growing into a memory leak.
    daemon_threads = True
def start_wsgi_server(port, addr='', registry=REGISTRY):
    """Serve the metrics of ``registry`` over WSGI from a daemon thread.

    ``port`` and ``addr`` are handed to ``make_server``; the server loop
    runs in a background daemon thread. Nothing is returned.
    """
    server = make_server(
        addr, port, make_wsgi_app(registry), ThreadingWSGIServer,
        handler_class=_SilentHandler,
    )
    worker = threading.Thread(target=server.serve_forever)
    worker.daemon = True
    worker.start()


# Alias: the HTTP server entry point is the WSGI implementation.
start_http_server = start_wsgi_server
def generate_latest(registry=REGISTRY):
    """Render every metric collected from ``registry`` in the latest text format.

    The result is the exposition text encoded as UTF-8 bytes. If rendering a
    metric raises, the offending metric is appended to the exception's
    ``args`` before the exception is re-raised.
    """
    # Munging from OpenMetrics into Prometheus format:
    # suffix added to the family name for these OpenMetrics types ...
    name_suffixes = {'counter': '_total', 'info': '_info'}
    # ... and the type name advertised instead of the OpenMetrics one.
    type_aliases = {
        'info': 'gauge',
        'stateset': 'gauge',
        # A gauge histogram is really a gauge,
        # but this captures the structure better.
        'gaugehistogram': 'histogram',
        'unknown': 'untyped',
    }
    # OpenMetrics specific samples; each group is emitted as its own gauge
    # after the regular samples of the metric.
    om_suffixes = ('_created', '_gsum', '_gcount')

    def escape_text(text):
        # Backslashes first, so the ones introduced for newlines survive.
        return text.replace('\\', r'\\').replace('\n', r'\n')

    def sample_line(line):
        labelstr = ''
        if line.labels:
            pairs = []
            for k, v in sorted(line.labels.items()):
                pairs.append('{0}="{1}"'.format(
                    k, escape_text(v).replace('"', r'\"')))
            labelstr = '{{{0}}}'.format(','.join(pairs))
        timestamp = ''
        if line.timestamp is not None:
            # Convert to milliseconds.
            timestamp = ' {0:d}'.format(int(float(line.timestamp) * 1000))
        return '{0}{1} {2}{3}\n'.format(
            line.name, labelstr, floatToGoString(line.value), timestamp)

    output = []
    for metric in registry.collect():
        try:
            mname = metric.name
            mtype = metric.type
            if mtype in name_suffixes:
                mname = mname + name_suffixes[mtype]
            mtype = type_aliases.get(mtype, mtype)

            output.append('# HELP {0} {1}\n'.format(
                mname, escape_text(metric.documentation)))
            output.append('# TYPE {0} {1}\n'.format(mname, mtype))

            om_samples = {}
            for s in metric.samples:
                om_suffix = next(
                    (sfx for sfx in om_suffixes
                     if s.name == metric.name + sfx),
                    None)
                if om_suffix is None:
                    output.append(sample_line(s))
                else:
                    om_samples.setdefault(om_suffix, []).append(sample_line(s))
        except Exception as exception:
            exception.args = (exception.args or ('',)) + (metric,)
            raise

        for suffix in sorted(om_samples):
            output.append('# HELP {0}{1} {2}\n'.format(
                metric.name, suffix, escape_text(metric.documentation)))
            output.append('# TYPE {0}{1} gauge\n'.format(metric.name, suffix))
            output.extend(om_samples[suffix])
    return ''.join(output).encode('utf-8')
def choose_encoder(accept_header):
    """Pick the exposition encoder that matches an HTTP ``Accept`` header.

    Returns an ``(encoder, content_type)`` pair: the OpenMetrics encoder when
    any comma-separated entry names ``application/openmetrics-text``
    (parameters after ``;`` are ignored), otherwise the text format one.
    A missing or empty header selects the text format.
    """
    media_types = (
        entry.split(';')[0].strip()
        for entry in (accept_header or '').split(',')
    )
    if any(mt == 'application/openmetrics-text' for mt in media_types):
        return (openmetrics.generate_latest,
                openmetrics.CONTENT_TYPE_LATEST)
    return generate_latest, CONTENT_TYPE_LATEST
class MetricsHandler(BaseHTTPRequestHandler):
    """HTTP handler that gives metrics from ``REGISTRY``."""

    # Registry served by this handler class; ``factory`` builds subclasses
    # that override it.
    registry = REGISTRY

    def do_GET(self):
        """Answer a GET request with the exposition of ``self.registry``."""
        # Prepare parameters
        registry = self.registry
        accept_header = self.headers.get('Accept')
        query = parse_qs(urlparse(self.path).query)
        # Bake output
        status, header, body = _bake_output(registry, accept_header, query)
        # Return output; the numeric code is the first word of the status.
        self.send_response(int(status.partition(' ')[0]))
        header_name, header_value = header
        self.send_header(header_name, header_value)
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format, *args):
        """Log nothing."""
        return None

    @classmethod
    def factory(cls, registry):
        """Return a dynamically created subclass of ``cls`` bound to ``registry``.

        The subclass only overrides the ``registry`` class attribute that
        ``do_GET`` reads.
        """
        # As we have unicode_literals, the class name must be converted with
        # str() before it is handed to type().
        return type(str(cls.__name__), (cls, object), {"registry": registry})
def write_to_textfile(path, registry):
    """Write metrics to the given path.

    This is intended for use with the Node exporter textfile collector.
    The path must end in .prom for the textfile collector to process it.

    The exposition is written to a temporary file next to ``path`` (suffixed
    with the process id and thread ident) and then moved over ``path``.
    Exceptions raised while rendering the registry or by the file operations
    propagate to the caller.
    """
    # Render before touching the filesystem: if a collector raises, no
    # empty temporary file is left behind.
    payload = generate_latest(registry)
    tmppath = '%s.%s.%s' % (path, os.getpid(), threading.current_thread().ident)
    with open(tmppath, 'wb') as f:
        f.write(payload)
    # rename(2) is atomic, but os.rename fails on Windows when the
    # destination already exists. os.replace (Python 3.3+) overwrites it on
    # every platform; fall back to os.rename on older interpreters.
    replace = getattr(os, 'replace', os.rename)
    replace(tmppath, path)
def default_handler(url, method, timeout, headers, data):
    """Default handler that implements HTTP/HTTPS connections.

    Used by the push_to_gateway functions. Can be re-used by other handlers.

    ``url`` is the request URL, ``method`` the HTTP method, ``timeout`` the
    timeout passed to the opener, ``headers`` an iterable of (name, value)
    pairs and ``data`` the request body.

    Returns a zero-argument callable that performs the request and raises
    IOError when the response code is 400 or above.
    """

    def handle():
        request = Request(url, data=data)
        # Force the requested verb instead of the one inferred from ``data``.
        request.get_method = lambda: method
        for k, v in headers:
            request.add_header(k, v)
        # closing() releases the response (and its socket) as soon as the
        # status has been checked, instead of leaving it to the garbage
        # collector.
        with closing(build_opener(HTTPHandler).open(request, timeout=timeout)) as resp:
            if resp.code >= 400:
                raise IOError("error talking to pushgateway: {0} {1}".format(
                    resp.code, resp.msg))

    return handle
def basic_auth_handler(url, method, timeout, headers, data, username=None, password=None):
    """Handler that implements HTTP/HTTPS connections with Basic Auth.

    Sets auth headers using supplied 'username' and 'password', if set.
    Used by the push_to_gateway functions. Can be re-used by other handlers.

    Returns a zero-argument callable that performs the request through
    ``default_handler``.
    """

    def handle():
        """Perform the request, adding an Authorization header if configured."""
        # Work on a copy: appending to the caller's list would leak the
        # credentials header into the caller's state and add one more
        # Authorization header every time this callable is invoked.
        request_headers = list(headers)
        if username is not None and password is not None:
            auth_value = '{0}:{1}'.format(username, password).encode('utf-8')
            auth_token = base64.b64encode(auth_value)
            auth_header = b'Basic ' + auth_token
            request_headers.append(('Authorization', auth_header))
        default_handler(url, method, timeout, request_headers, data)()

    return handle
def push_to_gateway(
        gateway, job, registry, grouping_key=None, timeout=30,
        handler=default_handler):
    """Push metrics to the given pushgateway.

    This overwrites all metrics with the same job and grouping_key.
    This uses the PUT HTTP method.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              A replacement must be a function which accepts the following
              arguments:
              url, method, timeout, headers, and data
              and returns a callable that performs the request.
              May be used to implement additional functionality not
              supported by the built-in default handler (such as SSL
              client certificates, and HTTP authentication mechanisms).
              'url' is the URL for the request, the 'gateway' argument
              described earlier will form the basis of this URL.
              'method' is the HTTP method which should be used when
              carrying out the request.
              'timeout' requests not successfully completed after this
              many seconds should be aborted. If timeout is None, then
              the handler should not set a timeout.
              'headers' is a list of ("header-name","header-value") tuples
              which must be passed to the pushgateway in the form of HTTP
              request headers.
              'data' is the content which should be used to form the HTTP
              Message Body.
              The function should raise an exception (e.g. IOError) on
              failure.
    """
    _use_gateway(
        method='PUT', gateway=gateway, job=job, registry=registry,
        grouping_key=grouping_key, timeout=timeout, handler=handler)
def pushadd_to_gateway(
        gateway, job, registry, grouping_key=None, timeout=30,
        handler=default_handler):
    """PushAdd metrics to the given pushgateway.

    This replaces metrics with the same name, job and grouping_key.
    This uses the POST HTTP method.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided.
    `job` is the job label to be attached to all pushed metrics.
    `registry` is an instance of CollectorRegistry.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long push will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.
    """
    _use_gateway(
        method='POST', gateway=gateway, job=job, registry=registry,
        grouping_key=grouping_key, timeout=timeout, handler=handler)
def delete_from_gateway(
        gateway, job, grouping_key=None, timeout=30, handler=default_handler):
    """Delete metrics from the given pushgateway.

    This deletes metrics with the given job and grouping_key.
    This uses the DELETE HTTP method.

    `gateway` the url for your push gateway. Either of the form
              'http://pushgateway.local', or 'pushgateway.local'.
              Scheme defaults to 'http' if none is provided.
    `job` is the job label of the metrics to delete.
    `grouping_key` please see the pushgateway documentation for details.
                   Defaults to None.
    `timeout` is how long delete will attempt to connect before giving up.
              Defaults to 30s, can be set to None for no timeout.
    `handler` is an optional function which can be provided to perform
              requests to the 'gateway'.
              Defaults to `default_handler`, which carries out an http or
              https request.
              See the 'prometheus_client.push_to_gateway' documentation
              for implementation requirements.
    """
    # No registry is needed: a DELETE request carries no metrics.
    _use_gateway(
        method='DELETE', gateway=gateway, job=job, registry=None,
        grouping_key=grouping_key, timeout=timeout, handler=handler)
def _use_gateway(method, gateway, job, registry, grouping_key, timeout, handler):
    """Build the pushgateway URL and body, then run the request via ``handler``.

    The URL is ``<gateway>/metrics/job/<job>`` followed by one
    ``/<key>/<value>`` pair per grouping key entry, in sorted order. The body
    is the rendered ``registry``, except for DELETE where it is empty.
    """
    scheme = urlparse(gateway).scheme
    # See https://bugs.python.org/issue27657 for details on urlparse in py>=3.7.6.
    version_affected = PYTHON376_OR_NEWER or PYTHON26_OR_OLDER
    if not scheme or (version_affected and scheme not in ['http', 'https']):
        gateway = 'http://{0}'.format(gateway)

    job_key, job_value = _escape_grouping_key("job", job)
    url = '{0}/metrics/{1}/{2}'.format(gateway, job_key, job_value)

    if method != 'DELETE':
        data = generate_latest(registry)
    else:
        data = b''

    if grouping_key is None:
        grouping_key = {}
    segments = []
    for k, v in sorted(grouping_key.items()):
        segments.append('/{0}/{1}'.format(*_escape_grouping_key(str(k), str(v))))
    url += ''.join(segments)

    # The handler returns a callable; invoking it performs the request.
    perform = handler(
        url=url, method=method, timeout=timeout,
        headers=[('Content-Type', CONTENT_TYPE_LATEST)], data=data,
    )
    perform()
- def _escape_grouping_key(k, v):
- if v == "" :
- # Per https://github.com/prometheus/pushgateway/pull/346.
- return k + "@base64", "="
- elif '/' in v:
- # Added in Pushgateway 0.9.0.
- return k + "@base64", base64.urlsafe_b64encode(v.encode("utf-8")).decode("utf-8")
- else:
- return k, quote_plus(v)
def instance_ip_grouping_key():
    """Grouping key with instance set to the IP Address of this host.

    The address is the local address of a UDP socket after it has been
    connected to ``('localhost', 0)``; the socket is closed before returning.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.connect(('localhost', 0))
        local_address = sock.getsockname()[0]
    finally:
        sock.close()
    return {'instance': local_address}
# ASGI support is optional: the import is attempted and the module simply
# goes without ``make_asgi_app`` when it fails.
try:
    # Python >3.5 only
    from .asgi import make_asgi_app
except Exception:
    # Covers ImportError and SyntaxError on interpreters that cannot load the
    # module, without also swallowing KeyboardInterrupt or SystemExit the way
    # a bare ``except:`` does.
    pass
|