limiter.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. """ Basic Rate-Limiter
  4. """
  5. import logging
  6. import sys
  7. import os
  8. from kombu.five import monotonic
  9. from .exceptions import BucketFullException, InvalidParams
  10. from .limit_context_decorator import LimitContextDecorator
  11. logger = logging.getLogger(__name__)
  12. class Limiter(object):
  13. """Basic rate-limiter class that makes use of built-in python Queue"""
  14. def __init__(
  15. self,
  16. bucket_class,
  17. bucket_kwargs = None,
  18. *rates):
  19. """Init a limiter with rates and specific bucket type
  20. - Bucket type can be any class that extends AbstractBucket
  21. - 3 kinds of Bucket are provided, being MemoryQueueBucket, MemoryListBucket and RedisBucket
  22. - Opts is extra keyword-arguements for Bucket class constructor
  23. """
  24. self._validate_rate_list(rates)
  25. self._rates = rates
  26. self._bkclass = bucket_class
  27. self._bucket_args = bucket_kwargs or {}
  28. self.bucket_group = {}
  29. monotonic()
  30. def _validate_rate_list(self, rates): # pylint: disable=no-self-use
  31. if not rates:
  32. raise InvalidParams("Rate(s) must be provided")
  33. for idx, rate in enumerate(rates[1:]):
  34. prev_rate = rates[idx]
  35. invalid = rate.limit <= prev_rate.limit or rate.interval <= prev_rate.interval
  36. if invalid:
  37. msg = "{prev_rate} cannot come before {rate}".format(prev_rate = prev_rate, rate = rate)
  38. raise InvalidParams(msg)
  39. def _init_buckets(self, identities):
  40. """Setup Queue for each Identity if needed
  41. Queue's maxsize equals the max limit of request-rates
  42. """
  43. if len(self.bucket_group.keys()) > 100000:
  44. self.bucket_group = {}
  45. for item_id in identities:
  46. if not self.bucket_group.get(item_id):
  47. maxsize = self._rates[-1].limit
  48. self.bucket_group[item_id] = self._bkclass(
  49. maxsize = maxsize,
  50. identity = item_id,
  51. **self._bucket_args
  52. )
  53. def try_acquire(self, *identities):
  54. """Acquiring an item or reject it if rate-limit has been exceeded"""
  55. self._init_buckets(identities)
  56. now = monotonic()
  57. for idx, rate in enumerate(self._rates):
  58. for item_id in identities:
  59. bucket = self.bucket_group[item_id]
  60. volume = bucket.size()
  61. if volume < rate.limit:
  62. continue
  63. # Determine rate's time-window starting point
  64. start_time = now - rate.interval
  65. item_count, remaining_time = bucket.inspect_expired_items(start_time)
  66. if item_count >= rate.limit:
  67. raise BucketFullException(item_id, rate, remaining_time)
  68. if idx == len(self._rates) - 1:
  69. # We remove item based on the request-rate with the max-limit
  70. bucket.get(volume - item_count)
  71. for item_id in identities:
  72. self.bucket_group[item_id].put(now)
  73. logger.debug('pid = {}; now bucket size is: {}; bucket group len = {}'.format(
  74. os.getpid(), sys.getsizeof(self.bucket_group), len(self.bucket_group)))
  75. def ratelimit(
  76. self,
  77. delay = False,
  78. max_delay = None,
  79. *identities):
  80. """A decorator and contextmanager that applies rate-limiting, with async support.
  81. Depending on arguments, calls that exceed the rate limit will either raise an exception, or
  82. sleep until space is available in the bucket.
  83. Args:
  84. identities: Bucket identities
  85. delay: Delay until the next request instead of raising an exception
  86. max_delay: The maximum allowed delay time (in seconds); anything over this will raise
  87. an exception
  88. """
  89. return LimitContextDecorator(self, *identities, delay = delay, max_delay = max_delay)
  90. def get_current_volume(self, identity):
  91. """Get current bucket volume for a specific identity"""
  92. bucket = self.bucket_group[identity]
  93. return bucket.size()