123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import base64
- import logging
- import math
- import itsdangerous
- from functools import wraps
- import simplejson as json
- from Crypto.Cipher import AES
- from django.conf import settings
- from django.contrib.auth.hashers import check_password
- from django.core.handlers.wsgi import WSGIRequest
- from django.http import JsonResponse
- from django.utils.encoding import force_text
- from mongoengine import NotUniqueError, DoesNotExist
- from typing import TYPE_CHECKING
- from apilib.utils_string import md5
- from apps.web.agent.models import Agent
- from apps.web.api.exceptions import ApiException, ApiAuthException
- from apps.web.core.exceptions import EmptyInput, InvalidParameter
- from apps.web.dealer.models import Dealer
- from apps.web.constant import ErrorCode
- from apps.web.core.utils import JsonErrorResponse
- from apps.web.device.models import Device
- if TYPE_CHECKING:
- pass
- logger = logging.getLogger(__name__)
- def api_dealer_auth(request):
- # type: (WSGIRequest)->Dealer
- """
- 经销商授权
- :param request:
- :return:
- """
- if request is None:
- raise ApiAuthException(errmsg = u'请确认传入授权头')
- if 'HTTP_AUTHORIZATION' not in request.META:
- logger.debug('http header not found')
- raise ApiAuthException(errmsg = u'请确认传入授权头')
- try:
- credentials = request.META['HTTP_AUTHORIZATION'].split()[1]
- except IndexError:
- raise ApiAuthException(errmsg = u'请确认授权头的格式符合规范')
- auth = base64.b64decode(credentials)
- username, password = auth.split(":")[0], auth.split(":")[1]
- sign = request.META.get('HTTP_SIGN')
- if sign is None:
- payload = json.loads(request.body) if request.body else {}
- if not payload or 'sign' not in payload:
- raise ApiAuthException(errmsg = u'找不到签名')
- sign = payload['sign']
- logger.debug('api = {}; username = {}; sign = {}'.format(str(request.path), username, sign))
- agent = Agent.objects(agentSign = sign).first()
- if not agent:
- raise ApiAuthException(errmsg = u'找不到代理商')
- dealer = Dealer.objects(username = username, agentId = str(agent.id)).first()
- if dealer is None:
- logger.debug('dealer not found')
- raise ApiAuthException(errmsg = u'找不到经销商')
- if not (check_password(md5(password), dealer.password) or check_password(password, dealer.password)):
- logger.debug('dealer password does not match')
- raise ApiAuthException(errmsg = u'经销商账号名或者密码错误')
- setattr(dealer, 'api_conf', {
- 'agentId': str(agent.id),
- 'agentSign': sign,
- 'mySign': agent.mySign,
- 'domain': agent.domain
- })
- return dealer
- def api_call(nil = None, logger = None, *accepted_exceptions):
- logger = logging.getLogger(__name__) if logger is None else logger
- exc_handler_map = {
- TypeError: u'类型错误',
- EmptyInput: u'空的输入',
- NotUniqueError: u'对象已存在',
- DoesNotExist: u'对象不存在',
- InvalidParameter: u'参数不合法'
- }
- def decorator(func):
- @wraps(func)
- def wrapper(request, *args, **kwargs):
- _accepted_exceptions = accepted_exceptions
- _accepted_exceptions = (Exception,) if not len(_accepted_exceptions) else _accepted_exceptions
- try:
- dealer = api_dealer_auth(request)
- try:
- logger.debug('enter {}'.format(str(request.path)))
- return func(request, dealer, *args, **kwargs)
- finally:
- logger.debug('left {}'.format(str(request.path)))
- except ApiException as e:
- logger.error(str(e))
- return e.to_response()
- except _accepted_exceptions as e:
- if request.FILES:
- logger.error('[error_tolerate]: view({func_name}) error occurred when uploading files'
- .format(func_name = func.__name__))
- logger.exception(e)
- else:
- logger.exception(
- u'[error_tolerate] view({func_name}) caught an error,'
- u'(error={error}) (request=(body={request_body},REQUEST={request_request}))'
- .format(func_name = func.__name__, error = force_text(e.message),
- request_body = force_text(request.body),
- request_request = force_text(request.REQUEST)))
- if callable(nil):
- def get_arg_count(callable_):
- code = getattr(callable_, '__code__', None)
- if code:
- return code.co_argcount
- exc_type = type(e)
- if get_arg_count(nil) == 1:
- #: 只处理自定义的exceptions,避免暴露python的其他错误
- if exc_type in exc_handler_map:
- #: 优先返回更详细的报错信息, 无法找到就返回默认的
- return nil(next(iter(e.args), exc_handler_map[exc_type]))
- else:
- return nil(u'系统错误')
- else:
- return nil()
- else:
- return nil or api_exception_response()
- return wrapper
- return decorator
- deprecated_api_error_response = JsonErrorResponse
- def api_ok_response(payload = {}):
- response_dict = {
- 'errcode': ErrorCode.SUCCESS,
- 'errmsg': 'SUCCESS',
- 'result': 1,
- 'payload': payload,
- 'description': ''
- }
- response_dict.update(payload)
- return JsonResponse(response_dict)
- def api_ok_response_v2(**data):
- response_dict = {
- 'errcode': ErrorCode.SUCCESS,
- 'errmsg': 'SUCCESS'
- }
- response_dict.update(**data)
- return JsonResponse(response_dict)
- def api_error_response(errcode, errmsg, payload = {}):
- response_dict = {
- 'errcode': errcode,
- 'errmsg': errmsg,
- 'result': 0,
- 'description': errmsg,
- 'payload': payload
- }
- return JsonResponse(response_dict)
- def api_exception_response(payload = {}):
- response_dict = {
- 'errcode': ErrorCode.API_EXCEPTION,
- 'errmsg': u'系统错误',
- 'result': 0,
- 'description': u'系统错误',
- 'payload': payload
- }
- return JsonResponse(response_dict)
- def AES_CBC_PKCS5padding_encrypt(s, dataSecret=None, dataSecretIV=None): # type:(str, str, str) -> str
- """
- AES CBC PKCS5padding 加密
- :param s: 待加密字符串
- :param dataSecret: 加密秘钥
- :param dataSecretIV: 加密向量
- """
- _mode = AES.MODE_CBC
- _size = AES.block_size
- _secret = str(dataSecret) if dataSecret else str(settings.AES_CBC_DATA_SECRET)
- _secretIV = str(dataSecretIV) if dataSecretIV else str(settings.AES_CBC_DATA_SECRET_IV)
- pad = lambda x: x + (_size - len(s) % _size) * chr(_size - len(s) % _size)
- cipher = AES.new(_secret, _mode, _secretIV)
- crypt = cipher.encrypt(pad(s))
- return base64.b64encode(crypt)
- def AES_CBC_PKCS5padding_decrypt(s, dataSecret=None, dataSecretIV=None):
- """
- AES CBC PKCS5padding 解密
- :param s: 待加密字符串
- :param dataSecret: 加密秘钥
- :param dataSecretIV: 加密向量
- """
- _mode = AES.MODE_CBC
- _size = AES.block_size
- _secret = str(dataSecret) if dataSecret else str(settings.AES_CBC_DATA_SECRET)
- _secretIV = str(dataSecretIV) if dataSecretIV else str(settings.AES_CBC_DATA_SECRET_IV)
- enc = base64.b64decode(s)
- unpad = lambda x: x[0:-ord(x[-1])]
- cipher = AES.new(_secret, _mode, _secretIV)
- return unpad(cipher.decrypt(enc))
- def generate_json_token(data, expire=None):
- salt = settings.SHAN_DONG_TOKEN_SECRET
- its = itsdangerous.TimedJSONWebSignatureSerializer(salt, expire)
- return its.dumps(data)
- def parse_json_token(s, expire=None):
- salt = settings.SHAN_DONG_TOKEN_SECRET
- its = itsdangerous.TimedJSONWebSignatureSerializer(salt, expire)
- try:
- result = its.loads(s)
- except itsdangerous.BadData:
- return dict()
- return result
- def bd09_to_gcj02(lng, lat):
- """
- 百度坐标系到google坐标系的经纬度转换
- :param lng: 百度经度
- :param lat: 百度维度
- :return:
- """
- if lng == 0.0 or lat == 0.0:
- return lng, lat
- x_pi = 3.14159265358979324 * 3000.0 / 180.0
- x = lng - 0.0065
- y = lat - 0.006
- z = math.sqrt(x * x + y * y) - 0.00002 * math.sin(y * x_pi)
- theta = math.atan2(y, x) - 0.000003 * math.cos(x * x_pi)
- gg_lng = z * math.cos(theta)
- gg_lat = z * math.sin(theta)
- return [gg_lng, gg_lat]
- def get_coordinates_and_nums(groupId):
- devices = Device.objects.filter(groupId=groupId)
- _count = devices.count()
- if _count == 0:
- return 0.0, 0.0, _count
- for dev in devices:
- if dev.location is not None and dev.location.has_key('coordinates'):
- coord = dev.location['coordinates']
- return coord[0], coord[1], _count
- return 0.0, 0.0, _count
|