rate.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
  1. from __future__ import absolute_import
  2. from kafka.metrics.measurable_stat import AbstractMeasurableStat
  3. from kafka.metrics.stats.sampled_stat import AbstractSampledStat
  4. class TimeUnit(object):
  5. _names = {
  6. 'nanosecond': 0,
  7. 'microsecond': 1,
  8. 'millisecond': 2,
  9. 'second': 3,
  10. 'minute': 4,
  11. 'hour': 5,
  12. 'day': 6,
  13. }
  14. NANOSECONDS = _names['nanosecond']
  15. MICROSECONDS = _names['microsecond']
  16. MILLISECONDS = _names['millisecond']
  17. SECONDS = _names['second']
  18. MINUTES = _names['minute']
  19. HOURS = _names['hour']
  20. DAYS = _names['day']
  21. @staticmethod
  22. def get_name(time_unit):
  23. return TimeUnit._names[time_unit]
  24. class Rate(AbstractMeasurableStat):
  25. """
  26. The rate of the given quantity. By default this is the total observed
  27. over a set of samples from a sampled statistic divided by the elapsed
  28. time over the sample windows. Alternative AbstractSampledStat
  29. implementations can be provided, however, to record the rate of
  30. occurrences (e.g. the count of values measured over the time interval)
  31. or other such values.
  32. """
  33. def __init__(self, time_unit=TimeUnit.SECONDS, sampled_stat=None):
  34. self._stat = sampled_stat or SampledTotal()
  35. self._unit = time_unit
  36. def unit_name(self):
  37. return TimeUnit.get_name(self._unit)
  38. def record(self, config, value, time_ms):
  39. self._stat.record(config, value, time_ms)
  40. def measure(self, config, now):
  41. value = self._stat.measure(config, now)
  42. return float(value) / self.convert(self.window_size(config, now))
  43. def window_size(self, config, now):
  44. # purge old samples before we compute the window size
  45. self._stat.purge_obsolete_samples(config, now)
  46. """
  47. Here we check the total amount of time elapsed since the oldest
  48. non-obsolete window. This give the total window_size of the batch
  49. which is the time used for Rate computation. However, there is
  50. an issue if we do not have sufficient data for e.g. if only
  51. 1 second has elapsed in a 30 second window, the measured rate
  52. will be very high. Hence we assume that the elapsed time is
  53. always N-1 complete windows plus whatever fraction of the final
  54. window is complete.
  55. Note that we could simply count the amount of time elapsed in
  56. the current window and add n-1 windows to get the total time,
  57. but this approach does not account for sleeps. AbstractSampledStat
  58. only creates samples whenever record is called, if no record is
  59. called for a period of time that time is not accounted for in
  60. window_size and produces incorrect results.
  61. """
  62. total_elapsed_time_ms = now - self._stat.oldest(now).last_window_ms
  63. # Check how many full windows of data we have currently retained
  64. num_full_windows = int(total_elapsed_time_ms / config.time_window_ms)
  65. min_full_windows = config.samples - 1
  66. # If the available windows are less than the minimum required,
  67. # add the difference to the totalElapsedTime
  68. if num_full_windows < min_full_windows:
  69. total_elapsed_time_ms += ((min_full_windows - num_full_windows) *
  70. config.time_window_ms)
  71. return total_elapsed_time_ms
  72. def convert(self, time_ms):
  73. if self._unit == TimeUnit.NANOSECONDS:
  74. return time_ms * 1000.0 * 1000.0
  75. elif self._unit == TimeUnit.MICROSECONDS:
  76. return time_ms * 1000.0
  77. elif self._unit == TimeUnit.MILLISECONDS:
  78. return time_ms
  79. elif self._unit == TimeUnit.SECONDS:
  80. return time_ms / 1000.0
  81. elif self._unit == TimeUnit.MINUTES:
  82. return time_ms / (60.0 * 1000.0)
  83. elif self._unit == TimeUnit.HOURS:
  84. return time_ms / (60.0 * 60.0 * 1000.0)
  85. elif self._unit == TimeUnit.DAYS:
  86. return time_ms / (24.0 * 60.0 * 60.0 * 1000.0)
  87. else:
  88. raise ValueError('Unknown unit: %s' % self._unit)
  89. class SampledTotal(AbstractSampledStat):
  90. def __init__(self, initial_value=None):
  91. if initial_value is not None:
  92. raise ValueError('initial_value cannot be set on SampledTotal')
  93. super(SampledTotal, self).__init__(0.0)
  94. def update(self, sample, config, value, time_ms):
  95. sample.value += value
  96. def combine(self, samples, config, now):
  97. return float(sum(sample.value for sample in samples))