123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import base64
- import logging
- import simplejson as json
- from typing import TYPE_CHECKING
- from .errors import CannotParseUserAuthState
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Placeholder for imports needed only by static type checkers; currently empty.
if TYPE_CHECKING:
    pass
class Cookies(object):
    """
    String constants grouped under one namespace.

    NOTE(review): judging by the class name these are cookie key names;
    the code that reads or writes them is not visible here.
    """

    # Key for the most recent group id.
    Recent_GroupId = 'user_group_id'

    # Key for the most recent device number.
    Recent_DevNo = 'user_dev_no'
class UserAuthState(object):
    """
    Auth state that serialises to a short-key dict, JSON and base64 text.

    Short-key mapping used by ``to_json`` / ``from_json``:

        b <==> by, takes value 'a' (agent) or 'd' (device)
        a <==> agentId
        p <==> productAgentId
        h <==> href
        d <==> devNo (only present when ``by`` is not agent)
        i <==> chargeIndex
        u <==> uid
        c <==> appid (application id; only emitted when non-empty)
    """

    class BY(object):
        """Values accepted for ``UserAuthState.by``."""
        AGENT = 'agent'
        DEVICE = 'device'

    def __init__(self, by, **kwargs):
        """
        :param by: one of ``BY.AGENT`` / ``BY.DEVICE`` (not enforced here,
            see ``is_valid``).
        :param kwargs: optional ``agentId``, ``productAgentId``, ``devNo``,
            ``chargeIndex``, ``href``, ``uid``, ``appid``; each defaults to ''.
        """
        self.by = by
        self.agentId = kwargs.get('agentId', '')
        self.productAgentId = kwargs.get('productAgentId', '')
        self.devNo = kwargs.get('devNo', '')
        self.chargeIndex = kwargs.get('chargeIndex', '')
        self.href = kwargs.get('href', '')
        self.uid = kwargs.get('uid', '')
        self.appid = kwargs.get("appid", "")

    def __repr__(self):
        return '{}<by = {}, agentId = {}, productAgentId = {}>'.format(
            self.__class__.__name__, self.by, self.agentId, self.productAgentId)

    def is_valid(self):
        """Return True when ``by`` is one of the two known ``BY`` values."""
        return self.by in (UserAuthState.BY.AGENT, UserAuthState.BY.DEVICE)

    def to_json(self):
        """
        Build the short-key dict representation.

        Any ``by`` other than ``BY.AGENT`` is written as a device state.
        Key insertion order is kept stable so the encoded text stays stable.
        """
        result = {
            'a': self.agentId,
            'p': self.productAgentId,
        }

        if self.by == UserAuthState.BY.AGENT:
            result['b'] = 'a'
        else:
            result['b'] = 'd'
            result['d'] = self.devNo

        # The application id is optional and omitted when empty.
        if self.appid:
            result["c"] = self.appid

        result['h'] = self.href
        result['i'] = self.chargeIndex
        result['u'] = self.uid

        return result

    @staticmethod
    def from_json(state_json):
        """
        Rebuild a state from the dict produced by ``to_json``.

        Any 'b' value other than 'a' is treated as a device state, which then
        also requires the 'd' key. The 'c' key is optional.

        :raises CannotParseUserAuthState: when a required key is missing.
        """
        try:
            fields = {
                'agentId': state_json['a'],
                'productAgentId': state_json['p'],
            }

            if state_json['b'] == 'a':
                by = UserAuthState.BY.AGENT
            else:
                by = UserAuthState.BY.DEVICE
                fields['devNo'] = state_json['d']

            fields['appid'] = state_json.get("c", "")
            fields['href'] = state_json['h']
            fields['chargeIndex'] = state_json['i']
            fields['uid'] = state_json['u']
        except KeyError:
            raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,))

        return UserAuthState(by=by, **fields)

    def encode(self):
        # type: () -> str
        """
        Serialise to base64-encoded JSON, returned as a native ``str``.

        ``base64.b64encode`` needs bytes, while ``json.dumps`` returns text on
        Python 3, so text is encoded first and the result decoded back. On
        Python 2 both values are already ``str`` and pass through unchanged.
        """
        payload = json.dumps(self.to_json())
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')

        token = base64.b64encode(payload)
        if not isinstance(token, str):
            token = token.decode('ascii')
        return token

    @staticmethod
    def decode(state_str):
        # type: (str) -> UserAuthState
        """
        Inverse of ``encode``: base64-decode, parse the JSON, rebuild the state.

        Errors from base64 or JSON decoding propagate unchanged; a missing key
        raises ``CannotParseUserAuthState`` (see ``from_json``).
        """
        return UserAuthState.from_json(json.loads(base64.b64decode(state_str)))

    @classmethod
    def by_dev(cls, **kwargs):
        """Construct a state with ``by`` set to ``BY.DEVICE``."""
        return cls(by=cls.BY.DEVICE, **kwargs)

    @classmethod
    def by_agent(cls, **kwargs):
        """Construct a state with ``by`` set to ``BY.AGENT``."""
        return cls(by=cls.BY.AGENT, **kwargs)
class MoniUserAuthState(object):
    """
    Auth state carrying only ``href``, ``openId`` and ``appId``.

    Works around the STATE parameter being too long; there is no better
    option for now, so the serialised key names are kept as short as possible.

    Short-key mapping used by ``to_json`` / ``from_json``:

        h <==> href
        o <==> openId
        i <==> appId
    """

    def __init__(self, **kwargs):
        """
        :param kwargs: optional ``openId``, ``href``, ``appId``; each
            defaults to ''.
        """
        self.openId = kwargs.get('openId', '')
        self.href = kwargs.get('href', '')
        self.appId = kwargs.get('appId', '')

    def is_valid(self):
        """Always True; this state has no validity constraint."""
        return True

    def to_json(self):
        """Build the short-key dict representation (keys 'h', 'o', 'i')."""
        result = {}
        result['h'] = self.href
        result['o'] = self.openId
        result['i'] = self.appId
        return result

    @staticmethod
    def from_json(state_json):
        """
        Rebuild a state from the dict produced by ``to_json``.

        :raises CannotParseUserAuthState: when 'h', 'o' or 'i' is missing.
        """
        try:
            fields = {
                'href': state_json['h'],
                'openId': state_json['o'],
                'appId': state_json['i'],
            }
        except KeyError:
            raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,))

        return MoniUserAuthState(**fields)

    def encode(self):
        # type: () -> str
        """
        Serialise to base64-encoded JSON, returned as a native ``str``.

        ``base64.b64encode`` needs bytes, while ``json.dumps`` returns text on
        Python 3, so text is encoded first and the result decoded back. On
        Python 2 both values are already ``str`` and pass through unchanged.
        """
        payload = json.dumps(self.to_json())
        if not isinstance(payload, bytes):
            payload = payload.encode('utf-8')

        token = base64.b64encode(payload)
        if not isinstance(token, str):
            token = token.decode('ascii')
        return token

    @staticmethod
    def decode(state_str):
        # type: (str) -> MoniUserAuthState
        """
        Inverse of ``encode``: base64-decode, parse the JSON, rebuild the state.

        Errors from base64 or JSON decoding propagate unchanged; a missing key
        raises ``CannotParseUserAuthState`` (see ``from_json``).
        """
        return MoniUserAuthState.from_json(json.loads(base64.b64decode(state_str)))
|