# -*- coding: utf-8 -*- # !/usr/bin/env python """ Basic Rate-Limiter """ import logging import sys import os from kombu.five import monotonic from .exceptions import BucketFullException, InvalidParams from .limit_context_decorator import LimitContextDecorator logger = logging.getLogger(__name__) class Limiter(object): """Basic rate-limiter class that makes use of built-in python Queue""" def __init__( self, bucket_class, bucket_kwargs = None, *rates): """Init a limiter with rates and specific bucket type - Bucket type can be any class that extends AbstractBucket - 3 kinds of Bucket are provided, being MemoryQueueBucket, MemoryListBucket and RedisBucket - Opts is extra keyword-arguements for Bucket class constructor """ self._validate_rate_list(rates) self._rates = rates self._bkclass = bucket_class self._bucket_args = bucket_kwargs or {} self.bucket_group = {} monotonic() def _validate_rate_list(self, rates): # pylint: disable=no-self-use if not rates: raise InvalidParams("Rate(s) must be provided") for idx, rate in enumerate(rates[1:]): prev_rate = rates[idx] invalid = rate.limit <= prev_rate.limit or rate.interval <= prev_rate.interval if invalid: msg = "{prev_rate} cannot come before {rate}".format(prev_rate = prev_rate, rate = rate) raise InvalidParams(msg) def _init_buckets(self, identities): """Setup Queue for each Identity if needed Queue's maxsize equals the max limit of request-rates """ if len(self.bucket_group.keys()) > 100000: self.bucket_group = {} for item_id in identities: if not self.bucket_group.get(item_id): maxsize = self._rates[-1].limit self.bucket_group[item_id] = self._bkclass( maxsize = maxsize, identity = item_id, **self._bucket_args ) def try_acquire(self, *identities): """Acquiring an item or reject it if rate-limit has been exceeded""" self._init_buckets(identities) now = monotonic() for idx, rate in enumerate(self._rates): for item_id in identities: bucket = self.bucket_group[item_id] volume = bucket.size() if volume < rate.limit: continue # Determine rate's time-window starting point start_time = now - rate.interval item_count, remaining_time = bucket.inspect_expired_items(start_time) if item_count >= rate.limit: raise BucketFullException(item_id, rate, remaining_time) if idx == len(self._rates) - 1: # We remove item based on the request-rate with the max-limit bucket.get(volume - item_count) for item_id in identities: self.bucket_group[item_id].put(now) logger.debug('pid = {}; now bucket size is: {}; bucket group len = {}'.format( os.getpid(), sys.getsizeof(self.bucket_group), len(self.bucket_group))) def ratelimit( self, delay = False, max_delay = None, *identities): """A decorator and contextmanager that applies rate-limiting, with async support. Depending on arguments, calls that exceed the rate limit will either raise an exception, or sleep until space is available in the bucket. Args: identities: Bucket identities delay: Delay until the next request instead of raising an exception max_delay: The maximum allowed delay time (in seconds); anything over this will raise an exception """ return LimitContextDecorator(self, *identities, delay = delay, max_delay = max_delay) def get_current_volume(self, identity): """Get current bucket volume for a specific identity""" bucket = self.bucket_group[identity] return bucket.size()