#!/usr/bin/env python # coding: utf-8 """ __init__.py ~~~~~~~~~~ python-alipay-sdk """ import base64 import hashlib import datetime import logging import sys from functools import partial import json import OpenSSL import requests import six from Crypto.Signature import PKCS1_v1_5 from Crypto.Hash import SHA, SHA256 from Crypto.PublicKey import RSA from library import to_binary, to_text from .compat import quote_plus, decodebytes, encodebytes, b from .exceptions import AliPayGatewayException, AliException, AliValidationError, AliPayServiceException, \ AliPayNetworkException from .base import AliErrorCode # 常见加密算法 CryptoAlgSet = ( b'rsaEncryption', b'md2WithRSAEncryption', b'md5WithRSAEncryption', b'sha1WithRSAEncryption', b'sha256WithRSAEncryption', b'sha384WithRSAEncryption', b'sha512WithRSAEncryption' ) PY3 = sys.version_info[0] == 3 from Crypto.Cipher import AES BLOCK_SIZE = AES.block_size pad = lambda s, length: s + (BLOCK_SIZE - length % BLOCK_SIZE) * chr(BLOCK_SIZE - length % BLOCK_SIZE) unpad = lambda s: s[:-ord(s[len(s) - 1:])] logger = logging.getLogger(__name__) def encrypt_content(content, encrypt_type, encrypt_key, charset): if "AES" == encrypt_type.upper(): return aes_encrypt_content(content, encrypt_key, charset) raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type) def aes_encrypt_content(content, encrypt_key, charset): length = None if PY3: length = len(bytes(content, encoding = charset)) else: length = len(bytes(content)) padded_content = pad(content, length) iv = '\0' * BLOCK_SIZE cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv) encrypted_content = cryptor.encrypt(padded_content) encrypted_content = base64.b64encode(encrypted_content) if PY3: encrypted_content = str(encrypted_content, encoding = charset) return encrypted_content def decrypt_content(encrypted_content, encrypt_type, encrypt_key, charset): if "AES" == encrypt_type.upper(): return aes_decrypt_content(encrypted_content, encrypt_key, charset) raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type) def aes_decrypt_content(encrypted_content, encrypt_key, charset): encrypted_content = base64.b64decode(encrypted_content) iv = '\0' * BLOCK_SIZE cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv) content = unpad(cryptor.decrypt(encrypted_content)) if PY3: content = content.decode(charset) return content class BaseAliPay(object): def __str__(self): _repr = '{kclass}(appid: {appid})'.format(kclass = self.__class__.__name__, appid = self.appid) if six.PY2: return to_binary(_repr) else: return to_text(_repr) def __repr__(self): return str(self) @property def appid(self): return self._appid @property def sign_type(self): return self._sign_type @property def app_auth_token(self): return None def __init__(self, appid, app_private_key_string, public_key_string, aes_encrypt_key = None, sign_type = "RSA2", debug = False, timeout = 15): """ 初始化: alipay = AliPay( appid="", app_notify_url="", app_private_key_path="", alipay_public_key_path="", sign_type="RSA2" ) """ self._appid = appid self.app_private_key = RSA.importKey(app_private_key_string) self.alipay_public_key = RSA.importKey(public_key_string) self.aes_encrypt_key = aes_encrypt_key if sign_type not in ("RSA", "RSA2"): raise AliException(errCode = AliErrorCode.MY_INVALID_PARAMETER, errMsg = 'Unsupported sign type {}'.format(sign_type), client = self) self._sign_type = sign_type if debug is True: self._gateway = "https://openapi.alipaydev.com/gateway.do" else: self._gateway = "https://openapi.alipay.com/gateway.do" self.timeout = timeout def _sign(self, raw_string): """ 通过如下方法调试签名 方法1 key = rsa.PrivateKey.load_pkcs1(open(self._app_private_key_path).read()) sign = rsa.sign(unsigned_string.encode("utf8"), key, "SHA-1") # base64 编码,转换为unicode表示并移除回车 sign = base64.encodebytes(sign).decode("utf8").replace("\n", "") 方法2 key = RSA.importKey(open(self._app_private_key_path).read()) signer = PKCS1_v1_5.new(key) signature = signer.sign(SHA.new(unsigned_string.encode("utf8"))) # base64 编码,转换为unicode表示并移除回车 sign = base64.encodebytes(signature).decode("utf8").replace("\n", "") 方法3 echo "abc" | openssl sha1 -sign alipay.key | openssl base64 """ key = self.app_private_key signer = PKCS1_v1_5.new(key) if self._sign_type == "RSA": signature = signer.sign(SHA.new(b(raw_string))) else: signature = signer.sign(SHA256.new(b(raw_string))) # base64 编码,转换为unicode表示并移除回车 sign = encodebytes(signature).decode("utf-8").replace("\n", "") return sign def _ordered_data(self, data): for k, v in data.items(): if isinstance(v, dict): data[k] = json.dumps(v, separators = (',', ':')) return sorted(data.items()) def build_body(self, method, append_auth_token = False, notify_url = None, **query): data = { "app_id": self._appid, "method": method, "charset": "utf-8", "sign_type": self._sign_type, "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "version": "1.0" } if append_auth_token: data["app_auth_token"] = self.app_auth_token if notify_url: data['notify_url'] = notify_url data.update(query) if self.aes_encrypt_key and 'biz_content' in data: biz_content = data.pop('biz_content') data['biz_content'] = encrypt_content( content = json.dumps(biz_content, ensure_ascii = False, sort_keys = True, separators = (',', ':')), encrypt_type = 'AES', encrypt_key = self.aes_encrypt_key, charset = 'utf-8') return data def _verify(self, raw_content, signature): # 开始计算签名 key = self.alipay_public_key signer = PKCS1_v1_5.new(key) if self._sign_type == "RSA": digest = SHA.new() else: digest = SHA256.new() digest.update(raw_content.encode("utf-8")) signature = decodebytes(signature.encode("utf-8")) if signer.verify(digest, signature): return True return False def sign_data(self, data): data.pop("sign", None) ordered_items = self._ordered_data(data) raw_string = "&".join("{}={}".format(k, v) for k, v in ordered_items) sign = self._sign(raw_string) unquoted_items = ordered_items + [('sign', sign)] return "&".join("{}={}".format(k, quote_plus(v)) for k, v in unquoted_items) def verify(self, data, signature, pop_sign_type = True): if "sign_type" in data: if pop_sign_type: sign_type = data.pop("sign_type") else: sign_type = data.get("sign_type") if sign_type != self._sign_type: raise AliValidationError( errmsg = 'invalid sign type', lvalue = self._sign_type, rvalue = sign_type, client = self) # 排序后的字符串 unsigned_items = self._ordered_data(data) message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items) return self._verify(message, signature) def api(self, api_name, **kwargs): """ alipay.api("alipay.trade.page.pay", **kwargs) ==> alipay.api_alipay_trade_page_pay(**kwargs) """ api_name = api_name.replace(".", "_") key = "api_" + api_name if hasattr(self, key): return getattr(self, key) raise AttributeError("Unknown attribute" + api_name) def _request(self, method, url, headers = None, **kwargs): logger.debug("Calculate Signature URL: %s", url) if headers: headers.update({'Content-Type': 'application/json'}) else: headers = {'Content-Type': 'application/json'} kwargs['timeout'] = kwargs.get('timeout', 15) logger.debug('Request to Alipay API: %s %s\n%s', method, url, kwargs) with requests.sessions.Session() as session: res = session.request( url = url, method = method, headers = headers, **kwargs) try: res.raise_for_status() return res.text.decode('utf-8') except requests.RequestException as reqe: raise AliPayNetworkException( errCode = res.status_code, errMsg = reqe.message, client = self, request = reqe.request, response = reqe.response ) def api_alipay_trade_wap_pay( self, subject, out_trade_no, total_amount, return_url = None, notify_url = None, **kwargs ): biz_content = { "subject": subject, "out_trade_no": out_trade_no, "total_amount": total_amount, "product_code": "QUICK_WAP_PAY" } biz_content.update(kwargs) query = { 'biz_content': biz_content } if return_url: query.update({'return_url': return_url}) if notify_url: query.update({'notify_url': notify_url}) data = self.build_body("alipay.trade.wap.pay", **query) return self.sign_data(data) def api_alipay_trade_app_pay( self, subject, out_trade_no, total_amount, notify_url = None, **kwargs ): biz_content = { "subject": subject, "out_trade_no": out_trade_no, "total_amount": total_amount, "product_code": "QUICK_MSECURITY_PAY" } biz_content.update(kwargs) query = { 'biz_content': biz_content } if notify_url: query.update({'notify_url': notify_url}) data = self.build_body("alipay.trade.app.pay", **query) return self.sign_data(data) def api_alipay_trade_page_pay(self, subject, out_trade_no, total_amount, return_url = None, notify_url = None, **kwargs): biz_content = { "subject": subject, "out_trade_no": out_trade_no, "total_amount": total_amount, "product_code": "FAST_INSTANT_TRADE_PAY" } biz_content.update(kwargs) query = { 'biz_content': biz_content } if return_url: query.update({'return_url': return_url}) if notify_url: query.update({'notify_url': notify_url}) data = self.build_body("alipay.trade.page.pay", **query) return self.sign_data(data) def api_alipay_trade_query(self, out_trade_no = None, trade_no = None): """ response = { "alipay_trade_query_response": { "trade_no": "2017032121001004070200176844", "code": "10000", "invoice_amount": "20.00", "open_id": "20880072506750308812798160715407", "fund_bill_list": [ { "amount": "20.00", "fund_channel": "ALIPAYACCOUNT" } ], "buyer_logon_id": "csq***@sandbox.com", "send_pay_date": "2017-03-21 13:29:17", "receipt_amount": "20.00", "out_trade_no": "out_trade_no15", "buyer_pay_amount": "20.00", "buyer_user_id": "2088102169481075", "msg": "Success", "point_amount": "0.00", "trade_status": "TRADE_SUCCESS", "total_amount": "20.00" }, "sign": "" } """ assert (out_trade_no is not None) or (trade_no is not None), \ "Both trade_no and out_trade_no are None" biz_content = {} if out_trade_no: biz_content["out_trade_no"] = out_trade_no if trade_no: biz_content["trade_no"] = trade_no data = self.build_body("alipay.trade.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response(raw_string, "alipay_trade_query_response") def api_alipay_trade_pay( self, out_trade_no, scene, auth_code, subject, notify_url = None, **kwargs ): """ eg: self.api_alipay_trade_pay( out_trade_no, "bar_code/wave_code", auth_code, subject, total_amount=12, discountable_amount=10 ) failed response = { "alipay_trade_pay_response": { "code": "40004", "msg": "Business Failed", "sub_code": "ACQ.INVALID_PARAMETER", "sub_msg": "", "buyer_pay_amount": "0.00", "invoice_amount": "0.00", "point_amount": "0.00", "receipt_amount": "0.00" }, "sign": "" } succeeded response = { "alipay_trade_pay_response": { "trade_no": "2017032121001004070200176846", "code": "10000", "invoice_amount": "20.00", "open_id": "20880072506750308812798160715407", "fund_bill_list": [ { "amount": "20.00", "fund_channel": "ALIPAYACCOUNT" } ], "buyer_logon_id": "csq***@sandbox.com", "receipt_amount": "20.00", "out_trade_no": "out_trade_no18", "buyer_pay_amount": "20.00", "buyer_user_id": "2088102169481075", "msg": "Success", "point_amount": "0.00", "gmt_payment": "2017-03-21 15:07:29", "total_amount": "20.00" }, "sign": "" } """ assert scene in ("bar_code", "wave_code"), 'scene not in ("bar_code", "wave_code")' biz_content = { "out_trade_no": out_trade_no, "scene": scene, "auth_code": auth_code, "subject": subject } biz_content.update(**kwargs) query = { 'biz_content': biz_content } if notify_url: query.update({ 'notify_url': notify_url }) data = self.build_body("alipay.trade.pay", **query) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response(raw_string, "alipay_trade_pay_response") def api_alipay_trade_refund(self, out_trade_no, out_request_no, refund_amount, refund_reason): biz_content = { "out_trade_no": out_trade_no, "refund_amount": refund_amount, "out_request_no": out_request_no, 'refund_reason': refund_reason } data = self.build_body("alipay.trade.refund", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response(raw_string, "alipay_trade_refund_response") def api_alipay_trade_cancel(self, out_trade_no = None, trade_no = None): """ response = { "alipay_trade_cancel_response": { "msg": "Success", "out_trade_no": "out_trade_no15", "code": "10000", "retry_flag": "N" } } """ assert (out_trade_no is not None) or (trade_no is not None), \ "Both trade_no and out_trade_no are None" biz_content = {} if out_trade_no: biz_content["out_trade_no"] = out_trade_no if trade_no: biz_content["trade_no"] = trade_no data = self.build_body("alipay.trade.cancel", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response(raw_string, "alipay_trade_cancel_response") def api_alipay_trade_precreate(self, out_trade_no, total_amount, subject, **kwargs): """ success response = { "alipay_trade_precreate_response": { "msg": "Success", "out_trade_no": "out_trade_no17", "code": "10000", "qr_code": "https://qr.alipay.com/bax03431ljhokirwl38f00a7" }, "sign": "" } failed response = { "alipay_trade_precreate_response": { "msg": "Business Failed", "sub_code": "ACQ.TOTAL_FEE_EXCEED", "code": "40004", "sub_msg": "订单金额超过限额" }, "sign": "" } """ biz_content = { "out_trade_no": out_trade_no, "total_amount": total_amount, "subject": subject } biz_content.update(**kwargs) data = self.build_body("alipay.trade.precreate", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response(raw_string, "alipay_trade_precreate_response") def api_alipay_fund_trans_toaccount_transfer( self, out_biz_no, payee_type, payee_account, amount, **kwargs ): assert payee_type in ("ALIPAY_USERID", "ALIPAY_LOGONID"), "unknown payee type" biz_content = { "out_biz_no": out_biz_no, "payee_type": payee_type, "payee_account": payee_account, "amount": amount } biz_content.update(kwargs) data = self.build_body("alipay.fund.trans.toaccount.transfer", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_fund_trans_toaccount_transfer_response" ) def api_alipay_fund_trans_uni_transfer( self, out_biz_no, trans_amount, payee_info, order_title, product_code = 'TRANS_BANKCARD_NO_PWD', biz_scene = 'DIRECT_TRANSFER', **kwargs): biz_content = { "out_biz_no": out_biz_no, "trans_amount": trans_amount, "product_code": product_code, "biz_scene": biz_scene, "payee_info": payee_info, "order_title": order_title } biz_content.update(kwargs) data = self.build_body("alipay.fund.trans.uni.transfer", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_fund_trans_uni_transfer_response" ) def api_alipay_fund_trans_common_query(self, out_biz_no, product_code = 'TRANS_BANKCARD_NO_PWD', biz_scene = 'DIRECT_TRANSFER', **kwargs): biz_content = { "out_biz_no": out_biz_no, "product_code": product_code, "biz_scene": biz_scene } biz_content.update(kwargs) data = self.build_body("alipay.fund.trans.common.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_fund_trans_common_query_response" ) def api_alipay_trade_create(self, out_trade_no, total_amount, subject, notify_url = None, **kwargs): biz_content = { "out_trade_no": out_trade_no, "total_amount": total_amount, "subject": subject } biz_content.update(**kwargs) query = { 'biz_content': biz_content } if notify_url: query.update({ 'notify_url': notify_url }) data = self.build_body("alipay.trade.create", **query) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_trade_create_response" ) def api_alipay_fund_trans_order_query(self, out_biz_no = None, order_id = None): if out_biz_no is None and order_id is None: raise Exception("Both out_biz_no and order_id are None!") biz_content = {} if out_biz_no: biz_content["out_biz_no"] = out_biz_no if order_id: biz_content["order_id"] = order_id data = self.build_body("alipay.fund.trans.order.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_fund_trans_order_query_response" ) def api_alipay_system_oauth_token(self, auth_code, refresh_token = None): """ :param auth_code: :param refresh_token: :return: dict """ if refresh_token: query = { "grant_type": "refresh_token", "refresh_token": refresh_token } else: query = { "grant_type": "authorization_code", "code": auth_code } data = self.build_body("alipay.system.oauth.token", **query) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_system_oauth_token_response" ) def api_alipay_trade_refund_order_query(self, out_trade_no, out_request_no, query_options=None): # type:(str, str, dict) -> dict biz_content = { "out_trade_no": out_trade_no, "out_request_no": out_request_no, } if query_options: biz_content["query_options"] = query_options data = self.build_body("alipay.trade.fastpay.refund.query", biz_content=biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_trade_fastpay_refund_query_response" ) def query_auth(self, sub_merchant_id): biz_content = { "sub_merchant_id": sub_merchant_id } data = self.build_body("alipay.merchant.indirect.smidbind.query", biz_content=biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return def api_alipay_data_dataservice_bill_downloadurl_query(self, bill_type = 'trade', bill_date = None): """ :return: dict """ #: 注意,最多只能取昨天的数据,支付宝没有说明这一点 if bill_date is None: bill_date = (datetime.datetime.now() - datetime.timedelta(days = 1)).strftime('%Y-%m-%d') biz_content = { 'bill_type': bill_type, 'bill_date': bill_date } data = self.build_body("alipay.data.dataservice.bill.downloadurl.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_data_dataservice_bill_downloadurl_query_response" ) def api_alipay_user_info_share(self, auth_token): """ :param auth_token: :return: dict """ data = self.build_body("alipay.user.info.share", auth_token = auth_token) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_user_info_share_response" ) def _verify_and_return_sync_response(self, raw_string, response_type): """ return data if verification succeeded, else raise exception """ logger.debug('Response from Alipay API: %s', raw_string) response = json.loads(raw_string) result = response[response_type] sign = response["sign"] # locate string to be signed raw_string = self._get_string_to_be_signed( raw_string, response_type ) if not self._verify(raw_string, sign): raise AliException( errCode = AliErrorCode.MY_ERROR_SIGNATURE, errMsg = u'签名错误', client = self) return result def _get_string_to_be_signed(self, raw_string, response_type): """ https://doc.open.alipay.com/docs/doc.htm?docType=1&articleId=106120 从同步返回的接口里面找到待签名的字符串 """ left_index = 0 right_index = 0 index = raw_string.find(response_type) left_index = raw_string.find("{", index) index = left_index + 1 balance = -1 while balance < 0 and index < len(raw_string) - 1: index_a = raw_string.find("{", index) index_b = raw_string.find("}", index) # 右括号没找到, 退出 if index_b == -1: break right_index = index_b # 左括号没找到,移动到右括号的位置 if index_a == -1: index = index_b + 1 balance += 1 # 左括号出现在有括号之前,移动到左括号的位置 elif index_a > index_b: balance += 1 index = index_b + 1 # 左括号出现在右括号之后, 移动到右括号的位置 else: balance -= 1 index = index_a + 1 return raw_string[left_index: right_index + 1] def upload_image(self, imageType, imageContent): """ 上传图片 """ path = "ant.merchant.expand.indirect.image.upload" query = { "image_type": imageType, "image_content": imageContent } data = self.build_body(path) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method="get", url=url, data=query) return self._verify_and_return_sync_response( raw_string, "ant_merchant_expand_indirect_image_upload_response" ) def api_alipay_user_agreement_page_sign( self, personal_product_code, sign_scene, access_params, product_code, external_agreement_no, third_party_type, external_logon_id, sign_validity_period = None, notify_url = None): biz_content = { "personal_product_code": personal_product_code, "sign_scene": sign_scene, "access_params": access_params, "product_code": product_code, "external_agreement_no": external_agreement_no, "third_party_type": third_party_type } if external_logon_id: biz_content.update({ "external_logon_id": external_logon_id }) if sign_validity_period: biz_content.update({ 'sign_validity_period': sign_validity_period }) data = self.build_body("alipay.user.agreement.page.sign", notify_url = notify_url, biz_content = biz_content) return self._gateway + "?" + self.sign_data(data) def api_alipay_user_agreement_query( self, personal_product_code, sign_scene, product_code, external_agreement_no): biz_content = { "personal_product_code": personal_product_code, "sign_scene": sign_scene, "product_code": product_code, "external_agreement_no": external_agreement_no } data = self.build_body("alipay.user.agreement.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = "get", url = url) return self._verify_and_return_sync_response( raw_string, "alipay_user_agreement_query_response" ) def api_alipay_user_agreement_unsign( self, personal_product_code, sign_scene, external_agreement_no): biz_content = { "personal_product_code": personal_product_code, "sign_scene": sign_scene, "external_agreement_no": external_agreement_no } data = self.build_body("alipay.user.agreement.unsign", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = "get", url = url) return self._verify_and_return_sync_response( raw_string, "alipay_user_agreement_unsign_response" ) def api_alipay_fund_accountbook_create(self, merchant_user_id, merchant_user_type, scene_code, ext_info): biz_content = { "merchant_user_id": merchant_user_id, "merchant_user_type": merchant_user_type, "scene_code": scene_code, "ext_info": ext_info } data = self.build_body("alipay.fund.accountbook.create", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = "get", url = url) return self._verify_and_return_sync_response( raw_string, "alipay_fund_accountbook_create_response" ) def api_alipay_fund_accountbook_query(self, account_book_id, scene_code, ext_info): biz_content = { "account_book_id": account_book_id, "scene_code": scene_code, "ext_info": ext_info } data = self.build_body("alipay.fund.accountbook.query", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = "get", url = url) return self._verify_and_return_sync_response( raw_string, "alipay_fund_accountbook_query_response" ) def api_alipay_fund_trans_page_pay(self, agreement_no, account_book_id, out_biz_no, trans_amount, time_expire, remark = u'转账', order_title = u'代发专项充值'): biz_content = { 'out_biz_no': out_biz_no, 'trans_amount': trans_amount, 'product_code': 'FUND_ACCOUNT_BOOK', 'biz_scene': 'SATF_DEPOSIT', 'remark': remark, 'order_title': order_title, 'time_expire':time_expire, 'payee_info': { 'identity_type': 'ACCOUNT_BOOK_ID', 'identity': account_book_id, 'ext_info': json.dumps({ 'agreement_no':agreement_no }) } } data = self.build_body("alipay.fund.trans.page.pay", biz_content = biz_content) url = self._gateway + "?" + self.sign_data(data) return url # raw_string = self._request(method = "get", url = url) # return raw_string # return self._verify_and_return_sync_response( # raw_string, "alipay_fund_accountbook_query_response" # ) class AliPay(BaseAliPay): def __init__(self, appid, app_private_key_string, public_key_string, aes_encrypt_key = None, sign_type = "RSA2", debug = False, timeout = 15): super(AliPay, self).__init__( appid = appid, app_private_key_string = app_private_key_string, public_key_string = public_key_string, aes_encrypt_key = aes_encrypt_key, sign_type = sign_type, debug = debug, timeout = timeout) class DCAliPay(AliPay): """ 数字证书 (digital certificate) 版本 """ def __init__( self, appid, app_public_key_cert_string, public_key_cert_string, root_cert_string, app_private_key_string = None, aes_encrypt_key = None, sign_type = "RSA2", debug = False, timeout = 15): """ 初始化 DCAlipay( appid='', app_private_key_string='', app_public_key_cert_string='', alipay_public_key_cert_sring='', aplipay_root_cert_string='', ) """ self._app_public_key_cert_string = app_public_key_cert_string self._alipay_public_key_cert_string = public_key_cert_string self._alipay_root_cert_string = root_cert_string public_key_string = self.load_alipay_public_key_string() super(DCAliPay, self).__init__( appid = appid, app_private_key_string = app_private_key_string, public_key_string = public_key_string, aes_encrypt_key = aes_encrypt_key, sign_type = sign_type, debug = debug, timeout = timeout) def api_alipay_open_app_alipaycert_download(self, alipay_cert_sn): """ 下载支付宝证书 验签使用,支付宝公钥证书无感知升级机制 """ biz_content = { "alipay_cert_sn": alipay_cert_sn } data = self.build_body("alipay.open.app.alipaycert.download", biz_content = biz_content) return self.sign_data(data) def build_body(self, method, append_auth_token = False, **query): data = super(DCAliPay, self).build_body(method, append_auth_token, **query) data["app_cert_sn"] = self.app_cert_sn data["alipay_root_cert_sn"] = self.alipay_root_cert_sn return data def load_alipay_public_key_string(self): cert = OpenSSL.crypto.load_certificate( OpenSSL.crypto.FILETYPE_PEM, self._alipay_public_key_cert_string ) return OpenSSL.crypto.dump_publickey( OpenSSL.crypto.FILETYPE_PEM, cert.get_pubkey() ).decode("utf-8") @staticmethod def get_cert_sn(cert): """ 获取证书 SN 算法 """ cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert) certIssue = cert.get_issuer() name = 'CN={},OU={},O={},C={}'.format(certIssue.CN, certIssue.OU, certIssue.O, certIssue.C) string = name + str(cert.get_serial_number()) return hashlib.md5(string.encode()).hexdigest() @staticmethod def read_pem_cert_chain(certContent): """解析根证书""" # 根证书中,每个 cert 中间有两个回车间隔 items = [i for i in certContent.split('\n\n') if i] load_cert = partial(OpenSSL.crypto.load_certificate, OpenSSL.crypto.FILETYPE_PEM) return [load_cert(c) for c in items] @staticmethod def get_root_cert_sn(rootCert): """ 根证书 SN 算法""" certs = DCAliPay.read_pem_cert_chain(rootCert) rootCertSN = None for cert in certs: try: sigAlg = cert.get_signature_algorithm() except ValueError: continue if sigAlg in CryptoAlgSet: certIssue = cert.get_issuer() name = 'CN={},OU={},O={},C={}'.format( certIssue.CN, certIssue.OU, certIssue.O, certIssue.C ) string = name + str(cert.get_serial_number()) certSN = hashlib.md5(string.encode()).hexdigest() if not rootCertSN: rootCertSN = certSN else: rootCertSN = rootCertSN + '_' + certSN return rootCertSN @property def app_cert_sn(self): if not hasattr(self, "_app_cert_sn"): self._app_cert_sn = self.get_cert_sn(self._app_public_key_cert_string) return getattr(self, "_app_cert_sn") @property def alipay_root_cert_sn(self): if not hasattr(self, "_alipay_root_cert_sn"): self._alipay_root_cert_sn = self.get_root_cert_sn(self._alipay_root_cert_string) return getattr(self, "_alipay_root_cert_sn") class ISVAliPay(BaseAliPay): def __init__(self, appid, app_private_key_string, public_key_string, sign_type = "RSA2", app_auth_token = None, app_auth_code = None, debug = False): if not app_auth_token and not app_auth_code: raise Exception("Both app_auth_code and app_auth_token are None !!!") self._app_auth_token = app_auth_token self._app_auth_code = app_auth_code super(ISVAliPay, self).__init__(appid, app_private_key_string, public_key_string, sign_type, debug) @property def app_auth_token(self): # 没有则换取token if not self._app_auth_token: result = self.api_alipay_open_auth_token_app(self._app_auth_code) self._app_auth_token = result.get("app_auth_token", None) if not self._app_auth_token: raise Exception("Get auth token by auth code failed: {}".format(self._app_auth_code)) return self._app_auth_token def build_body(self, method, biz_content, return_url = None, append_auth_token = True): return super(ISVAliPay, self).build_body(method, biz_content, return_url, append_auth_token) def api_alipay_open_auth_token_app(self, refresh_token = None): """ response = { "code": "10000", "msg": "Success", "app_auth_token": "201708BB28623ce3d10f4f62875e9ef5cbeebX07", "app_refresh_token": "201708BB108a270d8bb6409890d16175a04a7X07", "auth_app_id": "appid", "expires_in": 31536000, "re_expires_in": 32140800, "user_id": "2088xxxxx } """ if refresh_token: biz_content = { "grant_type": "refresh_token", "refresh_token": refresh_token } else: biz_content = { "grant_type": "authorization_code", "code": self._app_auth_code } data = self.build_body( "alipay.open.auth.token.app", biz_content, append_auth_token = False ) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_open_auth_token_app_response" ) def api_alipay_open_auth_token_app_query(self): biz_content = { "app_auth_token": self.app_auth_token } data = self.build_body( "alipay.open.auth.token.app.query", biz_content, append_auth_token = False ) url = self._gateway + "?" + self.sign_data(data) raw_string = self._request(method = 'get', url = url, timeout = self.timeout) return self._verify_and_return_sync_response( raw_string, "alipay_open_auth_token_app_query_response" )