123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- from __future__ import absolute_import, unicode_literals
- import base64
- import hashlib
- import inspect
- import logging
- from xml.parsers.expat import ExpatError
- import requests
- import six
- import xmltodict
- from Crypto.Cipher import AES
- from cryptography import x509
- from cryptography.hazmat.backends import default_backend
- from cryptography.hazmat.primitives import serialization
- from optionaldict import optionaldict
- from library import random_string, to_binary, to_text
- from library.wechatbase.exceptions import InvalidSignatureException, WechatNetworkException, \
- WeChatException
- from library.wechatpy.pay import api
- from library.wechatpy.pay.base import BaseWeChatPayAPI
- from library.wechatpy.pay.utils import (
- calculate_signature, calculate_signature_hmac, _check_signature, dict_to_xml
- )
- logger = logging.getLogger(__name__)
def _is_api_endpoint(obj):
    """Return True when *obj* is an instance of ``BaseWeChatPayAPI``.

    Used as the predicate for ``inspect.getmembers`` to pick out the API
    endpoint attributes declared on the client class.
    """
    is_endpoint = isinstance(obj, BaseWeChatPayAPI)
    return is_endpoint
class WeChatPay(object):
    """
    WeChat Pay API client.

    :param appid: WeChat official account appid
    :param sub_appid: optional, APPID of the mini program that initiates the payment
    :param api_key: merchant key; do not use the mini program secret here
    :param mch_id: merchant ID
    :param sub_mch_id: optional, sub-merchant ID
    :param mch_cert: optional, PEM-encoded merchant certificate data
        (parsed with ``x509.load_pem_x509_certificate``)
    :param mch_key: optional, PEM-encoded, unencrypted merchant private key data
        (parsed with ``serialization.load_pem_private_key``)
    :param timeout: optional, request timeout in seconds; no timeout by default
    :param sandbox: optional, whether to use the sandbox environment, defaults to False
    """

    # Class-level endpoint prototypes. ``__new__`` replaces each of them with
    # a fresh endpoint object constructed with the new client instance.

    #: Red packet API
    redpack = api.WeChatRedpack()
    #: Enterprise transfer (payout) API
    transfer = api.WeChatTransfer()
    #: Coupon API
    coupon = api.WeChatCoupon()
    #: Order API
    order = api.WeChatOrder()
    #: Refund API
    refund = api.WeChatRefund()
    #: Micropay (card-swipe payment) API
    micropay = api.WeChatMicroPay()
    #: Utility API
    tools = api.WeChatTools()
    #: Official-account web page JS payment API
    jsapi = api.WeChatJSAPI()
    #: Withholding (entrusted deduction) API
    withhold = api.WeChatWithhold()

    API_BASE_URL = 'https://api.mch.weixin.qq.com/'

    def __new__(cls, *args, **kwargs):
        """Create the instance and give it its own copy of every API endpoint."""
        self = super(WeChatPay, cls).__new__(cls)
        for name, endpoint in inspect.getmembers(self, _is_api_endpoint):
            # Re-instantiate the endpoint class with this client so the
            # endpoint is not shared between client instances.
            setattr(self, name, type(endpoint)(self))
        return self

    def __init__(self, appid, api_key, mch_id, sub_mch_id=None,
                 mch_cert=None, mch_key=None, timeout=None, sandbox=False, sub_appid=None):
        # type: (str, str, str, str, bytes, bytes, float, bool, str) -> None
        """Store the merchant credentials and parse the optional certificate/key."""
        self.appid = appid
        self.sub_appid = sub_appid
        self.api_key = api_key
        self.mch_id = mch_id
        self.sub_mch_id = sub_mch_id
        if mch_cert:
            # Pass the backend explicitly, like the private key load below;
            # older ``cryptography`` releases require it.
            self.mch_cert = x509.load_pem_x509_certificate(mch_cert, backend=default_backend())
        else:
            self.mch_cert = mch_cert
        if mch_key:
            self.mch_key = serialization.load_pem_private_key(
                mch_key, password=None, backend=default_backend())
        else:
            self.mch_key = mch_key
        self.timeout = timeout
        self.sandbox = sandbox
        # Lazily fetched by the ``sandbox_api_key`` property.
        self._sandbox_api_key = None

    def __str__(self):
        """Return ``ClassName(appid: ..., mchid: ...)`` as the native string type."""
        _repr = '{kclass}(appid: {appid}, mchid: {mchid})'.format(
            kclass=self.__class__.__name__,
            appid=self.appid,
            mchid=self.mch_id)
        if six.PY2:
            return to_binary(_repr)
        return to_text(_repr)

    def __repr__(self):
        return str(self)

    def _fetch_sandbox_api_key(self):
        """Request the sandbox signing key from the ``getsignkey`` endpoint.

        :return: value of ``sandbox_signkey`` in the response, or None if absent
        """
        params = {'mch_id': self.mch_id, 'nonce_str': random_string(32)}
        sign = calculate_signature(params, self.api_key)
        payload = dict_to_xml(params, sign=sign)
        headers = {'Content-Type': 'text/xml'}
        api_url = '{base}sandboxnew/pay/getsignkey'.format(base=self.API_BASE_URL)
        with requests.sessions.Session() as session:
            # Honour the client timeout so this call cannot block forever.
            response = session.post(api_url, data=payload, headers=headers,
                                    timeout=self.timeout)
        return xmltodict.parse(response.text)['xml'].get('sandbox_signkey')

    def _request(self, method, url_or_endpoint, **kwargs):
        """Sign and send a request, then hand the response to ``_handle_result``.

        :param method: HTTP method name
        :param url_or_endpoint: absolute URL, or an endpoint appended to the base URL
        :param kwargs: forwarded to ``requests``; a dict ``data`` is completed with
            merchant fields, signed and serialized to XML. ``api_base_url`` may
            override ``API_BASE_URL`` for relative endpoints.
        :raises WechatNetworkException: on an HTTP error status
        """
        # Always consume ``api_base_url`` so it never reaches ``session.request``,
        # which does not accept that keyword.
        api_base_url = kwargs.pop('api_base_url', self.API_BASE_URL)
        if url_or_endpoint.startswith(('http://', 'https://')):
            url = url_or_endpoint
        else:
            if self.sandbox:
                api_base_url = '{url}sandboxnew/'.format(url=api_base_url)
            url = '{base}{endpoint}'.format(base=api_base_url, endpoint=url_or_endpoint)

        if isinstance(kwargs.get('data', ''), dict):
            data = kwargs['data']
            if 'mchid' not in data:
                # Callers that already supply the merchant ID under the
                # 'mchid' key must not get an additional 'mch_id' field.
                data.setdefault('mch_id', self.mch_id)
            data.setdefault('sub_mch_id', self.sub_mch_id)
            data.setdefault('nonce_str', random_string(32))
            data = optionaldict(data)

            sign_key = self.sandbox_api_key if self.sandbox else self.api_key
            if data.get('sign_type', 'MD5') == 'HMAC-SHA256':
                sign = calculate_signature_hmac(data, sign_key)
            else:
                sign = calculate_signature(data, sign_key)

            body = dict_to_xml(data, sign)
            kwargs['data'] = body.encode('utf-8')

        # Merchant certificate.
        # NOTE(review): ``requests`` documents ``cert`` as file path(s), while
        # ``__init__`` stores parsed certificate/key objects here -- confirm
        # this works with the HTTP stack in use.
        if self.mch_cert and self.mch_key:
            kwargs['cert'] = (self.mch_cert, self.mch_key)

        kwargs['timeout'] = kwargs.get('timeout', self.timeout)
        logger.debug('Request to WeChat API: %s %s\n%s', method, url, kwargs)
        with requests.sessions.Session() as session:
            res = session.request(method=method, url=url, **kwargs)

        try:
            res.raise_for_status()
        except requests.RequestException as reqe:
            raise WechatNetworkException(
                errCode='HTTP({})'.format(res.status_code),
                # Exceptions have no ``.message`` attribute on Python 3.
                errMsg=six.text_type(reqe),
                client=self,
                request=reqe.request,
                response=reqe.response
            )
        return self._handle_result(res)

    def _handle_result(self, res):
        """Parse an API response and raise on WeChat-level failures.

        :param res: ``requests`` response object
        :return: the parsed ``xml`` element, or the raw response text when the
            body cannot be parsed as XML
        :raises WechatNetworkException: when ``return_code`` is not ``SUCCESS``
        :raises WeChatException: when ``result_code`` is not ``SUCCESS``
        """
        res.encoding = 'utf-8'
        xml = res.text
        logger.debug('Response from WeChat API \n %s', xml)
        try:
            data = xmltodict.parse(xml)['xml']
        except (xmltodict.ParsingInterrupted, ExpatError):
            # XML parsing failed; return the body untouched.
            logger.debug('WeChat payment result xml parsing error', exc_info=True)
            return xml

        return_code = data['return_code']
        if return_code != 'SUCCESS':
            raise WechatNetworkException(
                errCode=return_code,
                errMsg=data.get('return_msg'),
                client=self,
                request=res.request,
                response=res
            )

        if data.get('result_code') != 'SUCCESS':
            raise WeChatException(
                errCode=data.get('err_code'),
                errMsg=data.get('err_code_des'),
                client=self,
                request=res.request,
                response=res
            )
        return data

    def get(self, url, **kwargs):
        """Send a GET request through ``_request``."""
        return self._request(
            method='get',
            url_or_endpoint=url,
            **kwargs
        )

    def check_signature(self, params):
        """Delegate to ``_check_signature`` with the active (sandbox or live) key."""
        return _check_signature(params, self.api_key if not self.sandbox else self.sandbox_api_key)

    def post(self, url, **kwargs):
        """Send a POST request through ``_request``."""
        return self._request(
            method='post',
            url_or_endpoint=url,
            **kwargs
        )

    def parse_payment_result(self, xml):
        """Parse and verify a WeChat payment result notification.

        :param xml: raw XML body of the notification
        :return: the parsed ``xml`` element with the fee/count fields converted
            to ``int`` and the original ``sign`` put back
        :raises InvalidSignatureException: when the body is not a valid
            ``<xml>`` document or the signature does not match
        """
        try:
            data = xmltodict.parse(xml)
        except (xmltodict.ParsingInterrupted, ExpatError):
            raise InvalidSignatureException(client=self)

        if not data or 'xml' not in data:
            raise InvalidSignatureException(client=self)

        data = data['xml']
        # ``<xml/>`` or ``<xml>text</xml>`` parse to None / a string, which
        # cannot carry a signature.
        if not isinstance(data, dict):
            raise InvalidSignatureException(client=self)

        sign = data.pop('sign', None)
        sign_key = self.api_key if not self.sandbox else self.sandbox_api_key
        # Verify with the same algorithm selection ``_request`` uses to sign.
        if data.get('sign_type', 'MD5') == 'HMAC-SHA256':
            real_sign = calculate_signature_hmac(data, sign_key)
        else:
            real_sign = calculate_signature(data, sign_key)
        if sign != real_sign:
            raise InvalidSignatureException(client=self)

        for key in ('total_fee', 'settlement_total_fee', 'cash_fee', 'coupon_fee', 'coupon_count'):
            if key in data:
                data[key] = int(data[key])
        data['sign'] = sign
        return data

    @property
    def sandbox_api_key(self):
        """Sandbox signing key, fetched on first access in sandbox mode.

        Returns None when sandbox mode is off and no key has been fetched.
        """
        if self.sandbox and self._sandbox_api_key is None:
            self._sandbox_api_key = self._fetch_sandbox_api_key()
        return self._sandbox_api_key

    def decrypt(self, req_info):  # type: (str) -> dict
        """Decrypt a base64-encoded, AES-ECB-encrypted ``req_info`` payload.

        The AES key is the hex MD5 digest of the merchant ``api_key``.

        :param req_info: base64-encoded ciphertext
        :return: the ``root`` element of the decrypted XML document
        """
        api_key = self.api_key
        # hashlib and AES need bytes; text keys are the norm here because of
        # ``unicode_literals``.
        if isinstance(api_key, six.text_type):
            api_key = api_key.encode('utf-8')
        aes_key = hashlib.md5(api_key).hexdigest().encode('ascii')

        ciphertext = base64.b64decode(req_info)
        plaintext = AES.new(aes_key, AES.MODE_ECB).decrypt(ciphertext)
        # The last byte holds the padding length; strip that many bytes.
        pad_len = ord(plaintext[len(plaintext) - 1:])
        return xmltodict.parse(plaintext[:-pad_len])["root"]
|