123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- import logging
- from functools import wraps
- from django.conf import settings
- from typing import Any, TYPE_CHECKING
- from apps import serviceCache
- from apps.web.core.utils import JsonErrorResponse
- from apps.web.utils import get_client_ip
- if TYPE_CHECKING:
- from apps.web.core.db import RoleBaseDocument
- from django.core.handlers.wsgi import WSGIRequest
- from django.http import HttpResponse
def request_limit_by_user(operation='common', limit=50, period=24 * 60 * 60, logger=None):
    """
    Rate-limit an HTTP view per user.

    Calls are counted in ``serviceCache`` under a key built from
    ``request.user.request_limit_key``, ``operation`` and the current local
    date (``YYYY-MM-DD``), so each calendar day uses a separate counter.
    Once the counter reaches ``limit`` the view is not called and a
    ``JsonErrorResponse`` is returned instead.

    When ``settings.ENABLE_REQUEST_LIMIT`` is falsy the view is called
    directly and nothing is counted.

    :param operation: label of the limited operation; becomes part of the cache key
    :param limit: number of calls allowed per key before requests are rejected
    :param period: third argument handed to ``serviceCache.set`` on every
        counted call (presumably the cache expiry in seconds -- confirm
        against the cache backend)
    :param logger: logger used for debug output; defaults to this module's logger
    :return: decorator for view functions whose first argument is the request
    """
    log = logging.getLogger(__name__) if logger is None else logger

    def decorate(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            # type:(WSGIRequest, *Any, **Any)->HttpResponse
            if not settings.ENABLE_REQUEST_LIMIT:
                log.debug('request limit is disable.')
                return func(request, *args, **kwargs)

            user = request.user  # type: RoleBaseDocument
            today = datetime.datetime.now().strftime('%Y-%m-%d')
            key = '{}_{}_{}'.format(user.request_limit_key, operation, today)

            count = serviceCache.get(key, 0)
            if count >= limit:
                # Lazy %-style arguments: the message is only rendered when
                # DEBUG logging is actually enabled for this logger.
                log.debug('%s in %s exceed max limit.', key, get_client_ip(request))
                return JsonErrorResponse(description=u'操作过于频繁,休息一下喔。')

            # NOTE(review): read-then-write is not atomic, so concurrent
            # requests may under-count; left as in the original.
            serviceCache.set(key, count + 1, period)
            return func(request, *args, **kwargs)

        return wrapper

    return decorate
def request_limit_by_ip(operation='common', limit=50, period=24 * 60 * 60, logger=None):
    """
    Rate-limit an HTTP view per client IP.

    Calls are counted in ``serviceCache`` under a key built from the value
    of ``get_client_ip(request)``, ``operation`` and the current local date
    (``YYYY-MM-DD``), so each calendar day uses a separate counter. Once
    the counter reaches ``limit`` the view is not called and a
    ``JsonErrorResponse`` is returned instead.

    When ``settings.ENABLE_REQUEST_LIMIT`` is falsy the view is called
    directly and nothing is counted.

    :param operation: label of the limited operation; becomes part of the cache key
    :param limit: number of calls allowed per key before requests are rejected
    :param period: third argument handed to ``serviceCache.set`` on every
        counted call (presumably the cache expiry in seconds -- confirm
        against the cache backend)
    :param logger: logger used for debug output; defaults to this module's logger
    :return: decorator for view functions whose first argument is the request
    """
    log = logging.getLogger(__name__) if logger is None else logger

    def decorate(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            # type:(WSGIRequest, *Any, **Any)->HttpResponse
            if not settings.ENABLE_REQUEST_LIMIT:
                log.debug('request limit is disable.')
                return func(request, *args, **kwargs)

            ip = get_client_ip(request)
            today = datetime.datetime.now().strftime('%Y-%m-%d')
            key = '{}_{}_{}'.format(ip, operation, today)

            count = serviceCache.get(key, 0)
            if count >= limit:
                # Lazy %-style arguments: the message is only rendered when
                # DEBUG logging is actually enabled for this logger.
                log.debug('%s in %s exceed max limit.', operation, ip)
                return JsonErrorResponse(description=u'操作过于频繁,休息一下喔。')

            # NOTE(review): read-then-write is not atomic, so concurrent
            # requests may under-count; left as in the original.
            serviceCache.set(key, count + 1, period)
            return func(request, *args, **kwargs)

        return wrapper

    return decorate
|