dict_reporter.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. from __future__ import absolute_import
  2. import logging
  3. import threading
  4. from kafka.metrics.metrics_reporter import AbstractMetricsReporter
  5. logger = logging.getLogger(__name__)
  6. class DictReporter(AbstractMetricsReporter):
  7. """A basic dictionary based metrics reporter.
  8. Store all metrics in a two level dictionary of category > name > metric.
  9. """
  10. def __init__(self, prefix=''):
  11. self._lock = threading.Lock()
  12. self._prefix = prefix if prefix else '' # never allow None
  13. self._store = {}
  14. def snapshot(self):
  15. """
  16. Return a nested dictionary snapshot of all metrics and their
  17. values at this time. Example:
  18. {
  19. 'category': {
  20. 'metric1_name': 42.0,
  21. 'metric2_name': 'foo'
  22. }
  23. }
  24. """
  25. return dict((category, dict((name, metric.value())
  26. for name, metric in list(metrics.items())))
  27. for category, metrics in
  28. list(self._store.items()))
  29. def init(self, metrics):
  30. for metric in metrics:
  31. self.metric_change(metric)
  32. def metric_change(self, metric):
  33. with self._lock:
  34. category = self.get_category(metric)
  35. if category not in self._store:
  36. self._store[category] = {}
  37. self._store[category][metric.metric_name.name] = metric
  38. def metric_removal(self, metric):
  39. with self._lock:
  40. category = self.get_category(metric)
  41. metrics = self._store.get(category, {})
  42. removed = metrics.pop(metric.metric_name.name, None)
  43. if not metrics:
  44. self._store.pop(category, None)
  45. return removed
  46. def get_category(self, metric):
  47. """
  48. Return a string category for the metric.
  49. The category is made up of this reporter's prefix and the
  50. metric's group and tags.
  51. Examples:
  52. prefix = 'foo', group = 'bar', tags = {'a': 1, 'b': 2}
  53. returns: 'foo.bar.a=1,b=2'
  54. prefix = 'foo', group = 'bar', tags = None
  55. returns: 'foo.bar'
  56. prefix = None, group = 'bar', tags = None
  57. returns: 'bar'
  58. """
  59. tags = ','.join('%s=%s' % (k, v) for k, v in
  60. sorted(metric.metric_name.tags.items()))
  61. return '.'.join(x for x in
  62. [self._prefix, metric.metric_name.group, tags] if x)
  63. def configure(self, configs):
  64. pass
  65. def close(self):
  66. pass