|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- """
- wechatpy.component
- ~~~~~~~~~~~~~~~
- This module provides client library for WeChat Open Platform
- :copyright: (c) 2015 by hunter007.
- :license: MIT, see LICENSE for more details.
- """
- import json
- import logging
- import time
- from urllib import quote
- import requests
- import xmltodict
- from library import to_text, my_memcache_lock
- from library.wechatpy import access_token_key, component_verify_ticket_key, refresh_token_key
- from library.wechatpy.client import WeChatComponentClient
- from library.wechatpy.constants import WeChatErrorCode
- from library.wechatpy.crypto import WeChatCrypto
- from library.wechatbase.exceptions import (
- APILimitedException,
- WeChatException,
- WeChatComponentOAuthException,
- WeChatOAuthException, WechatNetworkException
- )
- from library.wechatpy.messages import COMPONENT_MESSAGE_TYPES, ComponentUnknownMessage
- from library.wechatpy.parser import parse_message
logger = logging.getLogger(__name__)

# WeChat error codes (as strings) for which refreshing the component access
# token must not be retried: the error is re-raised to the caller immediately.
# NOTE: every entry needs its own trailing comma -- adjacent string literals
# are silently concatenated by Python ('40164' '61004' == '4016461004').
NO_RETRY_ERRCODE = [
    '48001',  # api unauthorized
    '40164',  # invalid ip not in whitelist
    '61004',  # access clientip is not registered request
]
class BaseWeChatComponent(object):
    """Low-level HTTP client for the WeChat Open Platform component API.

    Takes care of caching/refreshing the ``component_access_token`` in
    ``session`` and of attaching it to every outgoing request.
    """

    API_BASE_URL = "https://api.weixin.qq.com/cgi-bin"

    def __init__(
        self,
        component_appid,
        component_appsecret,
        component_token,
        encoding_aes_key,
        lock_cache,
        session,
        authorizer,
        auto_retry=True,
    ):
        """
        :param component_appid: third-party platform appid
        :param component_appsecret: third-party platform appsecret
        :param component_token: token used to verify pushed messages
        :param encoding_aes_key: key used to encrypt/decrypt pushed messages
        :param lock_cache: cache client handed to ``my_memcache_lock`` when
            serialising token refreshes
        :param session: key/value store used through ``get(key)`` and
            ``set(key, value, ttl)``
        :param authorizer: authorization-record store; only stored on the
            instance by this base class
        :param auto_retry: when true, a request rejected because of an
            invalid/expired component access token is retried once a new
            token has been fetched
        """
        self._http = requests.Session()
        self.component_appid = component_appid
        self.component_appsecret = component_appsecret
        self.crypto = WeChatCrypto(component_token, encoding_aes_key, component_appid)
        self.session = session
        self.authorizer = authorizer
        self.lock_cache = lock_cache
        self.auto_retry = auto_retry

    @property
    def component_verify_ticket(self):
        """The component verify ticket currently stored in ``session``."""
        return self.session.get(component_verify_ticket_key(self.component_appid))

    def _raise_on_http_error(self, res):
        """Convert a non-2xx HTTP response into ``WechatNetworkException``."""
        try:
            res.raise_for_status()
        except requests.RequestException as reqe:
            raise WechatNetworkException(
                errCode='HTTP{}'.format(res.status_code),
                # str() works on Python 2 and 3; ``.message`` is Python 2 only.
                errMsg=str(reqe),
                client=self,
                request=reqe.request,
                response=reqe.response)

    def _request(self, method, url_or_endpoint, **kwargs):
        """Send a request to the component API and return the decoded JSON.

        :param method: HTTP method name
        :param url_or_endpoint: absolute URL, or an endpoint appended to
            ``api_base_url`` (keyword argument, defaults to ``API_BASE_URL``)
        :param kwargs: forwarded to ``requests.Session.request``; a dict
            ``data`` is serialised to JSON and ``component_access_token`` is
            added to a dict ``params`` when absent
        :raises WechatNetworkException: on a non-2xx HTTP status
        """
        # Always pop the option so it never leaks into requests' kwargs.
        api_base_url = kwargs.pop("api_base_url", self.API_BASE_URL)
        if url_or_endpoint.startswith(("http://", "https://")):
            url = url_or_endpoint
        else:
            url = "{}{}".format(api_base_url, url_or_endpoint)

        params = kwargs.setdefault("params", {})
        if isinstance(params, dict) and "component_access_token" not in params:
            params["component_access_token"] = self.access_token

        # ``data`` is optional (e.g. GET requests), so never index it directly.
        if isinstance(kwargs.get("data"), dict):
            kwargs["data"] = json.dumps(kwargs["data"])

        res = self._http.request(method=method, url=url, **kwargs)
        self._raise_on_http_error(res)
        return self._handle_result(res, method, url, **kwargs)

    def _handle_result(self, res, method=None, url=None, **kwargs):
        """Decode a response body and translate WeChat error codes.

        :return: the decoded JSON dict when ``errcode`` is absent or 0
        :raises APILimitedException: when the API frequency limit is hit
        :raises WeChatException: for any other non-zero ``errcode``
        """
        result = json.loads(res.content.decode("utf-8", "ignore"), strict=False)
        if "errcode" in result:
            result["errcode"] = int(result["errcode"])
        if "errcode" not in result or result["errcode"] == 0:
            return result

        errcode = result["errcode"]
        errmsg = result.get("errmsg", errcode)
        params = kwargs.get("params")
        token_rejected = errcode in (
            WeChatErrorCode.INVALID_CREDENTIAL.value,
            WeChatErrorCode.INVALID_ACCESS_TOKEN.value,
            WeChatErrorCode.EXPIRED_ACCESS_TOKEN.value,
        )
        if self.auto_retry and token_rejected and isinstance(params, dict):
            logger.info("Component access token expired, fetch a new one and retry request")
            # Hand over the rejected token: otherwise fetch_access_token()
            # would return the very same cached value without refreshing it.
            rejected = params.get("component_access_token")
            params["component_access_token"] = self.fetch_access_token(old_token=rejected)
            return self._request(method=method, url_or_endpoint=url, **kwargs)
        if errcode == WeChatErrorCode.OUT_OF_API_FREQ_LIMIT.value:
            # api freq out of limit
            raise APILimitedException(errcode, errmsg, client=self, request=res.request, response=res)
        raise WeChatException(errcode, errmsg, client=self, request=res.request, response=res)

    @property
    def component_access_token_key(self):
        """Session key under which the component access token is cached."""
        return '{}_component_access_token'.format(self.component_appid)

    def fetch_access_token(self, old_token=None):
        """Return a usable component access token, refreshing it if needed.

        A cached token different from ``old_token`` is returned as-is.
        Otherwise the refresh is attempted under a cache lock, at most three
        times with a 5 second pause between attempts.

        :param old_token: token known to be invalid; a cached token equal to
            it is ignored and a refresh is forced
        :return: the component access token string
        :raises APILimitedException: when the API frequency limit is hit
        :raises WeChatException: for error codes in ``NO_RETRY_ERRCODE`` or
            when no token could be obtained after three attempts
        """
        lock_key = 'component-access-token-lock-{appid}'.format(appid=self.component_appid)
        retry = 0
        while True:
            current_token = self.session.get(self.component_access_token_key)
            if current_token and current_token != old_token:
                logger.debug(
                    '=== WechatToken === app<id={}> fetched one other component access token. access token = {}'.format(
                        self.component_appid, current_token))
                return current_token

            with my_memcache_lock(self.lock_cache, lock_key, '1', expire=15) as acquired:
                if acquired:
                    try:
                        refreshed = self.refresh_access_token()
                        # refresh_access_token() returns the whole JSON
                        # payload; callers of this method need the token.
                        if isinstance(refreshed, dict):
                            new_token = refreshed.get('component_access_token')
                        else:
                            new_token = refreshed
                        if new_token:
                            logger.debug(
                                '=== WechatToken === app<id={}> fetch component access token success. access token = {}'.format(
                                    self.component_appid, new_token))
                            return new_token
                        raise Exception(
                            '=== WechatToken === app<id={}> fetch component access token is null.'.format(
                                self.component_appid))
                    except APILimitedException as e:
                        logger.error(repr(e))
                        raise
                    except WeChatException as e:
                        logger.exception(e)
                        if str(e.errCode) in NO_RETRY_ERRCODE:
                            raise
                    except Exception as e:
                        # Best effort: log and fall through to the next attempt.
                        logger.exception(e)
                else:
                    logger.debug(
                        '=== WechatToken === app<id={}> not acquire component access token memcache key<{}>'.format(
                            self.component_appid, lock_key))

            retry += 1
            if retry >= 3:
                raise WeChatException(
                    errCode=WeChatErrorCode.MY_SYSTEM_ERROR,
                    errMsg='=== WechatToken === app<id={}> fetch component access token timeout.'.format(
                        self.component_appid),
                    client=self)
            time.sleep(5)

    def refresh_access_token(self):
        """Request a new component access token and cache it in ``session``.

        The cache TTL is the reported ``expires_in`` minus 600 seconds (or
        half of it when it is below 600), 6600 seconds when not reported.

        :return: the decoded JSON payload of the token endpoint
        :raises WechatNetworkException: on a non-2xx HTTP status
        :raises WeChatException: when the payload carries a non-zero errcode
        """
        logger.info("Fetching component access token")
        url = "{}/component/api_component_token".format(self.API_BASE_URL)
        payload = json.dumps(
            {
                "component_appid": self.component_appid,
                "component_appsecret": self.component_appsecret,
                "component_verify_ticket": self.component_verify_ticket,
            }
        )
        res = self._http.post(url=url, data=payload)
        self._raise_on_http_error(res)

        result = res.json()
        if "errcode" in result and result["errcode"] != 0:
            raise WeChatException(
                result["errcode"],
                result["errmsg"],
                client=self,
                request=res.request,
                response=res,
            )

        expires_in = 7200 - 600
        if 'expires_in' in result:
            expires_in = result['expires_in']
            if expires_in < 600:
                # Integer division keeps the TTL an int on Python 3 as well.
                expires_in = expires_in // 2
            else:
                expires_in = expires_in - 600
        self.session.set(
            self.component_access_token_key,
            result['component_access_token'],
            expires_in
        )
        return result

    @property
    def access_token(self):
        """The cached component access token, fetched on a cache miss."""
        cached = self.session.get(self.component_access_token_key)
        if cached:
            return cached
        return self.fetch_access_token()

    def get(self, url, **kwargs):
        """Issue a GET request through ``_request``."""
        return self._request(method="get", url_or_endpoint=url, **kwargs)

    def post(self, url, **kwargs):
        """Issue a POST request through ``_request``."""
        return self._request(method="post", url_or_endpoint=url, **kwargs)
class WeChatComponent(BaseWeChatComponent):
    """High-level WeChat Open Platform component API.

    Wraps the ``/component/*`` endpoints, keeps authorizer tokens in
    ``session`` and mirrors authorization records into ``authorizer``.
    """

    def get_pre_auth_url(self, redirect_uri):
        """Build the PC (desktop) authorization page URL.

        A fresh pre-auth code is requested for every call.

        :param redirect_uri: callback URL; it is percent-encoded here
        """
        redirect_uri = quote(redirect_uri, safe="")
        url_template = 'https://mp.weixin.qq.com/cgi-bin/componentloginpage?component_appid={}&pre_auth_code={}&redirect_uri={}&auth_type='
        return url_template.format(self.component_appid, self.create_preauthcode()['pre_auth_code'], redirect_uri)

    def get_pre_auth_url_m(self, redirect_uri):
        """Build the H5 (mobile) authorization page URL.

        A fresh pre-auth code is requested for every call.

        :param redirect_uri: callback URL; it is percent-encoded here
        """
        redirect_uri = quote(redirect_uri, safe="")
        url_template = 'https://open.weixin.qq.com/wxaopen/safe/bindcomponent?action=bindcomponent&no_scan=1&component_appid={}&pre_auth_code={}&redirect_uri={}&auth_type=3#wechat_redirect'
        return url_template.format(self.component_appid, self.create_preauthcode()['pre_auth_code'], redirect_uri)

    def create_preauthcode(self):
        """Request a pre-authorization code; returns the decoded JSON."""
        return self.post(
            "/component/api_create_preauthcode",
            data={"component_appid": self.component_appid},
        )

    def query_auth(self, authorization_code):
        """Exchange an authorization code for the authorizer's auth info.

        :param authorization_code: code delivered to the third-party
            platform when an authorization succeeds
        :return: decoded JSON containing
            ``authorization_info.authorizer_appid``
        :raises AssertionError: when the response lacks that structure
        """
        result = self.post(
            "/component/api_query_auth",
            data={
                "component_appid": self.component_appid,
                "authorization_code": authorization_code,
            }
        )
        # Raised explicitly (instead of an ``assert`` statement) so the
        # check still runs when Python is started with -O.
        well_formed = (
            result is not None
            and "authorization_info" in result
            and "authorizer_appid" in result["authorization_info"]
        )
        if not well_formed:
            raise AssertionError("api_query_auth response lacks authorization_info.authorizer_appid")
        return result

    def refresh_token(self, appid):
        """Return the authorizer refresh token for ``appid``.

        Looks in ``session`` first, then falls back to the record returned
        by ``authorizer.getAuthRecord`` and caches that value for 7 days.
        Returns a falsy value when neither source has a token.
        """
        token = self.session.get(refresh_token_key(appid))
        if not token:
            record = self.authorizer.getAuthRecord(appid)
            if record:
                token = record.refreshToken
                self.session.set(refresh_token_key(appid), token, 7 * 24 * 3600)
        return token

    def refresh_authorizer_token(self, authorizer_appid):
        """Fetch (refresh) the access token of an authorizer account.

        The refresh token is looked up through :meth:`refresh_token`.

        :param authorizer_appid: appid of the authorizer
        """
        return self.post(
            "/component/api_authorizer_token",
            data={
                "component_appid": self.component_appid,
                "authorizer_appid": authorizer_appid,
                "authorizer_refresh_token": self.refresh_token(authorizer_appid),
            },
        )

    def get_authorizer_info(self, authorizer_appid):
        """Fetch the account information of an authorizer.

        :param authorizer_appid: appid of the authorizer
        """
        return self.post(
            "/component/api_get_authorizer_info",
            data={
                "component_appid": self.component_appid,
                "authorizer_appid": authorizer_appid,
            },
        )

    def get_authorizer_list(self, offset=0, count=500):
        """List the accounts that have authorized this component.

        :param offset: start position
        :param count: number of records to fetch
        """
        return self.post(
            "/component/api_get_authorizer_list",
            data={
                "component_appid": self.component_appid,
                "offset": offset,
                "count": count,
            },
        )

    def get_authorizer_option(self, authorizer_appid, option_name):
        """Read one option setting of an authorizer.

        :param authorizer_appid: appid of the authorizer
        :param option_name: option name
        """
        return self.post(
            "/component/api_get_authorizer_option",
            data={
                "component_appid": self.component_appid,
                "authorizer_appid": authorizer_appid,
                "option_name": option_name,
            },
        )

    def set_authorizer_option(self, authorizer_appid, option_name, option_value):
        """Write one option setting of an authorizer.

        :param authorizer_appid: appid of the authorizer
        :param option_name: option name
        :param option_value: value to set
        """
        return self.post(
            "/component/api_set_authorizer_option",
            data={
                "component_appid": self.component_appid,
                "authorizer_appid": authorizer_appid,
                "option_name": option_name,
                "option_value": option_value,
            },
        )

    def get_client_by_appid(self, authorizer_appid):
        """Return a ``WeChatComponentClient`` for an authorizer.

        When no access token is cached under ``"<appid>_access_token"`` one
        is fetched via :meth:`refresh_authorizer_token` and cached for the
        reported ``expires_in`` (7200 seconds when not reported).

        :param authorizer_appid: appid of the authorizer
        """
        # NOTE(review): do_auth() caches the token under the imported
        # access_token_key() helper instead -- confirm both produce the same
        # key. The local is named differently so it no longer shadows it.
        cache_key = "{}_access_token".format(authorizer_appid)
        if not self.session.get(cache_key):
            ret = self.refresh_authorizer_token(authorizer_appid)
            expires_in = ret["expires_in"] if "expires_in" in ret else 7200
            self.session.set(cache_key, ret["authorizer_access_token"], expires_in)
        return WeChatComponentClient(authorizer_appid, self, session=self.session)

    def do_auth(self, auth_code):
        """Complete an authorization and persist the resulting record.

        Exchanges ``auth_code`` for the authorizer tokens, caches them in
        ``session``, fetches the authorizer's account info and passes the
        assembled payload to ``authorizer.createOrUpdateAuthRecord``.

        :param auth_code: authorization code pushed by WeChat
        """
        # Auth info carries authorizer_access_token / authorizer_refresh_token.
        auth_info = self.query_auth(auth_code)["authorization_info"]
        authorizer_appid = auth_info['authorizer_appid']

        authorizer_access_token = auth_info.get('authorizer_access_token', None)
        if authorizer_access_token:
            expires_in = auth_info["expires_in"] if "expires_in" in auth_info else 7200
            # Cached 600 seconds shorter than the reported lifetime.
            self.session.set(access_token_key(authorizer_appid), authorizer_access_token, (expires_in - 600))

        payload = {
            'appid': authorizer_appid,
        }
        authorizer_refresh_token = auth_info.get('authorizer_refresh_token', None)
        if authorizer_refresh_token:
            payload.update({'refreshToken': authorizer_refresh_token})
            self.session.set(refresh_token_key(authorizer_appid), authorizer_refresh_token, 7 * 24 * 3600)

        # Account information of the official account or mini program.
        app_info = self.get_authorizer_info(authorizer_appid)
        authorizer_info = app_info['authorizer_info']
        payload.update({
            'nickName': authorizer_info.pop('nick_name'),
            'headImg': authorizer_info.pop('head_img'),
            'userName': authorizer_info.pop('user_name'),
            'principalName': authorizer_info.pop('principal_name'),
            'qrcodeUrl': authorizer_info.pop('qrcode_url'),
            'verifyInfo': authorizer_info.pop('verify_type_info')['id'],
            'serviceType': authorizer_info.pop('service_type_info')['id'],
            'appStatus': authorizer_info.pop('account_status')
        })
        # Whatever was not popped above is kept verbatim.
        payload['extra'] = authorizer_info
        # appType 0 when 'MiniProgramInfo' is present (mini program), else 1.
        payload['appType'] = 0 if 'MiniProgramInfo' in authorizer_info else 1
        payload['funcList'] = [
            func['funcscope_category']['id']
            for func in app_info['authorization_info']['func_info']
        ]
        self.authorizer.createOrUpdateAuthRecord(payload)

    def do_un_auth(self, authorizer_appid):
        """Delete the stored authorization record of ``authorizer_appid``."""
        self.authorizer.deleteAuthRecord(authorizer_appid)

    def parse_message(self, msg, msg_signature, timestamp, nonce):
        """Decrypt and dispatch a message pushed by the WeChat server.

        Verify tickets are cached for 7 days, (updated) authorizations
        trigger :meth:`do_auth` and unauthorizations :meth:`do_un_auth`.

        :param msg: encrypted content
        :param msg_signature: message signature
        :param timestamp: timestamp
        :param nonce: random nonce
        :return: the parsed component message object
        """
        content = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce)
        message = xmltodict.parse(to_text(content))["xml"]
        message_type = message["InfoType"].lower()
        message_class = COMPONENT_MESSAGE_TYPES.get(message_type, ComponentUnknownMessage)
        msg = message_class(message)
        if msg.type == "component_verify_ticket":
            self.session.set(component_verify_ticket_key(self.component_appid), msg.verify_ticket, 7 * 24 * 3600)
        elif msg.type in ("authorized", "updateauthorized"):
            self.do_auth(msg.authorization_code)
        elif msg.type == 'unauthorized':
            self.do_un_auth(msg.authorizer_appid)
        return msg

    def parse_authorizer_message(self, msg, msg_signature, timestamp, nonce):
        """Decrypt a message sent to an authorizer and parse it."""
        content = self.crypto.decrypt_message(msg, msg_signature, timestamp, nonce)
        return parse_message(content)

    def get_component_oauth(self, authorizer_appid):
        """Return a ``ComponentOAuth`` helper acting for an authorizer.

        :param authorizer_appid: appid of the authorizer
        """
        return ComponentOAuth(self, authorizer_appid)
class ComponentOAuth(object):
    """WeChat Open Platform web OAuth performed on behalf of an official account.

    See
    https://open.weixin.qq.com/cgi-bin/showdocument?action=dir_list&t=resource/res_list&verify=1&id=open1419318590
    """

    API_BASE_URL = "https://api.weixin.qq.com/"
    OAUTH_BASE_URL = "https://open.weixin.qq.com/connect/"

    def __init__(self, component, app_id):
        """
        :param component: WeChatComponent
        :param app_id: app_id of the official account
        """
        self._http = requests.Session()
        self.app_id = app_id
        self.component = component
        # Filled in by fetch_access_token() / refresh_access_token(); defined
        # up front so get_user_info() never hits an AttributeError.
        self.access_token = None
        self.open_id = None
        self.refresh_token = None
        self.expires_in = None
        self.scope = None

    def get_authorize_url(self, redirect_uri, scope="snsapi_base", state=""):
        """Build the URL the user is redirected to for authorization.

        :param redirect_uri: callback URL of the service developer; it is
            percent-encoded here
        :param scope: optional OAuth2 scope, defaults to ``snsapi_base``
        :param state: optional value echoed back on redirect; appended
            verbatim (not encoded)
        """
        redirect_uri = quote(redirect_uri, safe="")
        url_list = [
            self.OAUTH_BASE_URL,
            "oauth2/authorize?appid=",
            self.app_id,
            "&redirect_uri=",
            redirect_uri,
            "&response_type=code&scope=",
            scope,
        ]
        if state:
            url_list.extend(["&state=", state])
        url_list.extend(["&component_appid=", self.component.component_appid])
        url_list.append("#wechat_redirect")
        return "".join(url_list)

    def _remember_token(self, res):
        """Copy the token fields of an OAuth response onto the instance."""
        self.access_token = res["access_token"]
        self.open_id = res["openid"]
        self.refresh_token = res["refresh_token"]
        self.expires_in = res["expires_in"]
        self.scope = res["scope"]
        return res

    def fetch_access_token(self, code):
        """Exchange an authorization code for a user access token.

        :param code: ``code`` query parameter received on the redirect
        :return: decoded JSON payload
        """
        res = self._get(
            "sns/oauth2/component/access_token",
            params={
                "appid": self.app_id,
                "component_appid": self.component.component_appid,
                "component_access_token": self.component.access_token,
                "code": code,
                "grant_type": "authorization_code",
            },
        )
        return self._remember_token(res)

    def refresh_access_token(self, refresh_token):
        """Refresh the user access token.

        :param refresh_token: OAuth2 refresh token
        :return: decoded JSON payload
        """
        res = self._get(
            "sns/oauth2/component/refresh_token",
            params={
                "appid": self.app_id,
                "grant_type": "refresh_token",
                "refresh_token": refresh_token,
                "component_appid": self.component.component_appid,
                "component_access_token": self.component.access_token,
            },
        )
        return self._remember_token(res)

    def get_user_info(self, openid=None, access_token=None, lang="zh_CN"):
        """Fetch the user's basic profile (requires scope ``snsapi_userinfo``).

        :param openid: optional openid, defaults to the one stored by the
            last token fetch
        :param access_token: optional access token, defaults to the one
            stored by the last token fetch
        :param lang: optional language preference, defaults to ``zh_CN``
        :return: decoded JSON payload
        """
        openid = openid or self.open_id
        access_token = access_token or self.access_token
        return self._get(
            "sns/userinfo",
            params={"access_token": access_token, "openid": openid, "lang": lang},
        )

    def _request(self, method, url_or_endpoint, **kwargs):
        """Send a request and return the decoded JSON.

        A dict ``data`` is serialised to UTF-8 encoded JSON; remaining
        keyword arguments are forwarded to ``requests.Session.request``.

        :raises WeChatOAuthException: on a non-2xx HTTP status
        """
        if url_or_endpoint.startswith(("http://", "https://")):
            url = url_or_endpoint
        else:
            url = "{}{}".format(self.API_BASE_URL, url_or_endpoint)

        if isinstance(kwargs.get("data", ""), dict):
            body = json.dumps(kwargs["data"], ensure_ascii=False)
            kwargs["data"] = body.encode("utf-8")

        res = self._http.request(method=method, url=url, **kwargs)
        try:
            res.raise_for_status()
        except requests.RequestException as reqe:
            raise WeChatOAuthException(
                errCode=None,
                errMsg=None,
                client=self,
                request=reqe.request,
                response=reqe.response,
            )
        return self._handle_result(res, method=method, url=url, **kwargs)

    def _handle_result(self, res, method=None, url=None, **kwargs):
        """Decode a response body and translate WeChat error codes.

        :return: the decoded JSON dict when ``errcode`` is absent or 0
        :raises APILimitedException: when the API frequency limit is hit
        :raises WeChatComponentOAuthException: for any other non-zero errcode
        """
        result = json.loads(res.content.decode("utf-8", "ignore"), strict=False)
        if "errcode" in result:
            result["errcode"] = int(result["errcode"])
        if "errcode" not in result or result["errcode"] == 0:
            return result

        errcode = result["errcode"]
        errmsg = result.get("errmsg", errcode)
        params = kwargs.get("params")
        # Refreshing the component token only helps requests that carried
        # one; retrying anything else would repeat the same failing call
        # forever (e.g. sns/userinfo with a bad user access token).
        carries_component_token = isinstance(params, dict) and "component_access_token" in params
        token_rejected = errcode in (
            WeChatErrorCode.INVALID_CREDENTIAL.value,
            WeChatErrorCode.INVALID_ACCESS_TOKEN.value,
            WeChatErrorCode.EXPIRED_ACCESS_TOKEN.value,
        )
        if self.component.auto_retry and token_rejected and carries_component_token:
            logger.info("Component access token expired, fetch a new one and retry request")
            # Hand over the rejected token so a still-cached copy of it is
            # not returned unchanged.
            self.component.fetch_access_token(params["component_access_token"])
            params["component_access_token"] = self.component.access_token
            return self._request(method=method, url_or_endpoint=url, **kwargs)
        if errcode == WeChatErrorCode.OUT_OF_API_FREQ_LIMIT.value:
            # api freq out of limit
            raise APILimitedException(errcode, errmsg, client=self, request=res.request, response=res)
        raise WeChatComponentOAuthException(errcode, errmsg, client=self, request=res.request, response=res)

    def _get(self, url, **kwargs):
        """Issue a GET request through ``_request``."""
        return self._request(method="get", url_or_endpoint=url, **kwargs)
|