123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """ Basic Rate-Limiter
- """
- import logging
- import sys
- import os
- from kombu.five import monotonic
- from .exceptions import BucketFullException, InvalidParams
- from .limit_context_decorator import LimitContextDecorator
- logger = logging.getLogger(__name__)
class Limiter(object):
    """Rate limiter that keeps one bucket of request timestamps per identity.

    Buckets are created on demand from ``bucket_class`` the first time an
    identity is seen.  A request is accepted only when every configured rate
    still has room in the bucket of every identity involved.
    """

    # Upper bound on the number of identities tracked at once.  Once the
    # bucket group grows past this, every bucket is dropped so memory stays
    # bounded; the trade-off is that all identities restart with an empty
    # bucket.
    _MAX_TRACKED_IDENTITIES = 100000

    def __init__(self, bucket_class, bucket_kwargs=None, *rates):
        """Init a limiter with rates and a specific bucket type.

        Because ``rates`` is a var-positional parameter that follows
        ``bucket_kwargs``, callers must pass ``bucket_kwargs`` (or ``None``)
        positionally before the first rate.

        Args:
            bucket_class: Class instantiated once per identity.  It is called
                with ``maxsize`` and ``identity`` keyword arguments plus
                everything in ``bucket_kwargs``.  Any class that extends
                AbstractBucket can be used (MemoryQueueBucket,
                MemoryListBucket and RedisBucket are provided).
            bucket_kwargs: Extra keyword arguments for the bucket class
                constructor; ``None`` means no extra arguments.
            *rates: One or more rate objects exposing ``limit`` and
                ``interval``, ordered so that both strictly increase.

        Raises:
            InvalidParams: If no rate is given or the rates are not strictly
                increasing in both limit and interval.
        """
        self._validate_rate_list(rates)

        self._rates = rates
        self._bkclass = bucket_class
        self._bucket_args = bucket_kwargs or {}
        self.bucket_group = {}

    def _validate_rate_list(self, rates):  # pylint: disable=no-self-use
        """Check that rates exist and strictly increase in limit and interval.

        Raises:
            InvalidParams: If ``rates`` is empty, or if any rate fails to have
                both a larger limit and a larger interval than the rate
                immediately before it.
        """
        if not rates:
            raise InvalidParams("Rate(s) must be provided")

        # Compare each rate with its predecessor.
        for prev_rate, rate in zip(rates, rates[1:]):
            if rate.limit <= prev_rate.limit or rate.interval <= prev_rate.interval:
                msg = "{prev_rate} cannot come before {rate}".format(
                    prev_rate=prev_rate, rate=rate)
                raise InvalidParams(msg)

    def _init_buckets(self, identities):
        """Create a bucket for each identity that does not have one yet.

        Every bucket's ``maxsize`` is the limit of the last configured rate,
        which validation guarantees to be the largest limit.
        """
        # Crude memory cap: forget every bucket once too many are tracked.
        if len(self.bucket_group) > self._MAX_TRACKED_IDENTITIES:
            self.bucket_group = {}

        maxsize = self._rates[-1].limit
        for item_id in identities:
            # Membership test rather than truthiness: a bucket object that
            # happens to be falsy while empty must not be replaced.
            if item_id not in self.bucket_group:
                self.bucket_group[item_id] = self._bkclass(
                    maxsize=maxsize,
                    identity=item_id,
                    **self._bucket_args
                )

    def try_acquire(self, *identities):
        """Record one request for each identity, or reject the whole call.

        All identities are checked against all rates before anything is
        recorded, so a rejected call adds no item to any bucket.

        Raises:
            BucketFullException: If a rate's limit is already reached for one
                of the identities.
        """
        self._init_buckets(identities)
        now = monotonic()
        last_idx = len(self._rates) - 1

        for idx, rate in enumerate(self._rates):
            for item_id in identities:
                bucket = self.bucket_group[item_id]
                volume = bucket.size()

                # Fewer items than the limit: this rate cannot be exceeded.
                if volume < rate.limit:
                    continue

                # Determine rate's time-window starting point
                start_time = now - rate.interval
                # NOTE(review): presumably item_count is the number of items
                # still inside the window and remaining_time the wait until
                # the next one leaves it -- confirm against the bucket class.
                item_count, remaining_time = bucket.inspect_expired_items(start_time)

                if item_count >= rate.limit:
                    raise BucketFullException(item_id, rate, remaining_time)

                if idx == last_idx:
                    # We remove item based on the request-rate with the max-limit
                    bucket.get(volume - item_count)

        for item_id in identities:
            self.bucket_group[item_id].put(now)

        # Lazy %-style arguments: the message is only rendered when DEBUG is
        # enabled.  sys.getsizeof is shallow, so the reported size covers the
        # dict itself and not the buckets it references.
        logger.debug(
            'pid = %s; now bucket size is: %s; bucket group len = %s',
            os.getpid(), sys.getsizeof(self.bucket_group), len(self.bucket_group))

    def ratelimit(self, delay=False, max_delay=None, *identities):
        """Build a LimitContextDecorator that applies this limiter.

        According to the original documentation of this method, the returned
        object works as a decorator and as a context manager (with async
        support); calls that exceed the rate limit either raise an exception
        or sleep until space is available in the bucket, depending on the
        arguments.

        Because ``identities`` is a var-positional parameter that follows
        ``delay`` and ``max_delay``, those two must be passed positionally
        before the first identity.

        Args:
            delay: Delay until the next request instead of raising an exception
            max_delay: The maximum allowed delay time (in seconds); anything
                over this will raise an exception
            *identities: Bucket identities
        """
        return LimitContextDecorator(self, *identities, delay=delay, max_delay=max_delay)

    def get_current_volume(self, identity):
        """Return the current size of the bucket belonging to ``identity``.

        Raises:
            KeyError: If no bucket has been created for ``identity`` yet.
        """
        return self.bucket_group[identity].size()
|