# -*- coding: utf-8 -*- # !/usr/bin/env python """ wechatpy.component ~~~~~~~~~~~~~~~ This module provides client library for WeChat Open Platform :copyright: (c) 2015 by hunter007. :license: MIT, see LICENSE for more details. """ import json import logging import time from urllib import quote import requests import xmltodict from library import to_text, my_memcache_lock from library.wechatpy import access_token_key, component_verify_ticket_key, refresh_token_key from library.wechatpy.client import WeChatComponentClient from library.wechatpy.constants import WeChatErrorCode from library.wechatpy.crypto import WeChatCrypto from library.wechatbase.exceptions import ( APILimitedException, WeChatException, WeChatComponentOAuthException, WeChatOAuthException, WechatNetworkException ) from library.wechatpy.messages import COMPONENT_MESSAGE_TYPES, ComponentUnknownMessage from library.wechatpy.parser import parse_message logger = logging.getLogger(__name__) NO_RETRY_ERRCODE = [ '48001', # api unauthorized '40164' # invalid ip not in whitelist '61004' # access clientip is not registered request ] class BaseWeChatComponent(object): API_BASE_URL = "https://api.weixin.qq.com/cgi-bin" def __init__( self, component_appid, component_appsecret, component_token, encoding_aes_key, lock_cache, session, authorizer, auto_retry=True, ): """ :param component_appid: 第三方平台appid :param component_appsecret: 第三方平台appsecret :param component_token: 公众号消息校验Token :param encoding_aes_key: 公众号消息加解密Key """ self._http = requests.Session() self.component_appid = component_appid self.component_appsecret = component_appsecret self.crypto = WeChatCrypto(component_token, encoding_aes_key, component_appid) self.session = session self.authorizer = authorizer self.lock_cache = lock_cache self.auto_retry = auto_retry @property def component_verify_ticket(self): return self.session.get(component_verify_ticket_key(self.component_appid)) def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(("http://", "https://")): api_base_url = kwargs.pop("api_base_url", self.API_BASE_URL) url = "{}{}".format(api_base_url, url_or_endpoint) else: url = url_or_endpoint if "params" not in kwargs: kwargs["params"] = {} if isinstance(kwargs["params"], dict) and "component_access_token" not in kwargs["params"]: kwargs["params"]["component_access_token"] = self.access_token if isinstance(kwargs["data"], dict): kwargs["data"] = json.dumps(kwargs["data"]) res = self._http.request(method = method, url = url, **kwargs) try: res.raise_for_status() except requests.RequestException as reqe: raise WechatNetworkException( errCode = 'HTTP{}'.format(res.status_code), errMsg = reqe.message, client = self, request = reqe.request, response = reqe.response) return self._handle_result(res, method, url, **kwargs) def _handle_result(self, res, method=None, url=None, **kwargs): result = json.loads(res.content.decode("utf-8", "ignore"), strict=False) if "errcode" in result: result["errcode"] = int(result["errcode"]) if "errcode" in result and result["errcode"] != 0: errcode = result["errcode"] errmsg = result.get("errmsg", errcode) if self.auto_retry and errcode in ( WeChatErrorCode.INVALID_CREDENTIAL.value, WeChatErrorCode.INVALID_ACCESS_TOKEN.value, WeChatErrorCode.EXPIRED_ACCESS_TOKEN.value, ): logger.info("Component access token expired, fetch a new one and retry request") self.fetch_access_token() kwargs["params"]["component_access_token"] = self.session.get( "{}_component_access_token".format(self.component_appid) ) return self._request(method=method, url_or_endpoint=url, **kwargs) elif errcode == WeChatErrorCode.OUT_OF_API_FREQ_LIMIT.value: # api freq out of limit raise APILimitedException(errcode, errmsg, client=self, request=res.request, response=res) else: raise WeChatException(errcode, errmsg, client=self, request=res.request, response=res) return result @property def component_access_token_key(self): return '{}_component_access_token'.format(self.component_appid) def fetch_access_token(self, old_token=None): """ 获取 component_access_token 详情请参考 https://open.weixin.qq.com/cgi-bin/showdocument?action=dir_list\ &t=resource/res_list&verify=1&id=open1419318587&token=&lang=zh_CN :return: 返回的 JSON 数据包 """ key = 'component-access-token-lock-{appid}'.format(appid=self.component_appid) retry = 0 while True: current_token = self.session.get(self.component_access_token_key) if current_token and current_token != old_token: logger.debug( '=== WechatToken === app fetched one other component access token. access token = {}'.format( self.component_appid, current_token)) return current_token with my_memcache_lock(self.lock_cache, key, '1', expire=15) as acquired: if acquired: try: new_token = self.refresh_access_token() if new_token: logger.debug( '=== WechatToken === app fetch component access token success. access token = {}'.format( self.component_appid, new_token)) return new_token else: raise Exception( '=== WechatToken === app fetch component access token is null.'.format( self.component_appid)) except APILimitedException as e: logger.error(repr(e)) raise e except WeChatException as e: logger.exception(e) if str(e.errCode) in NO_RETRY_ERRCODE: raise e except Exception as e: logger.exception(e) else: logger.debug( '=== WechatToken === app not acquire component access token memcache key<{}>'.format( self.component_appid, key)) retry = retry + 1 if retry >= 3: raise WeChatException( errCode=WeChatErrorCode.MY_SYSTEM_ERROR, errMsg='=== WechatToken === app fetch component access token timeout.'.format( self.component_appid), client=self) time.sleep(5) def refresh_access_token(self): logger.info("Fetching component access token") url = "{}/component/api_component_token".format(self.API_BASE_URL) data = json.dumps( { "component_appid": self.component_appid, "component_appsecret": self.component_appsecret, "component_verify_ticket": self.component_verify_ticket, } ) res = self._http.post(url=url, data=data) try: res.raise_for_status() except requests.RequestException as reqe: raise WechatNetworkException( errCode = 'HTTP{}'.format(res.status_code), errMsg = reqe.message, client = self, request = reqe.request, response = reqe.response) result = res.json() if "errcode" in result and result["errcode"] != 0: raise WeChatException( result["errcode"], result["errmsg"], client=self, request=res.request, response=res, ) expires_in = 7200 - 600 if 'expires_in' in result: expires_in = result['expires_in'] if expires_in < 600: expires_in = expires_in / 2 else: expires_in = expires_in - 600 self.session.set( self.component_access_token_key, result['component_access_token'], expires_in ) return result @property def access_token(self): """ WeChat access token """ access_token = self.session.get(self.component_access_token_key) if access_token: return access_token else: return self.fetch_access_token() def get(self, url, **kwargs): return self._request(method="get", url_or_endpoint=url, **kwargs) def post(self, url, **kwargs): return self._request(method="post", url_or_endpoint=url, **kwargs) class WeChatComponent(BaseWeChatComponent): def get_pre_auth_url(self, redirect_uri): """ 获取PC版授权链接 """ redirect_uri = quote(redirect_uri, safe=b"") url_template = 'https://mp.weixin.qq.com/cgi-bin/componentloginpage?component_appid={}&pre_auth_code={}&redirect_uri={}&auth_type=' return url_template.format(self.component_appid, self.create_preauthcode()['pre_auth_code'], redirect_uri) def get_pre_auth_url_m(self, redirect_uri): """ 获取H5版授权链接 """ redirect_uri = quote(redirect_uri, safe="") url_template = 'https://open.weixin.qq.com/wxaopen/safe/bindcomponent?action=bindcomponent&no_scan=1&component_appid={}&pre_auth_code={}&redirect_uri={}&auth_type=3#wechat_redirect' return url_template.format(self.component_appid, self.create_preauthcode()['pre_auth_code'], redirect_uri) def create_preauthcode(self): """ 获取预授权码 """ return self.post( "/component/api_create_preauthcode", data={"component_appid": self.component_appid}, ) def query_auth(self, authorization_code): """ 使用授权码换取公众号的授权信息 :params authorization_code: 授权code,会在授权成功时返回给第三方平台,详见第三方平台授权流程说明 """ result = self.post( "/component/api_query_auth", data={ "component_appid": self.component_appid, "authorization_code": authorization_code, } ) assert (result is not None and "authorization_info" in result and "authorizer_appid" in result["authorization_info"]) return result def refresh_token(self, appid): refresh_token = self.session.get(refresh_token_key(appid)) if not refresh_token: authorizer = self.authorizer.getAuthRecord(appid) if authorizer: refresh_token = authorizer.refreshToken self.session.set(refresh_token_key(appid), refresh_token, 7 * 24 * 3600) return refresh_token def refresh_authorizer_token(self, authorizer_appid): """ 获取(刷新)授权公众号的令牌 :params authorizer_appid: 授权方appid :params authorizer_refresh_token: 授权方的刷新令牌 """ return self.post( "/component/api_authorizer_token", data={ "component_appid": self.component_appid, "authorizer_appid": authorizer_appid, "authorizer_refresh_token": self.refresh_token(authorizer_appid), }, ) def get_authorizer_info(self, authorizer_appid): """ 获取授权方的账户信息 :params authorizer_appid: 授权方appid """ return self.post( "/component/api_get_authorizer_info", data={ "component_appid": self.component_appid, "authorizer_appid": authorizer_appid, }, ) def get_authorizer_list(self, offset=0, count=500): """ 拉取所有已授权的帐号信息 :params offset: 偏移位置/起始位置 :params count: 拉取数量 """ return self.post( "/component/api_get_authorizer_list", data={ "component_appid": self.component_appid, "offset": offset, "count": count, }, ) def get_authorizer_option(self, authorizer_appid, option_name): """ 获取授权方的选项设置信息 :params authorizer_appid: 授权公众号appid :params option_name: 选项名称 """ return self.post( "/component/api_get_authorizer_option", data={ "component_appid": self.component_appid, "authorizer_appid": authorizer_appid, "option_name": option_name, }, ) def set_authorizer_option(self, authorizer_appid, option_name, option_value): """ 设置授权方的选项信息 :params authorizer_appid: 授权公众号appid :params option_name: 选项名称 :params option_value: 设置的选项值 """ return self.post( "/component/api_set_authorizer_option", data={ "component_appid": self.component_appid, "authorizer_appid": authorizer_appid, "option_name": option_name, "option_value": option_value, }, ) def get_client_by_appid(self, authorizer_appid): """ 通过 authorizer_appid 获取 Client 对象 :params authorizer_appid: 授权公众号appid """ access_token_key = "{}_access_token".format(authorizer_appid) access_token = self.session.get(access_token_key) if not access_token: ret = self.refresh_authorizer_token(authorizer_appid) access_token = ret["authorizer_access_token"] access_token_key = "{}_access_token".format(authorizer_appid) expires_in = 7200 if "expires_in" in ret: expires_in = ret["expires_in"] self.session.set(access_token_key, access_token, expires_in) return WeChatComponentClient(authorizer_appid, self, session=self.session) def do_auth(self, auth_code): # 获取auth信息(authorizer_access_token, authorizer_refresh_token) auth_info = self.query_auth(auth_code)["authorization_info"] authorizer_appid = auth_info['authorizer_appid'] authorizer_access_token = auth_info.get('authorizer_access_token', None) if authorizer_access_token: expires_in = 7200 if "expires_in" in auth_info: expires_in = auth_info["expires_in"] self.session.set(access_token_key(authorizer_appid), authorizer_access_token, (expires_in - 600)) payload = { 'appid': authorizer_appid, } authorizer_refresh_token = auth_info.get('authorizer_refresh_token', None) if authorizer_refresh_token: payload.update({'refreshToken': authorizer_refresh_token}) self.session.set(refresh_token_key(authorizer_appid), authorizer_refresh_token, 7 * 24 * 3600) # 获取公众号或者小程序信息 app_info = self.get_authorizer_info(authorizer_appid) authorizer_info = app_info['authorizer_info'] payload.update({ 'nickName': authorizer_info.pop('nick_name'), 'headImg': authorizer_info.pop('head_img'), 'userName': authorizer_info.pop('user_name'), 'principalName': authorizer_info.pop('principal_name'), 'qrcodeUrl': authorizer_info.pop('qrcode_url'), 'verifyInfo': authorizer_info.pop('verify_type_info')['id'], 'serviceType': authorizer_info.pop('service_type_info')['id'], 'appStatus': authorizer_info.pop('account_status') }) payload['extra'] = authorizer_info if 'MiniProgramInfo' in authorizer_info: # 小程序 payload['appType'] = 0 else: payload['appType'] = 1 funcList = [] func_info = app_info['authorization_info']['func_info'] for func in func_info: _id = func['funcscope_category']['id'] funcList.append(_id) payload['funcList'] = funcList self.authorizer.createOrUpdateAuthRecord(payload) def do_un_auth(self, authorizer_appid): self.authorizer.deleteAuthRecord(authorizer_appid) def parse_message(self, msg, msg_signature, timestamp, nonce): """ 处理 wechat server 推送消息 :params msg: 加密内容 :params msg_signature: 消息签名 :params timestamp: 时间戳 :params nonce: 随机数 """ content = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce) message = xmltodict.parse(to_text(content))["xml"] message_type = message["InfoType"].lower() message_class = COMPONENT_MESSAGE_TYPES.get(message_type, ComponentUnknownMessage) msg = message_class(message) if msg.type == "component_verify_ticket": self.session.set(component_verify_ticket_key(self.component_appid), msg.verify_ticket, 7 * 24 * 3600) elif msg.type in ("authorized", "updateauthorized"): self.do_auth(msg.authorization_code) elif msg.type == 'unauthorized': self.do_un_auth(msg.authorizer_appid) return msg def parse_authorizer_message(self, msg, msg_signature, timestamp, nonce): content = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce) return parse_message(content) def get_component_oauth(self, authorizer_appid): """ 代公众号 OAuth 网页授权 :params authorizer_appid: 授权公众号appid """ return ComponentOAuth(self, authorizer_appid) class ComponentOAuth(object): """微信开放平台 代公众号 OAuth 网页授权 详情请参考 https://open.weixin.qq.com/cgi-bin/showdocument?action=dir_list&t=resource/res_list&verify=1&id=open1419318590 """ API_BASE_URL = "https://api.weixin.qq.com/" OAUTH_BASE_URL = "https://open.weixin.qq.com/connect/" def __init__(self, component, app_id): """ :param component: WeChatComponent :param app_id: 微信公众号 app_id """ self._http = requests.Session() self.app_id = app_id self.component = component def get_authorize_url(self, redirect_uri, scope="snsapi_base", state=""): """ :param redirect_uri: 重定向地址,需要urlencode,这里填写的应是服务开发方的回调地址 :param scope: 可选,微信公众号 OAuth2 scope,默认为 ``snsapi_base`` :param state: 可选,重定向后会带上state参数,开发者可以填写任意参数值,最多128字节 """ redirect_uri = quote(redirect_uri, safe=b"") url_list = [ self.OAUTH_BASE_URL, "oauth2/authorize?appid=", self.app_id, "&redirect_uri=", redirect_uri, "&response_type=code&scope=", scope, ] if state: url_list.extend(["&state=", state]) url_list.extend( [ "&component_appid=", self.component.component_appid, ] ) url_list.append("#wechat_redirect") return "".join(url_list) def fetch_access_token(self, code): """获取 access_token :param code: 授权完成跳转回来后 URL 中的 code 参数 :return: JSON 数据包 """ res = self._get( "sns/oauth2/component/access_token", params={ "appid": self.app_id, "component_appid": self.component.component_appid, "component_access_token": self.component.access_token, "code": code, "grant_type": "authorization_code", }, ) self.access_token = res["access_token"] self.open_id = res["openid"] self.refresh_token = res["refresh_token"] self.expires_in = res["expires_in"] self.scope = res["scope"] return res def refresh_access_token(self, refresh_token): """刷新 access token :param refresh_token: OAuth2 refresh token :return: JSON 数据包 """ res = self._get( "sns/oauth2/component/refresh_token", params={ "appid": self.app_id, "grant_type": "refresh_token", "refresh_token": refresh_token, "component_appid": self.component.component_appid, "component_access_token": self.component.access_token, }, ) self.access_token = res["access_token"] self.open_id = res["openid"] self.refresh_token = res["refresh_token"] self.expires_in = res["expires_in"] self.scope = res["scope"] return res def get_user_info(self, openid=None, access_token=None, lang="zh_CN"): """获取用户基本信息(需授权作用域为snsapi_userinfo) 如果网页授权作用域为snsapi_userinfo,则此时开发者可以通过access_token和openid拉取用户信息了。 :param openid: 可选,微信 openid,默认获取当前授权用户信息 :param access_token: 可选,access_token,默认使用当前授权用户的 access_token :param lang: 可选,语言偏好, 默认为 ``zh_CN`` :return: JSON 数据包 """ openid = openid or self.open_id access_token = access_token or self.access_token return self._get( "sns/userinfo", params={"access_token": access_token, "openid": openid, "lang": lang}, ) def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(("http://", "https://")): url = "{}{}".format(self.API_BASE_URL, url_or_endpoint) else: url = url_or_endpoint if isinstance(kwargs.get("data", ""), dict): body = json.dumps(kwargs["data"], ensure_ascii=False) body = body.encode("utf-8") kwargs["data"] = body res = self._http.request(method=method, url=url, **kwargs) try: res.raise_for_status() except requests.RequestException as reqe: raise WeChatOAuthException( errCode=None, errMsg=None, client=self, request=reqe.request, response=reqe.response, ) return self._handle_result(res, method=method, url=url, **kwargs) def _handle_result(self, res, method=None, url=None, **kwargs): result = json.loads(res.content.decode("utf-8", "ignore"), strict=False) if "errcode" in result: result["errcode"] = int(result["errcode"]) if "errcode" in result and result["errcode"] != 0: errcode = result["errcode"] errmsg = result.get("errmsg", errcode) if self.component.auto_retry and errcode in ( WeChatErrorCode.INVALID_CREDENTIAL.value, WeChatErrorCode.INVALID_ACCESS_TOKEN.value, WeChatErrorCode.EXPIRED_ACCESS_TOKEN.value, ): logger.info("Component access token expired, fetch a new one and retry request") self.component.fetch_access_token() kwargs["params"]["component_access_token"] = self.component.access_token return self._request(method=method, url_or_endpoint=url, **kwargs) elif errcode == WeChatErrorCode.OUT_OF_API_FREQ_LIMIT.value: # api freq out of limit raise APILimitedException(errcode, errmsg, client=self, request=res.request, response=res) else: raise WeChatComponentOAuthException(errcode, errmsg, client=self, request=res.request, response=res) return result def _get(self, url, **kwargs): return self._request(method="get", url_or_endpoint=url, **kwargs)