bucket.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import abc
  4. import copy
  5. from Queue import Queue
  6. from abc import abstractmethod
  7. from threading import RLock
  8. import six
  9. from .exceptions import InvalidParams
  class AbstractBucket(six.with_metaclass(abc.ABCMeta)):
      """Abstract interface for a bounded FIFO bucket of comparable items.

      Concrete buckets must implement ``size``, ``put``, ``get`` and
      ``all_items``; ``maxsize`` and ``inspect_expired_items`` are provided here.
      """

      def __init__(self, maxsize=0, **_kwargs):
          """Remember the bucket capacity.

          Extra keyword arguments are accepted and ignored.
          """
          self._maxsize = maxsize

      def maxsize(self):
          """Return the maximum size of the bucket,
          ie the maximum number of items this bucket can hold
          """
          return self._maxsize

      @abstractmethod
      def size(self):
          """Return the current size of the bucket,
          ie the count of all items currently in the bucket
          """

      @abstractmethod
      def put(self, item):
          """Put an item (typically the current time) in the bucket
          Return 1 if successful, else 0
          """

      @abstractmethod
      def get(self, number):
          """Get items and remove them from the bucket in the FIFO fashion
          Return the number of items that have been removed
          """

      @abstractmethod
      def all_items(self):
          """Return a list as copies of all items in the bucket"""

      def inspect_expired_items(self, time):
          """Count the items from the first one that is newer than ``time``.

          Walks ``all_items()`` front to back and stops at the first item
          strictly greater than ``time``.

          Return a tuple ``(item_count, remaining_time)`` where ``item_count``
          is the number of items from that position to the end of the bucket
          and ``remaining_time`` is ``item - time`` for that first item.
          Return ``(0, 0)`` when no item is greater than ``time`` (including
          when the bucket is empty).

          NOTE(review): the count is only meaningful if items are stored in
          ascending order -- presumably timestamps appended over time; confirm
          against callers.
          """
          # Take ONE snapshot and derive the volume from it. Calling size()
          # separately could observe a different state than all_items() when
          # another thread/process mutates the bucket in between.
          items = self.all_items()
          volume = len(items)

          for index, item in enumerate(items):
              if item > time:
                  return volume - index, item - time

          return 0, 0
  class MemoryQueueBucket(AbstractBucket):
      """A bucket that resides in memory
      using python's built-in Queue class
      """

      def __init__(self, maxsize=0, **kwargs):
          """Create the backing queue with the given capacity.

          Extra keyword arguments are accepted and ignored.
          """
          # Forward maxsize so that maxsize() reports the real capacity
          # instead of the base-class default of 0.
          super(MemoryQueueBucket, self).__init__(maxsize=maxsize)
          self._q = Queue(maxsize=maxsize)

      def size(self):
          """Return the number of items currently held by the queue."""
          return self._q.qsize()

      def put(self, item):
          """Put an item in the bucket without blocking.

          Return 1 if the item was stored, 0 if the queue was full.
          """
          # Local import: six is already a dependency of this module and
          # six.moves.queue resolves to Queue (py2) / queue (py3).
          from six.moves.queue import Full  # pylint: disable=import-outside-toplevel

          try:
              self._q.put_nowait(item)
          except Full:
              return 0
          return 1

      def get(self, number):
          """Remove up to ``number`` items from the front of the queue.

          Never blocks: stops early when the queue runs empty.
          Return the number of items actually removed.
          """
          from six.moves.queue import Empty  # pylint: disable=import-outside-toplevel

          removed = 0
          for _ in range(number):
              try:
                  self._q.get_nowait()
              except Empty:
                  break
              removed += 1
          return removed

      def all_items(self):
          """Return a list copy of the items currently in the queue."""
          # Queue keeps its items in the ``queue`` attribute; list() copies it.
          return list(self._q.queue)
  class MemoryListBucket(AbstractBucket):
      """A bucket that resides in memory
      using python's List
      """

      def __init__(self, maxsize=0, **kwargs):
          """Create an empty list-backed bucket with the given capacity.

          Extra keyword arguments are accepted and ignored.
          """
          super(MemoryListBucket, self).__init__(maxsize=maxsize)
          self._q = []
          # Re-entrant lock guarding the mutating operations (put/get).
          self._lock = RLock()

      def size(self):
          """Return the number of items currently in the list."""
          return len(self._q)

      def put(self, item):
          """Append an item if the bucket is below its capacity.

          Return 1 if the item was stored, else 0. With the default
          ``maxsize`` of 0 the capacity check never passes, so nothing is
          ever stored.
          """
          with self._lock:
              if self.size() < self.maxsize():
                  self._q.append(item)
                  return 1
              return 0

      def get(self, number):
          """Remove up to ``number`` items from the front of the list.

          Return the number of items actually removed; a request larger than
          the current size empties the bucket instead of raising IndexError,
          and a non-positive request removes nothing.
          """
          with self._lock:
              removed = max(0, min(number, len(self._q)))
              # One slice deletion instead of repeated pop(0) calls.
              del self._q[:removed]
              return removed

      def all_items(self):
          """Return a shallow copy of the items in the bucket."""
          return copy.copy(self._q)
  class RedisBucket(AbstractBucket):
      """A bucket with Redis
      using List
      """

      def __init__(
          self,
          maxsize=0,
          redis_pool=None,
          bucket_name=None,
          identity=None,
          **kwargs
      ):
          """Configure the Redis-backed bucket.

          ``redis_pool`` is handed to ``redis.Redis(connection_pool=...)`` on
          every operation. The Redis key is built as
          ``"<bucket_name>___<identity>"``.

          Raise InvalidParams when ``bucket_name`` is empty or not a string.
          Extra keyword arguments are accepted and ignored.
          """
          super(RedisBucket, self).__init__(maxsize=maxsize)

          # six.string_types also accepts ``unicode`` names on Python 2.
          if not bucket_name or not isinstance(bucket_name, six.string_types):
              msg = "keyword argument bucket-name is missing: a distict name is required"
              raise InvalidParams(msg)

          self._pool = redis_pool
          self._bucket_name = "{bucket_name}___{identity}".format(
              bucket_name=bucket_name, identity=identity
          )

      def get_connection(self):
          """Obtain a connection from redis pool"""
          from redis import Redis  # pylint: disable=import-outside-toplevel

          return Redis(connection_pool=self._pool)

      def get_pipeline(self):
          """Using redis pipeline for batch operation"""
          return self.get_connection().pipeline()

      def size(self):
          """Return the length of the Redis list backing this bucket."""
          return self.get_connection().llen(self._bucket_name)

      def put(self, item):
          """Append an item to the Redis list if it is below capacity.

          Return 1 if the item was pushed, else 0.
          """
          conn = self.get_connection()
          # NOTE(review): LLEN followed by RPUSH is two separate commands, so
          # concurrent writers can both pass the check and exceed maxsize.
          current_size = conn.llen(self._bucket_name)

          if current_size < self.maxsize():
              conn.rpush(self._bucket_name, item)
              return 1

          return 0

      def get(self, number):
          """Pop up to ``number`` items from the head of the Redis list.

          Return the number of items actually removed, which is smaller than
          ``number`` when the list holds fewer items.
          """
          if number <= 0:
              return 0

          pipeline = self.get_pipeline()
          for _ in range(number):
              pipeline.lpop(self._bucket_name)

          # LPOP replies with None once the list is empty; count real pops only.
          replies = pipeline.execute()
          return sum(1 for reply in replies if reply is not None)

      def all_items(self):
          """Return every item in the Redis list converted to float."""
          conn = self.get_connection()
          items = conn.lrange(self._bucket_name, 0, -1)
          # float() parses both bytes and text, so this works whether or not
          # the client was created with decode_responses=True.
          return [float(raw) for raw in items]