values.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from __future__ import unicode_literals
  2. import os
  3. from threading import Lock
  4. from .mmap_dict import mmap_key, MmapedDict
  5. class MutexValue(object):
  6. """A float protected by a mutex."""
  7. _multiprocess = False
  8. def __init__(self, typ, metric_name, name, labelnames, labelvalues, **kwargs):
  9. self._value = 0.0
  10. self._lock = Lock()
  11. def inc(self, amount):
  12. with self._lock:
  13. self._value += amount
  14. def set(self, value):
  15. with self._lock:
  16. self._value = value
  17. def get(self):
  18. with self._lock:
  19. return self._value
  20. def MultiProcessValue(process_identifier=os.getpid):
  21. """Returns a MmapedValue class based on a process_identifier function.
  22. The 'process_identifier' function MUST comply with this simple rule:
  23. when called in simultaneously running processes it MUST return distinct values.
  24. Using a different function than the default 'os.getpid' is at your own risk.
  25. """
  26. files = {}
  27. values = []
  28. pid = {'value': process_identifier()}
  29. # Use a single global lock when in multi-processing mode
  30. # as we presume this means there is no threading going on.
  31. # This avoids the need to also have mutexes in __MmapDict.
  32. lock = Lock()
  33. class MmapedValue(object):
  34. """A float protected by a mutex backed by a per-process mmaped file."""
  35. _multiprocess = True
  36. def __init__(self, typ, metric_name, name, labelnames, labelvalues, multiprocess_mode='', **kwargs):
  37. self._params = typ, metric_name, name, labelnames, labelvalues, multiprocess_mode
  38. with lock:
  39. self.__check_for_pid_change()
  40. self.__reset()
  41. values.append(self)
  42. def __reset(self):
  43. typ, metric_name, name, labelnames, labelvalues, multiprocess_mode = self._params
  44. if typ == 'gauge':
  45. file_prefix = typ + '_' + multiprocess_mode
  46. else:
  47. file_prefix = typ
  48. if file_prefix not in files:
  49. filename = os.path.join(
  50. os.environ['prometheus_multiproc_dir'],
  51. '{0}_{1}.db'.format(file_prefix, pid['value']))
  52. files[file_prefix] = MmapedDict(filename)
  53. self._file = files[file_prefix]
  54. self._key = mmap_key(metric_name, name, labelnames, labelvalues)
  55. self._value = self._file.read_value(self._key)
  56. def __check_for_pid_change(self):
  57. actual_pid = process_identifier()
  58. if pid['value'] != actual_pid:
  59. pid['value'] = actual_pid
  60. # There has been a fork(), reset all the values.
  61. for f in files.values():
  62. f.close()
  63. files.clear()
  64. for value in values:
  65. value.__reset()
  66. def inc(self, amount):
  67. with lock:
  68. self.__check_for_pid_change()
  69. self._value += amount
  70. self._file.write_value(self._key, self._value)
  71. def set(self, value):
  72. with lock:
  73. self.__check_for_pid_change()
  74. self._value = value
  75. self._file.write_value(self._key, self._value)
  76. def get(self):
  77. with lock:
  78. self.__check_for_pid_change()
  79. return self._value
  80. return MmapedValue
  81. def get_value_class():
  82. # Should we enable multi-process mode?
  83. # This needs to be chosen before the first metric is constructed,
  84. # and as that may be in some arbitrary library the user/admin has
  85. # no control over we use an environment variable.
  86. if 'prometheus_multiproc_dir' in os.environ:
  87. return MultiProcessValue()
  88. else:
  89. return MutexValue
  90. ValueClass = get_value_class()