# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""Shared helpers for the web layer.

Contains request/user-agent helpers, view decorators (error tolerance,
permission and feature checks), role-based authentication backends and login
entry points, a cache-backed attempt limiter, and builders for the front-end /
server-end redirect URLs.
"""
import datetime
import logging
import os
import time
from contextlib import contextmanager
from functools import wraps, partial

import simplejson as json
import user_agents
from bson import json_util
from django.conf import settings
from django.contrib import auth
from django.contrib.auth.models import AnonymousUser
from django.http import HttpResponse, JsonResponse
from django.http import HttpResponseRedirect
from django.utils.decorators import available_attrs
from django.utils.encoding import force_text
from django.utils.module_loading import import_module, import_string
from mongoengine import NotUniqueError, DoesNotExist
from typing import TYPE_CHECKING, Any, Optional, Union

from apilib.utils_datetime import datetime_to_timestamp
# NOTE: this import deliberately stays after the django.http one; the name
# ``JsonResponse`` used below is the apilib implementation.
from apilib.utils_json import JsonResponse
from apilib.utils_string import cn
from apilib.utils_url import before_frag_add_query, add_query
from apps import serviceCache
from apps.web.core import ROLE
from apps.web.core.exceptions import EmptyInput, InvalidParameter
from apps.web.core.utils import JsonErrorResponse
from configs.base import my_key_fun
from middlewares.django_jwt_session_auth import jwt_login

iterkeys = lambda d: d.iterkeys()
itervalues = lambda d: d.itervalues()
iteritems = lambda d: d.iteritems()

logger = logging.getLogger(__name__)

if TYPE_CHECKING:
    from django.core.handlers.wsgi import WSGIRequest
    from django.http.response import HttpResponse
    from apps.web.common.models import UserSearchable
    from apps.web.core.db import RoleBaseDocument
    from library.validator import Validator


class CustomizedValidationError(Exception):
    """Validation error carrying a custom ``message``."""

    def __init__(self, message):
        # Pass the message to the base class so ``e.args`` is populated too.
        super(CustomizedValidationError, self).__init__(message)
        self.message = message

    def __str__(self):
        return repr(self.message)


@contextmanager
def ignored(*exceptions):
    """Context manager that silently swallows the given exception types.

    Example::

        with ignored(OSError):
            os.remove('...')
    """
    try:
        yield
    except exceptions:
        pass


# noinspection PyUnusedLocal
def ensure_all_fields_are_not_empty(dict_, fieldTranslation = None):
    """Check that every value in ``dict_`` is truthy.

    :param dict_: mapping of field name to value
    :param fieldTranslation: accepted for interface compatibility; not used
    :return: ``(True, '')`` when all values are truthy, otherwise
             ``(False, message)`` naming the first falsy field encountered
    """
    for key, value in dict_.items():
        if not value:
            return False, u'%s不可为空' % key
    return True, ''


def pluckPayload(queryDict, *fields):
    """Return the values of ``fields`` from a Django request.GET/POST mapping.

    Values are looked up with ``queryDict.get`` so missing fields yield None.
    At least one field name must be given.
    """
    assert len(fields), u'字段不可为空'
    return [queryDict.get(field) for field in fields]


def pluckJsonPayload(queryDict, *fields):
    """Same lookup as :func:`pluckPayload`, kept as a separate public name."""
    assert len(fields), u'字段不可为空'
    return [queryDict.get(field) for field in fields]


def translate_sendmsg_error(errCode):
    """Map a device send-message error code to a user-facing description."""
    if errCode == -1:
        return u'设备超时'
    elif errCode == 202:
        return u'设备忙'
    elif errCode == 1:
        return u'无法从设备上获取详细信息,请检查网络状况'
    elif errCode == -2:
        return u'找不到设备'
    elif errCode == 244:
        return u'下发了错误的参数给设备'
    elif errCode == -3:
        return u'未找到设备绑定服务器,请检查设备是否上电'

    return u'未知错误,请稍候再试'


def detect_browser(request, browser_name):
    """Return True when ``browser_name`` occurs in the request's User-Agent.

    :param request: Django request
    :param browser_name: substring to look for in ``HTTP_USER_AGENT``
    :return: bool
    """
    user_agent = request.META.get('HTTP_USER_AGENT', 'unknown')
    return browser_name in user_agent


def detect_app_from_ua(ua):
    """Classify a User-Agent string as alipay / wechat / jd / unionpay / other."""
    if 'Alipay' in ua:
        return 'alipay'
    elif 'MicroMessenger' in ua:
        return 'wechat'
    elif 'jdapp' in ua or 'JDJR' in ua:
        return 'jd'
    elif 'Union' in ua:
        return 'unionpay'
    else:
        return 'other'


detect_alipay_client = partial(detect_browser, browser_name = 'Alipay')
detect_wechat_client = partial(detect_browser, browser_name = 'MicroMessenger')
detect_union_client = partial(detect_browser, browser_name = 'Union')
detect_jdpay_client = partial(detect_browser, browser_name = 'jdapp')
detect_jdjr_client = partial(detect_browser, browser_name = 'JDJR')


def supported_app(request):
    """Return True when the request comes from one of the supported apps."""
    return detect_alipay_client(request) or \
           detect_wechat_client(request) or \
           detect_union_client(request) or \
           detect_jdjr_client(request) or \
           detect_jdpay_client(request)


class MongoJsonResponse(HttpResponse):
    """HttpResponse whose body is serialized with ``bson.json_util.dumps``."""

    def __init__(self, data, safe = True, **kwargs):
        if safe and not isinstance(data, dict):
            raise TypeError('In order to allow non-dict objects to be '
                            'serialized set the safe parameter to False')
        kwargs.setdefault('content_type', 'application/json')
        data = json_util.dumps(data)
        super(MongoJsonResponse, self).__init__(content = data, **kwargs)


def error_tolerate(nil = None, logger = None, *accepted_exceptions):
    """Decorator that turns exceptions raised by a view into a fallback response.

    :param nil: value returned when an error occurs, usually a JsonResponse.
                If callable and it takes exactly one positional argument, it
                is called with an error description; any other callable is
                called with no arguments.
    :param logger: logger to report to; defaults to this module's logger
    :param accepted_exceptions: exception types to tolerate; all ``Exception``
                                subclasses when none are given
    :return: the decorator
    """
    logger = logging.getLogger(__name__) if logger is None else logger

    exc_handler_map = {
        TypeError: u'类型错误',
        EmptyInput: u'空的输入',
        NotUniqueError: u'对象已存在',
        DoesNotExist: u'对象不存在',
        InvalidParameter: u'参数不合法'
    }

    tolerated = accepted_exceptions if len(accepted_exceptions) else (Exception,)

    def get_arg_count(callable_):
        # Returns None for callables without a ``__code__`` (classes, partials).
        code = getattr(callable_, '__code__', None)
        if code:
            return code.co_argcount

    def decorate(func):
        @wraps(func, assigned = available_attrs(func))
        def wrapper(request, *args, **kwargs):
            # type:(WSGIRequest, *Any, **Any)->HttpResponse
            try:
                return func(request, *args, **kwargs)
            except tolerated as e:
                if request.FILES:
                    # Do not dump the body of an upload request into the log.
                    logger.error('[error_tolerate]: view({func_name}) error occurred when uploading files'
                                 .format(func_name = func.__name__))
                    logger.exception(e)
                else:
                    # getattr fallbacks keep the logging itself from raising
                    # inside the handler and defeating the tolerance.
                    logger.exception(
                        u'[error_tolerate] view({func_name}) caught an error,'
                        u'(error={error}) (request=(body={request_body},REQUEST={request_request}))'
                            .format(func_name = func.__name__,
                                    error = force_text(getattr(e, 'message', e)),
                                    request_body = force_text(request.body),
                                    request_request = force_text(getattr(request, 'REQUEST', ''))))

                if not callable(nil):
                    return nil or JsonErrorResponse(description = u'系统错误')

                if get_arg_count(nil) != 1:
                    return nil()

                # Only known exception types get a specific description, so
                # other Python errors are not exposed to the client.
                exc_type = type(e)
                if exc_type in exc_handler_map:
                    # Prefer the exception's own detail, else the mapped default.
                    return nil(next(iter(e.args), exc_handler_map[exc_type]))
                return nil(u'系统错误')

        return wrapper

    return decorate


def trace_call(logger = None):
    """Decorator factory that logs entry to and exit from the wrapped function."""
    logger = logging.getLogger(__name__) if logger is None else logger

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kw):
            try:
                logger.debug('enter %s' % func.__name__)
                return func(*args, **kw)
            finally:
                logger.debug('leave %s' % func.__name__)

        return wrapper

    return decorator


def testcase_point():
    """Decorator factory allowing a function's result to be overridden via env.

    The ``MY_TESTCASE_POINT`` environment variable is parsed as JSON on every
    call. If it has an entry keyed by the wrapped function's name, that entry
    is returned instead of calling the function, or - when the entry contains
    an ``exception`` key - the named exception class is imported and raised
    with the configured params.
    """

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kw):
            testcase_points = json.loads(os.environ.get('MY_TESTCASE_POINT', '{}'))
            testcase_point = testcase_points.get(func.__name__, None)
            if not testcase_point:
                return func(*args, **kw)

            if 'exception' in testcase_point:
                _cls = testcase_point['exception']['class']
                _param = testcase_point['exception']['params']
                raise import_string(_cls)(_param)
            return testcase_point

        return wrapper

    return decorator


def role_from_user(user):
    # type: (Union[AnonymousUser, RoleBaseDocument])->str
    """Return the role of ``user``; anonymous users map to ROLE.anonymoususer."""
    if isinstance(user, AnonymousUser):
        return ROLE.anonymoususer
    else:
        return user.role


def check_role(user, role):
    # type: (Union[AnonymousUser, RoleBaseDocument], str)->bool
    """Return True when ``user`` has exactly the given ``role``."""
    return role_from_user(user) == role


is_user = partial(check_role, role = ROLE.myuser)
is_dealer = partial(check_role, role = ROLE.dealer)
is_subAccount = partial(check_role, role = ROLE.subaccount)
is_agent = partial(check_role, role = ROLE.agent)
is_manager = partial(check_role, role = ROLE.manager)
is_advertiser = partial(check_role, role = ROLE.advertiser)
is_advertisement = partial(check_role, role = ROLE.advertisement)
is_super_manager = partial(check_role, role = ROLE.supermanager)
is_tester = partial(check_role, role = ROLE.tester)
is_anonymous = partial(check_role, role = ROLE.anonymoususer)


class ViewValidator(object):
    """Decorator object that validates a view's request parameters.

    The decorated view is called as ``view(request, validated_data, *args,
    **kwargs)`` when validation passes; otherwise a JsonErrorResponse holding
    the validator's errors is returned.
    """

    def __init__(self, validator):
        # type:(Validator.__class__) -> None
        self._validator = validator

    @staticmethod
    def parse_request(request):
        # type:(WSGIRequest) -> dict
        """Return the JSON body for POST or the query mapping for GET.

        :raises ValueError: for any other request method
        """
        if request.method == "POST":
            return json.loads(request.body)
        elif request.method == "GET":
            return request.GET
        else:
            raise ValueError("not allowed request method {}".format(request.method))

    def validate(self, data):
        """Return ``(True, validated_data)`` or ``(False, str_errors)``."""
        validator = self._validator(data)  # type: Validator
        if not validator.is_valid():
            return False, validator.str_errors
        return True, validator.validated_data

    def __call__(self, viewFunc):
        @wraps(viewFunc)
        def inner(request, *args, **kwargs):
            data = self.parse_request(request)
            is_valid, payload = self.validate(data)
            if not is_valid:
                # unicode-escape decoding turns \uXXXX escapes back into text.
                return JsonErrorResponse(description=json.dumps(payload).decode("unicode-escape"))
            return viewFunc(request, payload, *args, **kwargs)

        return inner


def _has_any_role(request, roles):
    """Return True when the request user is authenticated and has one of ``roles``."""
    if not request.user.is_authenticated():
        return False
    return any(role == role_from_user(request.user) for role in roles)


def _session_password_is_current(request):
    """Check the password stored in the session against the user's password.

    Users with role myuser or tester always pass. For other roles the session
    password must equal the universal password, or the session must carry both
    ``oper_id`` and ``_auth_user_id``, or ``user.check_password`` must accept
    the session password. As a side effect ``user.universal_password_login``
    is set to True in the first two cases.
    """
    user = request.user
    role = role_from_user(user)
    if role == ROLE.myuser or role == ROLE.tester:
        return True

    session_pwd = request.session.get('password', '')
    if session_pwd == settings.UNIVERSAL_PASSWORD:
        user.universal_password_login = True

    # Check whether this is a dealer-authorized operator session.
    if request.session.get('oper_id') and request.session.get('_auth_user_id'):
        user.universal_password_login = True

    # getattr: tolerate user objects that do not define the flag at all.
    if getattr(user, 'universal_password_login', False):
        return True

    return user.check_password(session_pwd) is True


def is_login(request, *roles):
    """Return True when the request user is logged in with one of ``roles``.

    Applies the same role and session-password checks as
    :func:`permission_required`, but returns a bool instead of a response.
    """
    if not _has_any_role(request, roles):
        return False
    return _session_password_is_current(request)


def permission_required(*roles):
    """View decorator restricting access to users holding one of ``roles``.

    Usage, placed above a view function::

        @permission_required(ROLE.agent)
        @permission_required(ROLE.dealer)
        @permission_required(ROLE.myuser)

    Several roles may be given at once::

        @permission_required(ROLE.dealer, ROLE.agent)

    A 401 JSON response is returned when the user is not authenticated, has
    none of the roles, ``request.status`` is ``'forbidden'``, or the password
    stored in the session no longer matches.
    """

    def decorator(view_func):
        @wraps(view_func, assigned = available_attrs(view_func))
        def _wrapped_view(request, *args, **kwargs):
            if not _has_any_role(request, roles):
                return JsonResponse({"payload": {}}, status = 401)

            if getattr(request, 'status', '') == 'forbidden':
                return JsonResponse({"payload": {}}, status = 401)

            if not _session_password_is_current(request):
                return JsonResponse({"payload": {}}, status = 401)

            return view_func(request, *args, **kwargs)

        return _wrapped_view

    return decorator


def features_required(*features):
    """View decorator requiring the request user to have all given features.

    The user's features are read from ``request.user.features`` (treated as
    empty when the attribute is missing). A 403 JSON response is returned when
    any required feature is absent.

    :param features: feature names that must all be present
    :return: the decorator
    """

    def decorator(view_func):
        @wraps(view_func, assigned = available_attrs(view_func))
        def _wrapped_view(request, *args, **kwargs):
            userFeatures = request.user.features if hasattr(request.user, "features") else list()

            if not all(feature in userFeatures for feature in features):
                return JsonResponse({"result": "0", "description": u"尚未开通此特性", "payload": {}}, status = 403)

            return view_func(request, *args, **kwargs)

        return _wrapped_view

    return decorator


def get_user_model_by_role(role):
    """Import and return the user document class registered for ``role``.

    :raises KeyError: when ``role`` has no entry in the mapping below
    """
    user_models = {
        ROLE.dealer: "apps.web.dealer.models.Dealer",
        ROLE.subaccount: "apps.web.dealer.models.SubAccount",
        ROLE.agent: "apps.web.agent.models.Agent",
        ROLE.manager: "apps.web.management.models.Manager",
        ROLE.advertisement: "apps.web.ad.models.Advertisement",
        ROLE.advertiser: "apps.web.ad.models.Advertiser",
        ROLE.supermanager: "apps.web.superadmin.models.SuperManager",
        ROLE.tester: "apps.web.test.models.Tester"
    }

    module_path, class_name = user_models[role].rsplit('.', 1)
    return getattr(import_module(module_path), class_name)


class NoGivenUser(Exception):
    pass


class RoleBasedAuthBackend(object):
    """Unified authentication backend; subclasses pick the role via ``__role__``."""

    __role__ = None

    def authenticate(self, username = None, password = None, agentId = None, domain = None):
        """Return the matching user when the credentials are accepted, else None.

        The user is looked up by ``username`` plus ``agentId`` (if given) or
        ``domain`` (if given). The password is accepted when the user's
        ``check_password`` accepts it or when it equals
        ``settings.UNIVERSAL_PASSWORD``.
        """
        # The password is intentionally not logged.
        logger.debug('authenticate. username = %s; role = %s; agentId = %s; domain = %s' % (
            username, self.__role__, agentId, domain))

        if agentId is not None:
            user = self.user_document.objects(username = username, agentId = agentId).first()
        elif domain is not None:
            user = self.user_document.objects(username = username, domain = domain).first()
        else:
            user = self.user_document.objects(username = username).first()

        # An empty/None password is never accepted, so it cannot match an
        # empty or unset universal password.
        if not user or not password:
            return None

        # The universal password is enabled to make scenarios easy to reproduce.
        if user.check_password(password) or password == settings.UNIVERSAL_PASSWORD:
            user.backend = "%s.%s" % (self.__module__, self.__class__.__name__)
            return user

        return None

    def get_user(self, user_id):
        return self.user_document.objects.with_id(user_id)

    @property
    def user_document(self):
        assert self.__role__ is not None, u'__role__ cannot be None'
        self._user_doc = get_user_model_by_role(self.__role__)
        return self._user_doc


class AgentAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.agent


class DealerAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.dealer


class SubAccountBackend(RoleBasedAuthBackend):
    __role__ = ROLE.subaccount


class ManagerAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.manager


class SuperManagerAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.supermanager


class AdvertisementAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.advertisement


class AdvertiserAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.advertiser


class TesterAuthBackend(RoleBasedAuthBackend):
    __role__ = ROLE.tester


def get_backend_by_role(role):
    """Return the auth backend class for ``role`` (KeyError when unknown)."""
    role_backend = {
        ROLE.agent: AgentAuthBackend,
        ROLE.dealer: DealerAuthBackend,
        ROLE.subaccount: SubAccountBackend,
        ROLE.manager: ManagerAuthBackend,
        ROLE.advertisement: AdvertisementAuthBackend,
        ROLE.advertiser: AdvertiserAuthBackend,
        ROLE.supermanager: SuperManagerAuthBackend,
        ROLE.tester: TesterAuthBackend
    }

    return role_backend[role]


def _login_failure(description):
    """Build the JSON response used for every failed login outcome."""
    return JsonResponse({'result': 0, 'description': description, 'payload': {}})


def generic_login(request, logger, username, password, role, **kwargs):
    """Session-based login for the given ``role``.

    Failed attempts are counted per username and role through
    LimitAttemptsManager; once the limit is reached the login is refused
    without authenticating. Dealers and subaccounts are authenticated together
    with ``kwargs['agentId']``, managers with ``settings.MY_DOMAIN``.

    :return: a JsonResponse with ``result`` 1 on success and 0 otherwise
    """
    try:
        limitAttemptManager = LimitAttemptsManager(identifier = username, category = "%sLogin" % role)
        if limitAttemptManager.is_exceeded_limit():
            return _login_failure(u'超出登录错误次数限制,请明日再试')

        backend = get_backend_by_role(role)()
        try:
            if role in (ROLE.dealer, ROLE.subaccount):
                user = backend.authenticate(username, password, agentId = kwargs.get('agentId'))
            elif role == ROLE.manager:
                user = backend.authenticate(username, password, domain = settings.MY_DOMAIN)
            else:
                user = backend.authenticate(username, password)
            # type of ``user``: Optional[UserSearchable]

            if user is None:
                limitAttemptManager.incr()
                return _login_failure(u'错误的用户名或密码,您还可输入%s次' % limitAttemptManager.times_left())
            elif not user.activited:
                return _login_failure(u'用户未激活')
            elif user.forbidden:
                return _login_failure(u'用户已经被禁止登录')

            user_agent_string = request.META.get('HTTP_USER_AGENT', '')
            phoneOS = user_agents.parse(user_agent_string).os.family
            user.update(phoneOS = phoneOS, lastLoginUserAgent = user_agent_string)
            auth.login(request, user)
            request.session.set_expiry(30 * 24 * 3600)
            # The submitted password is kept in the session because
            # permission_required/is_login re-check it on later requests.
            # NOTE(review): this stores the plaintext password in the session
            # store - confirm this is acceptable for the session backend in use.
            request.session['password'] = password
            logger.info("%s(name=%s,phone=%s) has logged in to the system" % (role, user.nickname, user.username))
            return JsonResponse({'result': 1, 'description': 'success', 'payload': {}})

        except NoGivenUser:
            return _login_failure(u'用户名或者密码错误')

    except Exception as e:
        logger.exception('unable to proceed login for %s(username=%s), error=%s' % (role, username, e))
        return _login_failure(u'登录错误')


def jwt_generic_login(request, logger, username, password, role):
    """JWT-based login for the given ``role``.

    Applies the same attempt limiting as :func:`generic_login`; on success the
    token produced by ``jwt_login`` is returned in the response payload.

    :return: a JsonResponse with ``result`` 1 on success and 0 otherwise
    """
    try:
        limitAttemptManager = LimitAttemptsManager(identifier = username, category = "%sLogin" % role)
        if limitAttemptManager.is_exceeded_limit():
            return _login_failure(u'超出登录错误次数限制,请明日再试')

        backend = get_backend_by_role(role)()
        try:
            user = backend.authenticate(username, password)

            if user is None:
                limitAttemptManager.incr()
                return _login_failure(u'错误的用户名或密码,您还可输入%s次' % limitAttemptManager.times_left())
            elif not getattr(user, 'activated', True):
                return _login_failure(u'用户未激活')

            token = jwt_login(settings.SERVICE_DOMAIN.USER, user, request)
            # The token is a credential, so it is not written to the log.
            logger.info("%s(name=%s,phone=%s) has logged in to the system" % (role, user.nickname, user.username))
            return JsonResponse({'result': 1, 'description': 'success', 'payload': {'token': token}})

        except NoGivenUser:
            return _login_failure(u'用户名或者密码错误')

    except Exception as e:
        logger.exception('unable to proceed login for %s(username=%s), error=%s' % (role, username, e))
        return _login_failure(u'登录错误')


dealer_login = partial(generic_login, role = ROLE.dealer)
subAccount_login = partial(generic_login, role = ROLE.subaccount)
agent_login = partial(generic_login, role = ROLE.agent)
manager_login = partial(generic_login, role = ROLE.manager)
advertisement_login = partial(generic_login, role = ROLE.advertisement)
advertiser_login = partial(generic_login, role = ROLE.advertiser)
super_manager_login = partial(generic_login, role = ROLE.supermanager)
tester_login = partial(jwt_generic_login, role = ROLE.tester)

js_timestamp = lambda timestamp: int(datetime_to_timestamp(timestamp)) * 1000


class LimitAttemptsManager(object):
    """Limits the number of failed attempts (e.g. logins) per day.

    The counter lives in the cache under ``'<category>Limit<identifier>'`` and
    is set to expire at the end of the current day.
    """

    def __init__(self, identifier, category = 'login', maxTimes = 5, mc = serviceCache):
        self.key = '%sLimit%s' % (category, identifier)
        self.maxTimes = maxTimes
        self.mc = mc

    def peek(self):
        """Return the current attempt count (0 when nothing is stored)."""
        _ = self.mc.get(self.key)
        return 0 if not _ else int(_)

    def clear(self):
        self.mc.delete(self.key)

    def times_left(self):
        return self.maxTimes - self.peek()

    def incr(self):
        """Record one more attempt; the counter expires at the end of today."""
        end_of_day = datetime.datetime.combine(datetime.date.today(), datetime.time.max)
        expireSecs = (end_of_day - datetime.datetime.now()).total_seconds()
        # Clamp to 1 second: right around midnight the difference can truncate
        # to 0 or go negative, which is not a usable timeout.
        set_or_incr_cache(self.mc, self.key, 1, max(int(expireSecs), 1))

    def is_exceeded_limit(self):
        # Read the cache once so the key cannot expire between a None check
        # and the int() conversion.
        current = self.mc.get(self.key)
        if current is None:
            return False
        return int(current) >= self.maxTimes


def set_or_incr_cache(cache_, key, value, timeout = 7 * 24 * 3600):
    """Increment ``key`` by ``value``, creating it when it does not exist yet.

    ``cache_.incr`` is tried first; when it raises the ValueError whose message
    says the key was not found, the key is set to ``str(value)`` with
    ``timeout``. Any other ValueError is re-raised.

    :param cache_: cache client providing ``incr`` and ``set``
    :param key: cache key
    :param value: amount to add / initial value
    :param timeout: expiry in seconds
    :return: None
    """
    try:
        cache_.incr(key, int(value), timeout)
    except ValueError as e:
        # The message is matched against the fully-built cache key.
        if "Key '%s' not found" % (my_key_fun(key, "", ""),) != e.args[0]:
            raise
        else:
            cache_.set(key, str(value), timeout)


def set_start_key_status(start_key, state, reason='', order_id =None):
    # type:(basestring, str, basestring, str)->None
    """Store a JSON status record for ``start_key`` in the cache for 30 minutes."""
    _status = {
        'state': state,
        'reason': reason,
        'ts': int(time.time())
    }

    if order_id:
        _status.update({'orderId': order_id})

    serviceCache.set(start_key, json.dumps(_status), 1800)


def get_start_key_status(start_key):
    """Return the status dict stored for ``start_key``, or None when absent.

    Values that are not valid JSON are parsed as ``'<state>_<ts>'``.
    """
    value = serviceCache.get(start_key)
    if not value:
        return None

    try:
        start_key_status = json.loads(value)
    except Exception as e:
        logger.exception("[get_start_key_status] error = {}".format(e))
        # Fall back to the underscore-separated "<state>_<ts>" representation.
        tokens = value.split('_')
        start_key_status = {
            'state': tokens[0],
            'reason': '',
            'ts': int(tokens[1])
        }

    return start_key_status


def concat_url(base_url, uri, add_version):
    # type:(str, str, bool)->str
    """Build an absolute URL from ``uri``.

    A ``uri`` starting with ``/`` is appended to ``base_url``; one starting
    with ``http://`` or ``https://`` is used as is. When ``add_version`` is
    true, ``v=<settings.VERSION>`` is added to the query string.

    :raises AssertionError: when an argument is not a string or ``uri`` has
                            neither of the accepted forms
    """
    assert isinstance(uri, basestring)
    assert isinstance(base_url, basestring)

    if uri.startswith('/'):
        url = base_url + uri
    elif uri.startswith('http://') or uri.startswith('https://'):
        url = uri
    else:
        # Raised explicitly (not via ``assert``) so it still fires under -O;
        # the message is built as unicode so a unicode uri cannot trigger a
        # UnicodeDecodeError while formatting.
        raise AssertionError(u'无效的uri参数%s' % force_text(uri, errors = 'replace'))

    return add_query(url, {'v': settings.VERSION}) if add_version else url


concat_front_end_url = partial(concat_url, base_url = settings.FRONT_END_BASE_URL, add_version = True)
concat_server_end_url = partial(concat_url, base_url = settings.SERVER_END_BASE_URL, add_version = False)
concat_mini_end_url = partial(concat_url, base_url = settings.SERVER_END_BASE_SSL_URL, add_version = True)

# WeChat mini-program redirect gateway
MiniGatewayResponseRedirect = lambda redirect: HttpResponseRedirect(
    concat_mini_end_url(uri = redirect))


# User QR-scan entry URL (server)
def concat_user_login_entry_url(l, chargeIndex=None):
    if chargeIndex:
        added_query = {"l": l, "chargeIndex": chargeIndex}
    else:
        added_query = {"l": l}
    return add_query(concat_server_end_url(uri = "/userLogin"), added_query = added_query)


# User center entry URL (server)
def concat_user_center_entry_url(agentId, redirect = None):
    params = {
        'agentId': agentId
    }

    if redirect:
        params.update({'redirect': redirect})

    return add_query(concat_server_end_url(uri = '/userLogin'), params)


# Dealer entry URL
def concat_dealer_access_entry_url(agentId):
    return add_query(concat_server_end_url(uri = "/dealerAccess"), added_query = {"agentId": agentId})


# User center URL
def concat_user_center_url(l = None):
    if l:
        return before_frag_add_query(concat_front_end_url(uri = '/user/index.html#/user/me'),
                                     added_query = {"logicalCode": l})
    else:
        return concat_front_end_url(uri = '/user/index.html#/user/me')


UserCenterResponseRedirect = lambda: HttpResponseRedirect(concat_user_center_url())


# Follow-official-account URL
def concat_follow_gzh_url(agentId):
    return add_query(concat_front_end_url(uri = "/pages/qrcode.html"), added_query = {"agentId": agentId})


def concat_moni_gzh_url(ticket):
    return add_query(concat_front_end_url(uri="/pages/qrcode.html"), added_query={"ticket": ticket})


FollowGZHResponseRedirect = lambda agentId: HttpResponseRedirect(concat_follow_gzh_url(agentId))

MoniGZHResponseRedirect = lambda ticket: HttpResponseRedirect(concat_moni_gzh_url(ticket))


def concat_error_page_url(error):
    """Return the front-end error page URL carrying ``error`` in its query."""
    if isinstance(error, unicode):
        error = cn(error)

    return concat_front_end_url(
        uri = before_frag_add_query(uri = '/pages/error/error.html', added_query = {'error': error}))


def ErrorResponseRedirect(error = u'系统错误'):
    # type:(Union[str, unicode])->HttpResponseRedirect
    url = concat_error_page_url(error = error)
    return HttpResponseRedirect(url)


def WechatAuthDummyRedirect():
    url = concat_front_end_url(
        uri = before_frag_add_query(
            uri = '/public/wechat/authSnapShot.html',
            added_query = {
                'tip': cn(u'为了更好的为您服务,我们将收集您的微信昵称和头像信息。这些信息将用于订单管理中定位您的消费订单,以便进行后续的服务。')}))

    return HttpResponseRedirect(url)


def NotSupportedPlatformResponseRedirect():
    # type:()->HttpResponseRedirect
    url = concat_error_page_url(error = u'目前只支持通过支付宝、微信或者京东扫码登录')
    return HttpResponseRedirect(url)


def NotSupportedPayResponseRedirect(pay_mode_desc):
    # type:(basestring)->HttpResponseRedirect
    url = concat_error_page_url(error = u'{}未开通,请使用其他支付方式'.format(pay_mode_desc))
    return HttpResponseRedirect(url)


# User recharge URL
def concat_user_recharge_url(l):
    return before_frag_add_query(concat_front_end_url(uri = '/user/index.html#/user/charge'), {'logicalCode': l})


UserRechargeResponseRedirect = lambda l: HttpResponseRedirect(concat_user_recharge_url(l))


# User recharge entry URL
def concat_user_recharge_entry_url(agentId, l):
    return concat_user_center_entry_url(agentId = agentId, redirect = concat_user_recharge_url(l = l))


# User discount card/ticket recharge URL
def concat_user_cardTicketList_entry_url(agentId, l):
    return concat_user_center_entry_url(agentId=agentId, redirect=before_frag_add_query(
        concat_front_end_url(uri='/user/index.html#/user/cardCenter/cardTicketList'), {'logicalCode': l}))


# Dealer main page URL
def concat_dealer_main_page_url():
    return concat_front_end_url(uri = '/app/index.html')


DealerMainPageResponseRedirect = lambda: HttpResponseRedirect(concat_dealer_main_page_url())


# Dealer login URL
def contact_dealer_login_page_url(register = True):
    return add_query(concat_front_end_url(uri = "/app/login.html"), added_query = {"register": register})


DealerLoginPageResponseRedirect = lambda register: HttpResponseRedirect(
    contact_dealer_login_page_url(register = register))


# Dealer bind-id URL
def concat_dealer_bind_id_page_url(result):
    return add_query(concat_front_end_url(uri = "/app/bind-id.html"), added_query = {"result": result})


DealerBindIdResponseRedirect = lambda result: HttpResponseRedirect(concat_dealer_bind_id_page_url(result))


# Dealer sub-account login URL
def contact_sub_account_login_page_url(agentId):
    return add_query(concat_front_end_url(uri = "/app/sub-login.html"), added_query = {"agentId": agentId})


SubAccountLoginResponseRedirect = lambda agentId: HttpResponseRedirect(contact_sub_account_login_page_url(agentId))


# Count-down page URL
def concat_count_down_page_url(devNo, port):
    # chargeIndex is only included for a positive port number.
    if port and int(port) > 0:
        added_query = {
            'devNo': devNo,
            'chargeIndex': port
        }
    else:
        added_query = {
            'devNo': devNo
        }

    return before_frag_add_query(concat_front_end_url(
        uri = "/user/index.html#/user/countDown/common"),
        added_query = added_query)


CountDownResponseRedirect = lambda devNo, port: HttpResponseRedirect(concat_count_down_page_url(devNo, port))


# Bluetooth count-down page URL
def concat_bt_count_down_page_url(l, port):
    # chargeIndex is only included for a positive port number.
    if port and int(port) > 0:
        added_query = {
            'logicalCode': l,
            'chargeIndex': port
        }
    else:
        added_query = {
            'logicalCode': l
        }

    return before_frag_add_query(concat_front_end_url(
        uri = "/user/index.html#/user/countDown/bluetooth"),
        added_query = added_query)


BtCountDownResponseRedirect = lambda l, port: HttpResponseRedirect(concat_bt_count_down_page_url(l, port))


# Device physical-card recharge page
def concat_card_recharge_url(l):
    return before_frag_add_query(concat_front_end_url(uri = "/user/index.html#/devChargeCard"),
                                 added_query = {"logicalCode": l})


CardRechargeResponseRedirect = lambda l: HttpResponseRedirect(concat_card_recharge_url(l))


# Bluetooth package page URL
def concat_bt_device_url(l, port):
    chargeIndex = port or ""
    params = {
        "logicalCode": l,
        "chargeIndex": chargeIndex
    }
    return before_frag_add_query(concat_front_end_url(uri = "/user/index.html#/bt"), added_query = params)


def BtDeviceResponseRedirect(l, port = None):
    return HttpResponseRedirect(concat_bt_device_url(l, port))


# Network device package page URL
def concat_net_device_url(l, port):
    chargeIndex = port or ""
    params = {
        "logicalCode": l,
        "chargeIndex": chargeIndex
    }
    return before_frag_add_query(concat_front_end_url(uri = "/user/index.html#/dev"), added_query = params)


def NetDeviceResponseRedirect(l, port = None):
    return HttpResponseRedirect(concat_net_device_url(l, port))


# Huopo door-access page URL
def concat_huopo_door_url(l, port):
    chargeIndex = port or ""
    params = {
        "chargeIndex": chargeIndex,
        "logicalCode": l
    }
    return before_frag_add_query(concat_front_end_url(uri = "/user/index.html#/door"), added_query = params)


def concat_one_card_gate_url(l, port):
    params = {
        "logicalCode": l,
        "chargeIndex": port
    }
    return before_frag_add_query(concat_front_end_url(uri = "/user/index.html#/gate"), added_query = params)


def HuopoDoorResponseRedirect(l, port = None):
    return HttpResponseRedirect(concat_huopo_door_url(l, port))


def OneCardGateResponseRedirect(l, port):
    return HttpResponseRedirect(concat_one_card_gate_url(l, port))


# Advertisement page shown before using a device
def concat_before_ad_url(devNo):
    return add_query(concat_front_end_url(uri = '/pages/index.html'), added_query = {"devNo": devNo})


BeforeAdResponseRedirect = lambda devNo: HttpResponseRedirect(concat_before_ad_url(devNo))


# Advertisement landing page URL
def concat_ad_landing_url(l, status):
    params = {
        "status": status,
        "logicalCode": l
    }
    return add_query(concat_front_end_url(uri = "/pages/adLanding.html"), added_query = params)


AdLandingResponseRedirect = lambda l, status: HttpResponseRedirect(concat_ad_landing_url(l, status))


# Advertisement ACCESS authorization entry URL
def AdAccessResponseRedirect(adId):
    params = {
        "connect_redirect": 1,
        "state": str(adId)
    }
    return HttpResponseRedirect(add_query(concat_server_end_url(uri = "/adAccess"), added_query = params))


concat_dealer_pay_detail_url = lambda order_no: concat_front_end_url(
    uri = '/app/payOrderDetail.html?orderNo={}'.format(order_no))

# Front-end HttpResponseRedirect
FrontEndResponseRedirect = lambda redirect: HttpResponseRedirect(concat_front_end_url(uri = redirect))

# Server-end HttpResponseRedirect
ServerEndResponseRedirect = lambda redirect: HttpResponseRedirect(concat_server_end_url(uri = redirect))

ExternalResponseRedirect = lambda redirect: HttpResponseRedirect(redirect)


def get_client_ip(request):
    """Return the client IP address, or '' when none can be determined.

    Sources are tried in order: the first entry of ``HTTP_X_FORWARDED_FOR``,
    then ``REMOTE_ADDR``, then ``HTTP_X_REAL_IP``.
    """
    if 'HTTP_X_FORWARDED_FOR' in request.META:
        # Entries are comma separated and may carry surrounding whitespace.
        ipaddress1 = request.META['HTTP_X_FORWARDED_FOR'].split(',')[0].strip()
        logger.debug('HTTP_X_FORWARDED_FOR = {}'.format(ipaddress1))
        if len(ipaddress1) > 0:
            return ipaddress1

    ipaddress2 = request.META.get('REMOTE_ADDR', '')
    logger.debug('REMOTE_ADDR = {}'.format(ipaddress2))
    if len(ipaddress2) > 0:
        return ipaddress2

    ipaddress3 = request.META.get('HTTP_X_REAL_IP', '')
    logger.debug('HTTP_X_REAL_IP = {}'.format(ipaddress3))
    if len(ipaddress3) > 0:
        return ipaddress3

    return ''


def record_operation_behavior():
    """View decorator that records the request and response via OperatorLog.

    Intended for critical, low-frequency user or dealer operations. Recording
    is best-effort: a failure while logging is reported to this module's
    logger and never affects the response returned to the client.
    """

    def decorator(view_func):
        @wraps(view_func, assigned=available_attrs(view_func))
        def _wrapped_view(request, *args, **kwargs):
            response = view_func(request, *args, **kwargs)
            try:
                # Imported lazily, at call time rather than at module import.
                from apps.web.device.models import OperatorLog

                user = request.user
                OperatorLog.log(user=user,
                                level=OperatorLog.LogLevel.CRITICAL,
                                operator_name=request.path,
                                content={
                                    'request': force_text(request.body),
                                    'response': response.json()
                                })
            except Exception:
                # Keep the view's response intact, but leave a trace instead
                # of swallowing the failure silently.
                logger.exception('[record_operation_behavior] failed to record operation for %s' % request.path)

            return response

        return _wrapped_view

    return decorator


def get_alipay_gateway_result(gateway, out_trade_no, money, subject, buyer_id, body, notify_url):
    # type: (AliPayGateway, str, RMB, basestring, basestring, str, basestring)->dict
    """Create an Alipay trade through ``gateway`` and return the result dict.

    When the gateway uses ``settings.DEFAULT_ALIPAY_APP_ID`` the call also
    carries ``extend_params`` with the system service provider id (used for
    Alipay commission). The result always contains a ``trade_no`` key
    (None when the gateway did not supply one).

    :param gateway: gateway object providing ``appid`` and ``api_alipay_trade_create``
    :param out_trade_no: merchant order number
    :param money: amount to charge
    :param subject: order subject
    :param buyer_id: buyer id
    :param body: order body
    :param notify_url: notification URL passed to the gateway
    :return: dict
    """
    extra = {}
    if gateway.appid == settings.DEFAULT_ALIPAY_APP_ID:
        # Used for Alipay commission.
        extra['extend_params'] = {
            'sys_service_provider_id': settings.ALIPAY_SYS_PROVIDER_ID
        }

    result = gateway.api_alipay_trade_create(out_trade_no = out_trade_no,
                                             money = money,
                                             subject = subject,
                                             buyer_id = buyer_id,
                                             notify_url = notify_url,
                                             body = body,
                                             **extra)

    result.setdefault('trade_no', None)
    return result