asgi.py 1.2 KB

12345678910111213141516171819202122232425262728293031323334
  1. from urllib.parse import parse_qs
  2. from .exposition import _bake_output
  3. from .registry import REGISTRY
  4. def make_asgi_app(registry=REGISTRY):
  5. """Create a ASGI app which serves the metrics from a registry."""
  6. async def prometheus_app(scope, receive, send):
  7. assert scope.get("type") == "http"
  8. # Prepare parameters
  9. params = parse_qs(scope.get('query_string', b''))
  10. accept_header = "Accept: " + ",".join([
  11. value.decode("utf8") for (name, value) in scope.get('headers')
  12. if name.decode("utf8") == 'accept'
  13. ])
  14. # Bake output
  15. status, header, output = _bake_output(registry, accept_header, params)
  16. # Return output
  17. payload = await receive()
  18. if payload.get("type") == "http.request":
  19. await send(
  20. {
  21. "type": "http.response.start",
  22. "status": int(status.split(' ')[0]),
  23. "headers": [
  24. tuple(x.encode('utf8') for x in header)
  25. ]
  26. }
  27. )
  28. await send({"type": "http.response.body", "body": output})
  29. return prometheus_app