graphite.py 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. #!/usr/bin/python
  2. from __future__ import unicode_literals
  3. import logging
  4. import re
  5. import socket
  6. import threading
  7. import time
  8. from timeit import default_timer
  9. from ..registry import REGISTRY
  10. # Roughly, have to keep to what works as a file name.
  11. # We also remove periods, so labels can be distinguished.
  12. _INVALID_GRAPHITE_CHARS = re.compile(r"[^a-zA-Z0-9_-]")
  13. def _sanitize(s):
  14. return _INVALID_GRAPHITE_CHARS.sub('_', s)
  15. class _RegularPush(threading.Thread):
  16. def __init__(self, pusher, interval, prefix):
  17. super(_RegularPush, self).__init__()
  18. self._pusher = pusher
  19. self._interval = interval
  20. self._prefix = prefix
  21. def run(self):
  22. wait_until = default_timer()
  23. while True:
  24. while True:
  25. now = default_timer()
  26. if now >= wait_until:
  27. # May need to skip some pushes.
  28. while wait_until < now:
  29. wait_until += self._interval
  30. break
  31. # time.sleep can return early.
  32. time.sleep(wait_until - now)
  33. try:
  34. self._pusher.push(prefix=self._prefix)
  35. except IOError:
  36. logging.exception("Push failed")
  37. class GraphiteBridge(object):
  38. def __init__(self, address, registry=REGISTRY, timeout_seconds=30, _timer=time.time):
  39. self._address = address
  40. self._registry = registry
  41. self._timeout = timeout_seconds
  42. self._timer = _timer
  43. def push(self, prefix=''):
  44. now = int(self._timer())
  45. output = []
  46. prefixstr = ''
  47. if prefix:
  48. prefixstr = prefix + '.'
  49. for metric in self._registry.collect():
  50. for s in metric.samples:
  51. if s.labels:
  52. labelstr = '.' + '.'.join(
  53. ['{0}.{1}'.format(
  54. _sanitize(k), _sanitize(v))
  55. for k, v in sorted(s.labels.items())])
  56. else:
  57. labelstr = ''
  58. output.append('{0}{1}{2} {3} {4}\n'.format(
  59. prefixstr, _sanitize(s.name), labelstr, float(s.value), now))
  60. conn = socket.create_connection(self._address, self._timeout)
  61. conn.sendall(''.join(output).encode('ascii'))
  62. conn.close()
  63. def start(self, interval=60.0, prefix=''):
  64. t = _RegularPush(self, interval, prefix)
  65. t.daemon = True
  66. t.start()