# -*- coding: utf-8 -*-
# !/usr/bin/env python

import base64
import logging
import math
import itsdangerous
from functools import wraps

import simplejson as json
from Crypto.Cipher import AES
from django.conf import settings
from django.contrib.auth.hashers import check_password
from django.core.handlers.wsgi import WSGIRequest
from django.http import JsonResponse
from django.utils.encoding import force_text
from mongoengine import NotUniqueError, DoesNotExist
from typing import TYPE_CHECKING

from apilib.utils_string import md5
from apps.web.agent.models import Agent
from apps.web.api.exceptions import ApiException, ApiAuthException
from apps.web.core.exceptions import EmptyInput, InvalidParameter
from apps.web.dealer.models import Dealer
from apps.web.constant import ErrorCode
from apps.web.core.utils import JsonErrorResponse
from apps.web.device.models import Device

if TYPE_CHECKING:
    pass

logger = logging.getLogger(__name__)


def api_dealer_auth(request):
    # type: (WSGIRequest)->Dealer
    """
    Authenticate a dealer from the request's ``Authorization`` header.

    The second whitespace-separated token of the header is base64-decoded and
    split into ``username:password``. The agent signature is read from the
    ``Sign`` header (``HTTP_SIGN``) or, when that is absent, from the ``sign``
    key of the JSON request body. The agent is looked up by that signature and
    the dealer by username within that agent.

    :param request: incoming request; ``None`` is rejected
    :return: the matching ``Dealer`` with an extra ``api_conf`` attribute
             holding ``agentId``, ``agentSign``, ``mySign`` and ``domain``
    :raises ApiAuthException: if the header is missing or malformed, the
             signature, agent or dealer cannot be found, or the password does
             not match
    """
    if request is None:
        raise ApiAuthException(errmsg=u'请确认传入授权头')

    if 'HTTP_AUTHORIZATION' not in request.META:
        logger.debug('http header not found')
        raise ApiAuthException(errmsg=u'请确认传入授权头')

    try:
        credentials = request.META['HTTP_AUTHORIZATION'].split()[1]
    except IndexError:
        raise ApiAuthException(errmsg=u'请确认授权头的格式符合规范')

    # Undecodable credentials are a client error, not a server fault.
    # (TypeError on Python 2, binascii.Error -- a ValueError -- on Python 3.)
    try:
        auth = base64.b64decode(credentials)
    except (TypeError, ValueError):
        raise ApiAuthException(errmsg=u'请确认授权头的格式符合规范')

    # Split on the FIRST colon only, so a password that itself contains ':'
    # is kept intact; a missing colon is reported as a malformed header.
    username, separator, password = auth.partition(":")
    if not separator:
        raise ApiAuthException(errmsg=u'请确认授权头的格式符合规范')

    sign = request.META.get('HTTP_SIGN')
    if sign is None:
        # Fall back to the JSON body. A body that is not valid JSON, or is not
        # a JSON object, cannot carry a signature.
        try:
            payload = json.loads(request.body) if request.body else {}
        except ValueError:
            raise ApiAuthException(errmsg=u'找不到签名')

        if not isinstance(payload, dict) or 'sign' not in payload:
            raise ApiAuthException(errmsg=u'找不到签名')

        sign = payload['sign']

    logger.debug('api = {}; username = {}; sign = {}'.format(str(request.path), username, sign))

    agent = Agent.objects(agentSign=sign).first()
    if not agent:
        raise ApiAuthException(errmsg=u'找不到代理商')

    dealer = Dealer.objects(username=username, agentId=str(agent.id)).first()
    if dealer is None:
        logger.debug('dealer not found')
        raise ApiAuthException(errmsg=u'找不到经销商')

    # Accept the password either MD5-ed first or as supplied.
    password_matches = (check_password(md5(password), dealer.password)
                        or check_password(password, dealer.password))
    if not password_matches:
        logger.debug('dealer password does not match')
        raise ApiAuthException(errmsg=u'经销商账号名或者密码错误')

    setattr(dealer, 'api_conf', {
        'agentId': str(agent.id),
        'agentSign': sign,
        'mySign': agent.mySign,
        'domain': agent.domain
    })

    return dealer


def _positional_arg_count(callable_):
    """Return ``co_argcount`` of *callable_*'s code object, or None if it has none."""
    code = getattr(callable_, '__code__', None)
    if code:
        return code.co_argcount
    return None


def _fallback_response(nil, exc, messages):
    """Build the response ``api_call`` returns after tolerating *exc*."""
    if not callable(nil):
        return nil or api_exception_response()

    if _positional_arg_count(nil) != 1:
        return nil()

    # Only the known exception types (exact type match) get their own message,
    # to avoid exposing other Python errors to the client.
    exc_type = type(exc)
    if exc_type in messages:
        # Prefer the exception's own first argument; fall back to the default.
        return nil(next(iter(exc.args), messages[exc_type]))
    return nil(u'系统错误')


def api_call(nil = None, logger = None, *accepted_exceptions):
    """
    Decorator factory for API views that require dealer authentication.

    The wrapped view is called as ``func(request, dealer, *args, **kwargs)``
    where ``dealer`` comes from ``api_dealer_auth(request)``.

    * An ``ApiException`` is logged and answered with its ``to_response()``.
    * Any of ``accepted_exceptions`` is logged and answered with a fallback
      derived from ``nil``:

      - ``nil`` callable with exactly one argument: called with an error
        message (the exception's first argument or a default for the known
        exception types, otherwise a generic message);
      - ``nil`` callable otherwise: called with no arguments;
      - ``nil`` not callable: ``nil`` itself if truthy, otherwise
        ``api_exception_response()``.

    :param nil: fallback response, or a callable producing one
    :param logger: logger to use; defaults to this module's logger
    :param accepted_exceptions: exception types to tolerate; defaults to
                                ``Exception`` when none are given
    :return: a decorator for view functions
    """
    logger = logging.getLogger(__name__) if logger is None else logger

    exc_handler_map = {
        TypeError: u'类型错误',
        EmptyInput: u'空的输入',
        NotUniqueError: u'对象已存在',
        DoesNotExist: u'对象不存在',
        InvalidParameter: u'参数不合法'
    }

    tolerated = accepted_exceptions or (Exception,)

    def decorator(func):
        @wraps(func)
        def wrapper(request, *args, **kwargs):
            try:
                dealer = api_dealer_auth(request)
                try:
                    logger.debug('enter {}'.format(str(request.path)))
                    return func(request, dealer, *args, **kwargs)
                finally:
                    logger.debug('left {}'.format(str(request.path)))
            except ApiException as e:
                logger.error(str(e))
                return e.to_response()
            except tolerated as e:
                if request.FILES:
                    # The request body is not included in the log for uploads.
                    logger.error('[error_tolerate]: view({func_name}) error occurred when uploading files'
                                 .format(func_name=func.__name__))
                    logger.exception(e)
                else:
                    logger.exception(
                        u'[error_tolerate] view({func_name}) caught an error,'
                        u'(error={error}) (request=(body={request_body},REQUEST={request_request}))'
                        .format(func_name=func.__name__,
                                error=force_text(e.message),
                                request_body=force_text(request.body),
                                request_request=force_text(request.REQUEST)))

                return _fallback_response(nil, e, exc_handler_map)

        return wrapper

    return decorator


deprecated_api_error_response = JsonErrorResponse


def api_ok_response(payload=None):
    """
    Build a success ``JsonResponse``.

    The payload is returned under the ``payload`` key and its items are also
    merged into the top level of the response.

    :param payload: dict of result data; ``None`` means an empty payload
    """
    payload = {} if payload is None else payload
    response_dict = {
        'errcode': ErrorCode.SUCCESS,
        'errmsg': 'SUCCESS',
        'result': 1,
        'payload': payload,
        'description': ''
    }
    # Payload items are also exposed at the top level (and may override the
    # keys above).
    response_dict.update(payload)
    return JsonResponse(response_dict)


def api_ok_response_v2(**data):
    """
    Build a success ``JsonResponse`` holding ``errcode``/``errmsg`` plus the
    given keyword arguments at the top level.
    """
    response_dict = {
        'errcode': ErrorCode.SUCCESS,
        'errmsg': 'SUCCESS'
    }
    response_dict.update(data)
    return JsonResponse(response_dict)


def api_error_response(errcode, errmsg, payload=None):
    """
    Build an error ``JsonResponse``.

    :param errcode: error code to report
    :param errmsg: error message, used for both ``errmsg`` and ``description``
    :param payload: extra data; ``None`` means an empty payload
    """
    response_dict = {
        'errcode': errcode,
        'errmsg': errmsg,
        'result': 0,
        'description': errmsg,
        'payload': {} if payload is None else payload
    }
    return JsonResponse(response_dict)


def api_exception_response(payload=None):
    """
    Build the generic "system error" ``JsonResponse``
    (``ErrorCode.API_EXCEPTION``).

    :param payload: extra data; ``None`` means an empty payload
    """
    response_dict = {
        'errcode': ErrorCode.API_EXCEPTION,
        'errmsg': u'系统错误',
        'result': 0,
        'description': u'系统错误',
        'payload': {} if payload is None else payload
    }
    return JsonResponse(response_dict)


def AES_CBC_PKCS5padding_encrypt(s, dataSecret=None, dataSecretIV=None):
    # type:(str, str, str) -> str
    """
    Encrypt with AES in CBC mode using PKCS5-style padding.

    :param s: string to encrypt
    :param dataSecret: encryption key; defaults to ``settings.AES_CBC_DATA_SECRET``
    :param dataSecretIV: initialization vector; defaults to
                         ``settings.AES_CBC_DATA_SECRET_IV``
    :return: the base64-encoded ciphertext
    """
    _secret = str(dataSecret) if dataSecret else str(settings.AES_CBC_DATA_SECRET)
    _secretIV = str(dataSecretIV) if dataSecretIV else str(settings.AES_CBC_DATA_SECRET_IV)

    # Pad up to the next block boundary; every padding character encodes the
    # padding length (a full block is added when already aligned).
    block_size = AES.block_size
    pad_len = block_size - len(s) % block_size
    padded = s + pad_len * chr(pad_len)

    cipher = AES.new(_secret, AES.MODE_CBC, _secretIV)
    return base64.b64encode(cipher.encrypt(padded))


def AES_CBC_PKCS5padding_decrypt(s, dataSecret=None, dataSecretIV=None):
    """
    Decrypt data produced with AES in CBC mode and PKCS5-style padding.

    :param s: base64-encoded ciphertext to decrypt
    :param dataSecret: encryption key; defaults to ``settings.AES_CBC_DATA_SECRET``
    :param dataSecretIV: initialization vector; defaults to
                         ``settings.AES_CBC_DATA_SECRET_IV``
    :return: the decrypted data with the padding removed
    """
    _secret = str(dataSecret) if dataSecret else str(settings.AES_CBC_DATA_SECRET)
    _secretIV = str(dataSecretIV) if dataSecretIV else str(settings.AES_CBC_DATA_SECRET_IV)

    encrypted = base64.b64decode(s)
    cipher = AES.new(_secret, AES.MODE_CBC, _secretIV)
    plain = cipher.decrypt(encrypted)

    # The last character holds the number of padding characters to strip.
    return plain[0:-ord(plain[-1])]


def generate_json_token(data, expire=None):
    """
    Serialize *data* into a signed, timed token.

    :param data: data to serialize
    :param expire: passed to ``TimedJSONWebSignatureSerializer`` as its second
                   argument
    :return: the token, signed with ``settings.SHAN_DONG_TOKEN_SECRET``
    """
    serializer = itsdangerous.TimedJSONWebSignatureSerializer(
        settings.SHAN_DONG_TOKEN_SECRET, expire)
    return serializer.dumps(data)


def parse_json_token(s, expire=None):
    """
    Load the data carried by a token created by ``generate_json_token``.

    :param s: the token
    :param expire: passed to ``TimedJSONWebSignatureSerializer`` as its second
                   argument
    :return: the decoded data, or an empty dict when loading raises
             ``itsdangerous.BadData``
    """
    serializer = itsdangerous.TimedJSONWebSignatureSerializer(
        settings.SHAN_DONG_TOKEN_SECRET, expire)
    try:
        return serializer.loads(s)
    except itsdangerous.BadData:
        return {}


def bd09_to_gcj02(lng, lat):
    """
    Convert a longitude/latitude pair from the Baidu coordinate system to the
    Google coordinate system (GCJ-02, per the function name).

    :param lng: Baidu longitude
    :param lat: Baidu latitude
    :return: ``[lng, lat]`` list of the converted pair; when either input is
             ``0.0`` the inputs are returned unchanged as a ``(lng, lat)``
             tuple. NOTE(review): the tuple/list difference is kept as-is for
             existing callers.
    """
    if lng == 0.0 or lat == 0.0:
        return lng, lat

    x_pi = math.pi * 3000.0 / 180.0
    x = lng - 0.0065
    y = lat - 0.006
    z = math.sqrt(x * x + y * y) - 0.00002 * math.sin(y * x_pi)
    theta = math.atan2(y, x) - 0.000003 * math.cos(x * x_pi)
    gg_lng = z * math.cos(theta)
    gg_lat = z * math.sin(theta)
    return [gg_lng, gg_lat]


def get_coordinates_and_nums(groupId):
    """
    Return the coordinates of a group together with its device count.

    The coordinates are taken from the first device of the group whose
    ``location`` contains a ``coordinates`` entry.

    :param groupId: group whose devices are queried
    :return: ``(coordinates[0], coordinates[1], device_count)``;
             ``(0.0, 0.0, device_count)`` when no device has coordinates.
             Presumably the pair is (longitude, latitude) -- TODO confirm.
    """
    devices = Device.objects.filter(groupId=groupId)
    _count = devices.count()
    if _count == 0:
        return 0.0, 0.0, _count

    for dev in devices:
        location = dev.location
        if location is not None and 'coordinates' in location:
            coord = location['coordinates']
            return coord[0], coord[1], _count

    return 0.0, 0.0, _count