# -*- coding: utf-8 -*- # !/usr/bin/env python from __future__ import absolute_import, unicode_literals import base64 import hashlib import inspect import logging from xml.parsers.expat import ExpatError import requests import six import xmltodict from Crypto.Cipher import AES from cryptography import x509 from cryptography.hazmat.backends import default_backend from cryptography.hazmat.primitives import serialization from optionaldict import optionaldict from library import random_string, to_binary, to_text from library.wechatbase.exceptions import InvalidSignatureException, WechatNetworkException, \ WeChatException from library.wechatpy.pay import api from library.wechatpy.pay.base import BaseWeChatPayAPI from library.wechatpy.pay.utils import ( calculate_signature, calculate_signature_hmac, _check_signature, dict_to_xml ) logger = logging.getLogger(__name__) def _is_api_endpoint(obj): return isinstance(obj, BaseWeChatPayAPI) class WeChatPay(object): """ 微信支付接口 :param appid: 微信公众号 appid :param sub_appid: 当前调起支付的小程序APPID :param api_key: 商户 key,不要在这里使用小程序的密钥 :param mch_id: 商户号 :param sub_mch_id: 可选,子商户号,受理模式下必填 :param mch_cert: 必填,商户证书路径 :param mch_key: 必填,商户证书私钥路径 :param timeout: 可选,请求超时时间,单位秒,默认无超时设置 :param sandbox: 可选,是否使用测试环境,默认为 False """ redpack = api.WeChatRedpack() """红包接口""" transfer = api.WeChatTransfer() """企业付款接口""" coupon = api.WeChatCoupon() """代金券接口""" order = api.WeChatOrder() """订单接口""" refund = api.WeChatRefund() """退款接口""" micropay = api.WeChatMicroPay() """刷卡支付接口""" tools = api.WeChatTools() """工具类接口""" jsapi = api.WeChatJSAPI() """公众号网页 JS 支付接口""" withhold = api.WeChatWithhold() """代扣接口""" API_BASE_URL = 'https://api.mch.weixin.qq.com/' def __new__(cls, *args, **kwargs): self = super(WeChatPay, cls).__new__(cls) api_endpoints = inspect.getmembers(self, _is_api_endpoint) for name, _api in api_endpoints: api_cls = type(_api) _api = api_cls(self) setattr(self, name, _api) return self def __init__(self, appid, api_key, mch_id, sub_mch_id = None, mch_cert = None, mch_key = None, timeout = None, sandbox = False, sub_appid = None): # type: (str, str, str, str, str, str, str, bool, str) -> None self.appid = appid self.sub_appid = sub_appid self.api_key = api_key self.mch_id = mch_id self.sub_mch_id = sub_mch_id if mch_cert: self.mch_cert = x509.load_pem_x509_certificate(mch_cert) else: self.mch_cert = mch_cert if mch_key: self.mch_key = serialization.load_pem_private_key(mch_key, password = None, backend = default_backend()) else: self.mch_key = mch_key self.timeout = timeout self.sandbox = sandbox self._sandbox_api_key = None def __str__(self): _repr = '{kclass}(appid: {appid}, mchid: {mchid})'.format( kclass = self.__class__.__name__, appid = self.appid, mchid = self.mch_id) if six.PY2: return to_binary(_repr) else: return to_text(_repr) def __repr__(self): return str(self) def _fetch_sandbox_api_key(self): nonce_str = random_string(32) sign = calculate_signature({'mch_id': self.mch_id, 'nonce_str': nonce_str}, self.api_key) payload = dict_to_xml({ 'mch_id': self.mch_id, 'nonce_str': nonce_str, }, sign = sign) headers = {'Content-Type': 'text/xml'} api_url = '{base}sandboxnew/pay/getsignkey'.format(base = self.API_BASE_URL) with requests.sessions.Session() as session: response = session.post(api_url, data = payload, headers = headers) return xmltodict.parse(response.text)['xml'].get('sandbox_signkey') def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(('http://', 'https://')): api_base_url = kwargs.pop('api_base_url', self.API_BASE_URL) if self.sandbox: api_base_url = '{url}sandboxnew/'.format(url = api_base_url) url = '{base}{endpoint}'.format( base = api_base_url, endpoint = url_or_endpoint ) else: url = url_or_endpoint if isinstance(kwargs.get('data', ''), dict): data = kwargs['data'] if 'mchid' not in data: # Fuck Tencent data.setdefault('mch_id', self.mch_id) data.setdefault('sub_mch_id', self.sub_mch_id) data.setdefault('nonce_str', random_string(32)) data = optionaldict(data) if data.get('sign_type', 'MD5') == 'HMAC-SHA256': sign = calculate_signature_hmac(data, self.sandbox_api_key if self.sandbox else self.api_key) else: sign = calculate_signature(data, self.sandbox_api_key if self.sandbox else self.api_key) body = dict_to_xml(data, sign) body = body.encode('utf-8') kwargs['data'] = body # 商户证书 if self.mch_cert and self.mch_key: kwargs['cert'] = (self.mch_cert, self.mch_key) kwargs['timeout'] = kwargs.get('timeout', self.timeout) logger.debug('Request to WeChat API: %s %s\n%s', method, url, kwargs) with requests.sessions.Session() as session: res = session.request( method = method, url = url, **kwargs ) try: res.raise_for_status() except requests.RequestException as reqe: raise WechatNetworkException( errCode = 'HTTP({})'.format(res.status_code), errMsg = reqe.message, client = self, request = reqe.request, response = reqe.response ) return self._handle_result(res) def _handle_result(self, res): res.encoding = 'utf-8' xml = res.text logger.debug('Response from WeChat API \n %s', xml) try: data = xmltodict.parse(xml)['xml'] except (xmltodict.ParsingInterrupted, ExpatError): # 解析 XML 失败 logger.debug('WeChat payment result xml parsing error', exc_info = True) return xml return_code = data['return_code'] return_msg = data.get('return_msg') if return_code != 'SUCCESS': raise WechatNetworkException( errCode = return_code, errMsg = return_msg, client = self, request = res.request, response = res ) result_code = data.get('result_code') errcode = data.get('err_code') errmsg = data.get('err_code_des') if result_code != 'SUCCESS': raise WeChatException( errCode = errcode, errMsg = errmsg, client = self, request = res.request, response = res ) return data def get(self, url, **kwargs): return self._request( method = 'get', url_or_endpoint = url, **kwargs ) def check_signature(self, params): return _check_signature(params, self.api_key if not self.sandbox else self.sandbox_api_key) def post(self, url, **kwargs): return self._request( method = 'post', url_or_endpoint = url, **kwargs ) def parse_payment_result(self, xml): """解析微信支付结果通知""" try: data = xmltodict.parse(xml) except (xmltodict.ParsingInterrupted, ExpatError): raise InvalidSignatureException(client = self) if not data or 'xml' not in data: raise InvalidSignatureException(client = self) data = data['xml'] sign = data.pop('sign', None) real_sign = calculate_signature(data, self.api_key if not self.sandbox else self.sandbox_api_key) if sign != real_sign: raise InvalidSignatureException(client = self) for key in ('total_fee', 'settlement_total_fee', 'cash_fee', 'coupon_fee', 'coupon_count'): if key in data: data[key] = int(data[key]) data['sign'] = sign return data @property def sandbox_api_key(self): if self.sandbox and self._sandbox_api_key is None: self._sandbox_api_key = self._fetch_sandbox_api_key() return self._sandbox_api_key def decrypt(self, req_info): # type:(str) -> dict """""" apiKeyMd5 = hashlib.md5(self.api_key).hexdigest() enc = base64.b64decode(req_info) dec = AES.new(apiKeyMd5, AES.MODE_ECB).decrypt(enc) unpad = lambda s: s[:-ord(s[len(s) - 1:])] return xmltodict.parse(unpad(dec))["root"]