# -*- coding: utf-8 -*- # !/usr/bin/env python import logging from typing import TYPE_CHECKING, Optional from apilib.systypes import IterConstant from apilib.utils_url import add_query from apps.web.constant import AppPlatformType from apps.web.core import WechatMixin from apps.web.core.auth.base import AuthBridge from library.wechatpy.oauth import WeChatOAuth from library.wechatbase.exceptions import WechatOAuthDummyException if TYPE_CHECKING: from apps.web.core.models import WechatPayApp, WechatAuthApp, WechatManagerApp, WechatUserManagerApp logger = logging.getLogger(__name__) class WechatAuthScope(IterConstant): # 静默跳转,无需用户点击授权 AUTH_SCOPE_BASE = 'snsapi_base' # 需要用户显式点击授权 AUTH_SCOPE_USER = 'snsapi_userinfo' class WechatAuthBridge(AuthBridge, WechatMixin): auth_code_key = 'code' def __init__(self, app): # type: (Optional[WechatAuthApp, WechatManagerApp, WechatUserManagerApp, WechatPayApp])->None super(WechatAuthBridge, self).__init__(app) self.__gateway_type__ = AppPlatformType.WECHAT @property def __client__(self): return WeChatOAuth(self.appid, self.secret) def authorize(self, auth_code): logger.debug('{} authorize enter. code = {}'.format(repr(self), auth_code)) if auth_code is None: logger.error('{} fail to authorize because code is null'.format(repr(self))) return None try: openId = self.client.get_oauth_token(auth_code).get('openid') logger.debug('{} success to authorize. open id = {}'.format(repr(self), openId)) return openId except Exception as e: logger.error('{} fail to authorize because of exception. code = {}'.format(repr(self), auth_code)) logger.exception(e) return None def get_user_info(self, auth_code): # type:(str)->dict auth_token = self.client.get_oauth_token(auth_code) logger.debug( '{bridge} get auth token. auth_token = {auth_token}'.format(bridge = repr(self), auth_token = auth_token)) payload = { 'scope': auth_token['scope'] } if 'snsapi_userinfo' in auth_token['scope']: if 'is_snapshotuser' in auth_token and auth_token['is_snapshotuser'] == 1: raise WechatOAuthDummyException() response = self.client.get_user_info( openid = auth_token['openid'], access_token = auth_token['access_token']) logger.debug( '{bridge} get user info. user info = {user_info}'.format(bridge = repr(self), user_info = response)) payload.update({ 'openId': response.get('openid'), 'nickname': response.get('nickname', ''), 'province': response.get('province', ''), 'city': response.get('city', ''), 'country': response.get('country', ''), 'avatar': response.get('headimgurl', ''), 'sex': int(response.get('sex', 0)), 'unionid': response.get('unionid', ''), }) return payload else: payload.update({ 'openId': auth_token['openid'] }) return payload def generate_auth_url(self, redirect_uri, payload = '', scope = WechatAuthScope.AUTH_SCOPE_BASE): """ 生成授权url :param redirect_uri: :param payload: :param scope: :return: """ assert redirect_uri is not None if payload: redirect_uri = add_query(redirect_uri, {'payload': payload}) return WeChatOAuth(self.appid, self.secret, scope = scope).authorize_url(redirect_uri = redirect_uri) def generate_auth_url_base_scope(self, redirect_uri, payload = ''): """ 生成微信跳转url base范围,只能获取openId :param redirect_uri: :param payload: :return: """ logger.debug('%r redirect scope = %s, redirect_uri = %s; payload = %s' % ( self, WechatAuthScope.AUTH_SCOPE_BASE, redirect_uri, payload)) return self.generate_auth_url(redirect_uri = redirect_uri, payload = payload, scope = WechatAuthScope.AUTH_SCOPE_BASE) def generate_auth_url_user_scope(self, redirect_uri, payload = ''): """ 生成微信跳转url user范围,只能获取openId :param redirect_uri: :param payload: :return: """ logger.debug('%r redirect scope = %s, redirect_uri = %s; state = %s' % ( self, WechatAuthScope.AUTH_SCOPE_USER, redirect_uri, payload)) return self.generate_auth_url(redirect_uri = redirect_uri, payload = payload, scope = WechatAuthScope.AUTH_SCOPE_USER) def jscode2session(self, js_code): return self.client.jscode2session(js_code)