123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
import logging
from typing import TYPE_CHECKING, Optional, Union

from apilib.systypes import IterConstant
from apilib.utils_url import add_query
from apps.web.constant import AppPlatformType
from apps.web.core import WechatMixin
from apps.web.core.auth.base import AuthBridge
from library.wechatbase.exceptions import WechatOAuthDummyException
from library.wechatpy.oauth import WeChatOAuth
- if TYPE_CHECKING:
- from apps.web.core.models import WechatPayApp, WechatAuthApp, WechatManagerApp, WechatUserManagerApp
- logger = logging.getLogger(__name__)
class WechatAuthScope(IterConstant):
    # OAuth scope values passed to WeChat when building an authorize URL.

    # Silent redirect: the user does not have to tap anything to authorize.
    AUTH_SCOPE_BASE = 'snsapi_base'

    # The user must explicitly tap to grant authorization.
    AUTH_SCOPE_USER = 'snsapi_userinfo'
class WechatAuthBridge(AuthBridge, WechatMixin):
    """OAuth bridge for WeChat apps.

    Builds WeChat authorize URLs and exchanges authorization codes for an
    openid or a user profile.

    ``self.client``, ``self.appid`` and ``self.secret`` are not defined in this
    class; they are presumably provided by ``AuthBridge`` / ``WechatMixin``
    (with ``self.client`` likely resolving to ``__client__``) -- TODO confirm.
    """

    # Key under which the authorization code is expected (presumably the
    # callback query parameter read by AuthBridge -- confirm against the base).
    auth_code_key = 'code'

    def __init__(self, app):
        # type: (Union[WechatAuthApp, WechatManagerApp, WechatUserManagerApp, WechatPayApp]) -> None
        """Bind the bridge to ``app`` and tag it as a WeChat gateway."""
        super(WechatAuthBridge, self).__init__(app)
        self.__gateway_type__ = AppPlatformType.WECHAT

    @property
    def __client__(self):
        """Return a new ``WeChatOAuth`` client built from this app's credentials."""
        return WeChatOAuth(self.appid, self.secret)

    def authorize(self, auth_code):
        """Exchange an OAuth authorization code for the user's openid.

        This is best-effort: failures are logged and reported as ``None``
        instead of being raised.

        :param auth_code: authorization code; ``None`` is rejected up front.
        :return: the ``openid`` field of the token response, or ``None`` when
            the code is missing, the exchange raises, or the response carries
            no ``openid``.
        """
        logger.debug('%r authorize enter. code = %s', self, auth_code)

        if auth_code is None:
            logger.error('%r fail to authorize because code is null', self)
            return None

        # Only the remote exchange can fail, so it is the only thing guarded.
        try:
            open_id = self.client.get_oauth_token(auth_code).get('openid')
        except Exception:
            # Deliberately broad: any failure is turned into a ``None`` result.
            # logger.exception records the message together with the traceback.
            logger.exception(
                '%r fail to authorize because of exception. code = %s', self, auth_code)
            return None

        logger.debug('%r success to authorize. open id = %s', self, open_id)
        return open_id

    def get_user_info(self, auth_code):
        # type: (str) -> dict
        """Exchange ``auth_code`` for a token and build a user payload.

        :param auth_code: authorization code returned by the authorize redirect.
        :return: a dict that always holds ``scope`` and ``openId``. When the
            token scope contains ``snsapi_userinfo`` it additionally holds
            ``nickname``, ``province``, ``city``, ``country``, ``avatar``,
            ``sex`` and ``unionid`` taken from the user-info response.
        :raises WechatOAuthDummyException: when a ``snsapi_userinfo`` token is
            flagged with ``is_snapshotuser == 1``.
        """
        auth_token = self.client.get_oauth_token(auth_code)
        logger.debug('%r get auth token. auth_token = %s', self, auth_token)

        scope = auth_token['scope']
        payload = {'scope': scope}

        # Without the user-info scope, only the openid from the token is known.
        if WechatAuthScope.AUTH_SCOPE_USER not in scope:
            payload['openId'] = auth_token['openid']
            return payload

        # Tokens flagged as snapshot users are rejected before any profile
        # lookup (presumably a WeChat placeholder identity -- confirm).
        if auth_token.get('is_snapshotuser') == 1:
            raise WechatOAuthDummyException()

        response = self.client.get_user_info(
            openid=auth_token['openid'], access_token=auth_token['access_token'])
        logger.debug('%r get user info. user info = %s', self, response)

        payload.update({
            'openId': response.get('openid'),
            'nickname': response.get('nickname', ''),
            'province': response.get('province', ''),
            'city': response.get('city', ''),
            'country': response.get('country', ''),
            'avatar': response.get('headimgurl', ''),
            # ``or 0`` keeps a present-but-empty value (None / '') from raising.
            'sex': int(response.get('sex') or 0),
            'unionid': response.get('unionid', ''),
        })
        return payload

    def generate_auth_url(self, redirect_uri, payload='', scope=WechatAuthScope.AUTH_SCOPE_BASE):
        """Build the WeChat authorize URL.

        :param redirect_uri: URL WeChat redirects back to; must not be ``None``.
        :param payload: optional value appended to ``redirect_uri`` as the
            ``payload`` query parameter; skipped when falsy.
        :param scope: OAuth scope, one of the ``WechatAuthScope`` values.
        :return: the value produced by ``WeChatOAuth.authorize_url``.
        :raises AssertionError: when ``redirect_uri`` is ``None``.
        """
        # NOTE: asserts are stripped under ``python -O``; the assert is kept so
        # callers continue to see AssertionError for a missing redirect_uri.
        assert redirect_uri is not None

        if payload:
            redirect_uri = add_query(redirect_uri, {'payload': payload})

        oauth = WeChatOAuth(self.appid, self.secret, scope=scope)
        return oauth.authorize_url(redirect_uri=redirect_uri)

    def generate_auth_url_base_scope(self, redirect_uri, payload=''):
        """Build the authorize URL with the ``snsapi_base`` scope.

        With this scope ``get_user_info`` only yields the openid.

        :param redirect_uri: URL WeChat redirects back to.
        :param payload: optional value forwarded to ``generate_auth_url``.
        :return: the authorize URL.
        """
        logger.debug(
            '%r redirect scope = %s, redirect_uri = %s; payload = %s',
            self, WechatAuthScope.AUTH_SCOPE_BASE, redirect_uri, payload)
        return self.generate_auth_url(
            redirect_uri=redirect_uri,
            payload=payload,
            scope=WechatAuthScope.AUTH_SCOPE_BASE)

    def generate_auth_url_user_scope(self, redirect_uri, payload=''):
        """Build the authorize URL with the ``snsapi_userinfo`` scope.

        This scope requires the user to explicitly grant authorization; it is
        the scope for which ``get_user_info`` fetches the full profile rather
        than just the openid.

        :param redirect_uri: URL WeChat redirects back to.
        :param payload: optional value forwarded to ``generate_auth_url``.
        :return: the authorize URL.
        """
        # The logged value is the payload, so it is labelled as such (the
        # label previously read "state").
        logger.debug(
            '%r redirect scope = %s, redirect_uri = %s; payload = %s',
            self, WechatAuthScope.AUTH_SCOPE_USER, redirect_uri, payload)
        return self.generate_auth_url(
            redirect_uri=redirect_uri,
            payload=payload,
            scope=WechatAuthScope.AUTH_SCOPE_USER)

    def jscode2session(self, js_code):
        """Delegate to ``self.client.jscode2session`` and return its result.

        :param js_code: code forwarded unchanged to the client.
        """
        return self.client.jscode2session(js_code)
|