# -*- coding: utf-8 -*- # !/usr/bin/env python import base64 import logging import simplejson as json from typing import TYPE_CHECKING from .errors import CannotParseUserAuthState logger = logging.getLogger(__name__) if TYPE_CHECKING: pass class Cookies(object): Recent_GroupId = 'user_group_id' Recent_DevNo = 'user_dev_no' class UserAuthState(object): """ b <==> by, 取值a(agentId), d(device) a <==> agentId p <=> productAgentId h <==> href d <==> devNo i <==> chargeIndex u <==> uid c <==> application id """ class BY(object): AGENT = 'agent' DEVICE = 'device' def __init__(self, by, **kwargs): self.by = by self.agentId = kwargs.get('agentId', '') self.productAgentId = kwargs.get('productAgentId', '') self.devNo = kwargs.get('devNo', '') self.chargeIndex = kwargs.get('chargeIndex', '') self.href = kwargs.get('href', '') self.uid = kwargs.get('uid', '') self.appid = kwargs.get("appid", "") def __repr__(self): return '{}'.format(self.__class__.__name__, self.by, self.agentId, self.productAgentId) def is_valid(self): if self.by in [UserAuthState.BY.AGENT, UserAuthState.BY.DEVICE]: return True else: return False def to_json(self): result = { 'a': self.agentId, 'p': self.productAgentId } if self.by == UserAuthState.BY.AGENT: result.update({ 'b': 'a' }) else: result.update({ 'b': 'd', 'd': self.devNo }) if self.appid: result["c"] = self.appid result.update({'h': self.href}) result.update({'i': self.chargeIndex}) result.update({'u': self.uid}) return result @staticmethod def from_json(state_json): result = {} try: result.update({'agentId': state_json['a']}) result.update({'productAgentId': state_json['p']}) if state_json['b'] == 'a': by = UserAuthState.BY.AGENT else: by = UserAuthState.BY.DEVICE result.update({'devNo': state_json['d']}) result.update({"appid": state_json.get("c", "")}) result.update({'href': state_json['h']}) result.update({'chargeIndex': state_json['i']}) result.update({'uid': state_json['u']}) return UserAuthState(by = by, **result) except KeyError: raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,)) def encode(self): # type:(UserAuthState)->str return base64.b64encode(json.dumps(self.to_json())) @staticmethod def decode(state_str): # type:(str)->UserAuthState return UserAuthState.from_json(json.loads(base64.b64decode(state_str))) @classmethod def by_dev(cls, **kwargs): return cls(by = cls.BY.DEVICE, **kwargs) @classmethod def by_agent(cls, **kwargs): return cls(by = cls.BY.AGENT, **kwargs) class MoniUserAuthState(object): """ 解决STATE参数过长问题,暂时没有其他办法,尽量较少字符串长度 b <==> by, 取值a(agentId), d(device) a <==> agentId h <==> href d <==> devNo i <==> chargeIndex u <==> uid """ def __init__(self, **kwargs): self.openId = kwargs.get('openId', '') self.href = kwargs.get('href', '') self.appId = kwargs.get('appId','') def is_valid(self): return True def to_json(self): result = {} result.update({'h': self.href}) result.update({'o': self.openId}) result.update({'i':self.appId}) return result @staticmethod def from_json(state_json): result = {} try: result.update({'href': state_json['h']}) result.update({'openId': state_json['o']}) result.update({'appId': state_json['i']}) return MoniUserAuthState(**result) except KeyError: raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,)) def encode(self): # type:(UserAuthState)->str return base64.b64encode(json.dumps(self.to_json())) @staticmethod def decode(state_str): # type:(str)->UserAuthState return MoniUserAuthState.from_json(json.loads(base64.b64decode(state_str)))