__init__.py 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import base64
  4. import logging
  5. import simplejson as json
  6. from typing import TYPE_CHECKING
  7. from .errors import CannotParseUserAuthState
  8. logger = logging.getLogger(__name__)
  9. if TYPE_CHECKING:
  10. pass
  11. class Cookies(object):
  12. Recent_GroupId = 'user_group_id'
  13. Recent_DevNo = 'user_dev_no'
  14. class UserAuthState(object):
  15. """
  16. b <==> by, 取值a(agentId), d(device)
  17. a <==> agentId
  18. p <=> productAgentId
  19. h <==> href
  20. d <==> devNo
  21. i <==> chargeIndex
  22. u <==> uid
  23. c <==> application id
  24. """
  25. class BY(object):
  26. AGENT = 'agent'
  27. DEVICE = 'device'
  28. def __init__(self, by, **kwargs):
  29. self.by = by
  30. self.agentId = kwargs.get('agentId', '')
  31. self.productAgentId = kwargs.get('productAgentId', '')
  32. self.devNo = kwargs.get('devNo', '')
  33. self.chargeIndex = kwargs.get('chargeIndex', '')
  34. self.href = kwargs.get('href', '')
  35. self.uid = kwargs.get('uid', '')
  36. self.appid = kwargs.get("appid", "")
  37. def __repr__(self):
  38. return '{}<by = {}, agentId = {}, productAgentId = {}>'.format(self.__class__.__name__, self.by, self.agentId,
  39. self.productAgentId)
  40. def is_valid(self):
  41. if self.by in [UserAuthState.BY.AGENT, UserAuthState.BY.DEVICE]:
  42. return True
  43. else:
  44. return False
  45. def to_json(self):
  46. result = {
  47. 'a': self.agentId,
  48. 'p': self.productAgentId
  49. }
  50. if self.by == UserAuthState.BY.AGENT:
  51. result.update({
  52. 'b': 'a'
  53. })
  54. else:
  55. result.update({
  56. 'b': 'd',
  57. 'd': self.devNo
  58. })
  59. if self.appid:
  60. result["c"] = self.appid
  61. result.update({'h': self.href})
  62. result.update({'i': self.chargeIndex})
  63. result.update({'u': self.uid})
  64. return result
  65. @staticmethod
  66. def from_json(state_json):
  67. result = {}
  68. try:
  69. result.update({'agentId': state_json['a']})
  70. result.update({'productAgentId': state_json['p']})
  71. if state_json['b'] == 'a':
  72. by = UserAuthState.BY.AGENT
  73. else:
  74. by = UserAuthState.BY.DEVICE
  75. result.update({'devNo': state_json['d']})
  76. result.update({"appid": state_json.get("c", "")})
  77. result.update({'href': state_json['h']})
  78. result.update({'chargeIndex': state_json['i']})
  79. result.update({'uid': state_json['u']})
  80. return UserAuthState(by = by, **result)
  81. except KeyError:
  82. raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,))
  83. def encode(self):
  84. # type:(UserAuthState)->str
  85. return base64.b64encode(json.dumps(self.to_json()))
  86. @staticmethod
  87. def decode(state_str):
  88. # type:(str)->UserAuthState
  89. return UserAuthState.from_json(json.loads(base64.b64decode(state_str)))
  90. @classmethod
  91. def by_dev(cls, **kwargs):
  92. return cls(by = cls.BY.DEVICE, **kwargs)
  93. @classmethod
  94. def by_agent(cls, **kwargs):
  95. return cls(by = cls.BY.AGENT, **kwargs)
  96. class MoniUserAuthState(object):
  97. """
  98. 解决STATE参数过长问题,暂时没有其他办法,尽量较少字符串长度
  99. b <==> by, 取值a(agentId), d(device)
  100. a <==> agentId
  101. h <==> href
  102. d <==> devNo
  103. i <==> chargeIndex
  104. u <==> uid
  105. """
  106. def __init__(self, **kwargs):
  107. self.openId = kwargs.get('openId', '')
  108. self.href = kwargs.get('href', '')
  109. self.appId = kwargs.get('appId','')
  110. def is_valid(self):
  111. return True
  112. def to_json(self):
  113. result = {}
  114. result.update({'h': self.href})
  115. result.update({'o': self.openId})
  116. result.update({'i':self.appId})
  117. return result
  118. @staticmethod
  119. def from_json(state_json):
  120. result = {}
  121. try:
  122. result.update({'href': state_json['h']})
  123. result.update({'openId': state_json['o']})
  124. result.update({'appId': state_json['i']})
  125. return MoniUserAuthState(**result)
  126. except KeyError:
  127. raise CannotParseUserAuthState('key not complete state_json=%s' % (state_json,))
  128. def encode(self):
  129. # type:(UserAuthState)->str
  130. return base64.b64encode(json.dumps(self.to_json()))
  131. @staticmethod
  132. def decode(state_str):
  133. # type:(str)->UserAuthState
  134. return MoniUserAuthState.from_json(json.loads(base64.b64decode(state_str)))