|
- #!/usr/bin/env python
- # coding: utf-8
- """
- __init__.py
- ~~~~~~~~~~
- python-alipay-sdk
- """
- import base64
- import hashlib
- import datetime
- import logging
- import sys
- from functools import partial
- import json
- import OpenSSL
- import requests
- import six
- from Crypto.Signature import PKCS1_v1_5
- from Crypto.Hash import SHA, SHA256
- from Crypto.PublicKey import RSA
- from library import to_binary, to_text
- from .compat import quote_plus, decodebytes, encodebytes, b
- from .exceptions import AliPayGatewayException, AliException, AliValidationError, AliPayServiceException, \
- AliPayNetworkException
- from .base import AliErrorCode
- # 常见加密算法
- CryptoAlgSet = (
- b'rsaEncryption',
- b'md2WithRSAEncryption',
- b'md5WithRSAEncryption',
- b'sha1WithRSAEncryption',
- b'sha256WithRSAEncryption',
- b'sha384WithRSAEncryption',
- b'sha512WithRSAEncryption'
- )
- PY3 = sys.version_info[0] == 3
- from Crypto.Cipher import AES
- BLOCK_SIZE = AES.block_size
- pad = lambda s, length: s + (BLOCK_SIZE - length % BLOCK_SIZE) * chr(BLOCK_SIZE - length % BLOCK_SIZE)
- unpad = lambda s: s[:-ord(s[len(s) - 1:])]
- logger = logging.getLogger(__name__)
- def encrypt_content(content, encrypt_type, encrypt_key, charset):
- if "AES" == encrypt_type.upper():
- return aes_encrypt_content(content, encrypt_key, charset)
- raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type)
- def aes_encrypt_content(content, encrypt_key, charset):
- length = None
- if PY3:
- length = len(bytes(content, encoding = charset))
- else:
- length = len(bytes(content))
- padded_content = pad(content, length)
- iv = '\0' * BLOCK_SIZE
- cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv)
- encrypted_content = cryptor.encrypt(padded_content)
- encrypted_content = base64.b64encode(encrypted_content)
- if PY3:
- encrypted_content = str(encrypted_content, encoding = charset)
- return encrypted_content
- def decrypt_content(encrypted_content, encrypt_type, encrypt_key, charset):
- if "AES" == encrypt_type.upper():
- return aes_decrypt_content(encrypted_content, encrypt_key, charset)
- raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type)
- def aes_decrypt_content(encrypted_content, encrypt_key, charset):
- encrypted_content = base64.b64decode(encrypted_content)
- iv = '\0' * BLOCK_SIZE
- cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv)
- content = unpad(cryptor.decrypt(encrypted_content))
- if PY3:
- content = content.decode(charset)
- return content
- class BaseAliPay(object):
- def __str__(self):
- _repr = '{kclass}(appid: {appid})'.format(kclass = self.__class__.__name__, appid = self.appid)
- if six.PY2:
- return to_binary(_repr)
- else:
- return to_text(_repr)
- def __repr__(self):
- return str(self)
- @property
- def appid(self):
- return self._appid
- @property
- def sign_type(self):
- return self._sign_type
- @property
- def app_auth_token(self):
- return None
- def __init__(self, appid,
- app_private_key_string,
- public_key_string,
- aes_encrypt_key = None,
- sign_type = "RSA2",
- debug = False,
- timeout = 15):
- """
- 初始化:
- alipay = AliPay(
- appid="",
- app_notify_url="",
- app_private_key_path="",
- alipay_public_key_path="",
- sign_type="RSA2"
- )
- """
- self._appid = appid
- self.app_private_key = RSA.importKey(app_private_key_string)
- self.alipay_public_key = RSA.importKey(public_key_string)
- self.aes_encrypt_key = aes_encrypt_key
- if sign_type not in ("RSA", "RSA2"):
- raise AliException(errCode = AliErrorCode.MY_INVALID_PARAMETER,
- errMsg = 'Unsupported sign type {}'.format(sign_type),
- client = self)
- self._sign_type = sign_type
- if debug is True:
- self._gateway = "https://openapi.alipaydev.com/gateway.do"
- else:
- self._gateway = "https://openapi.alipay.com/gateway.do"
- self.timeout = timeout
- def _sign(self, raw_string):
- """
- 通过如下方法调试签名
- 方法1
- key = rsa.PrivateKey.load_pkcs1(open(self._app_private_key_path).read())
- sign = rsa.sign(unsigned_string.encode("utf8"), key, "SHA-1")
- # base64 编码,转换为unicode表示并移除回车
- sign = base64.encodebytes(sign).decode("utf8").replace("\n", "")
- 方法2
- key = RSA.importKey(open(self._app_private_key_path).read())
- signer = PKCS1_v1_5.new(key)
- signature = signer.sign(SHA.new(unsigned_string.encode("utf8")))
- # base64 编码,转换为unicode表示并移除回车
- sign = base64.encodebytes(signature).decode("utf8").replace("\n", "")
- 方法3
- echo "abc" | openssl sha1 -sign alipay.key | openssl base64
- """
- key = self.app_private_key
- signer = PKCS1_v1_5.new(key)
- if self._sign_type == "RSA":
- signature = signer.sign(SHA.new(b(raw_string)))
- else:
- signature = signer.sign(SHA256.new(b(raw_string)))
- # base64 编码,转换为unicode表示并移除回车
- sign = encodebytes(signature).decode("utf-8").replace("\n", "")
- return sign
- def _ordered_data(self, data):
- for k, v in data.items():
- if isinstance(v, dict):
- data[k] = json.dumps(v, separators = (',', ':'))
- return sorted(data.items())
- def build_body(self, method, append_auth_token = False, notify_url = None, **query):
- data = {
- "app_id": self._appid,
- "method": method,
- "charset": "utf-8",
- "sign_type": self._sign_type,
- "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
- "version": "1.0"
- }
- if append_auth_token:
- data["app_auth_token"] = self.app_auth_token
- if notify_url:
- data['notify_url'] = notify_url
- data.update(query)
- if self.aes_encrypt_key and 'biz_content' in data:
- biz_content = data.pop('biz_content')
- data['biz_content'] = encrypt_content(
- content = json.dumps(biz_content,
- ensure_ascii = False,
- sort_keys = True,
- separators = (',', ':')),
- encrypt_type = 'AES',
- encrypt_key = self.aes_encrypt_key,
- charset = 'utf-8')
- return data
- def _verify(self, raw_content, signature):
- # 开始计算签名
- key = self.alipay_public_key
- signer = PKCS1_v1_5.new(key)
- if self._sign_type == "RSA":
- digest = SHA.new()
- else:
- digest = SHA256.new()
- digest.update(raw_content.encode("utf-8"))
- signature = decodebytes(signature.encode("utf-8"))
- if signer.verify(digest, signature):
- return True
- return False
- def sign_data(self, data):
- data.pop("sign", None)
- ordered_items = self._ordered_data(data)
- raw_string = "&".join("{}={}".format(k, v) for k, v in ordered_items)
- sign = self._sign(raw_string)
- unquoted_items = ordered_items + [('sign', sign)]
- return "&".join("{}={}".format(k, quote_plus(v)) for k, v in unquoted_items)
- def verify(self, data, signature, pop_sign_type = True):
- if "sign_type" in data:
- if pop_sign_type:
- sign_type = data.pop("sign_type")
- else:
- sign_type = data.get("sign_type")
- if sign_type != self._sign_type:
- raise AliValidationError(
- errmsg = 'invalid sign type',
- lvalue = self._sign_type,
- rvalue = sign_type,
- client = self)
- # 排序后的字符串
- unsigned_items = self._ordered_data(data)
- message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items)
- return self._verify(message, signature)
- def api(self, api_name, **kwargs):
- """
- alipay.api("alipay.trade.page.pay", **kwargs) ==> alipay.api_alipay_trade_page_pay(**kwargs)
- """
- api_name = api_name.replace(".", "_")
- key = "api_" + api_name
- if hasattr(self, key):
- return getattr(self, key)
- raise AttributeError("Unknown attribute" + api_name)
- def _request(self, method, url, headers = None, **kwargs):
- logger.debug("Calculate Signature URL: %s", url)
- if headers:
- headers.update({'Content-Type': 'application/json'})
- else:
- headers = {'Content-Type': 'application/json'}
- kwargs['timeout'] = kwargs.get('timeout', 15)
- logger.debug('Request to Alipay API: %s %s\n%s', method, url, kwargs)
- with requests.sessions.Session() as session:
- res = session.request(
- url = url,
- method = method,
- headers = headers,
- **kwargs)
- try:
- res.raise_for_status()
- return res.text.decode('utf-8')
- except requests.RequestException as reqe:
- raise AliPayNetworkException(
- errCode = res.status_code,
- errMsg = reqe.message,
- client = self,
- request = reqe.request,
- response = reqe.response
- )
- def api_alipay_trade_wap_pay(
- self, subject, out_trade_no, total_amount,
- return_url = None, notify_url = None, **kwargs
- ):
- biz_content = {
- "subject": subject,
- "out_trade_no": out_trade_no,
- "total_amount": total_amount,
- "product_code": "QUICK_WAP_PAY"
- }
- biz_content.update(kwargs)
- query = {
- 'biz_content': biz_content
- }
- if return_url:
- query.update({'return_url': return_url})
- if notify_url:
- query.update({'notify_url': notify_url})
- data = self.build_body("alipay.trade.wap.pay", **query)
- return self.sign_data(data)
- def api_alipay_trade_app_pay(
- self, subject, out_trade_no, total_amount, notify_url = None, **kwargs
- ):
- biz_content = {
- "subject": subject,
- "out_trade_no": out_trade_no,
- "total_amount": total_amount,
- "product_code": "QUICK_MSECURITY_PAY"
- }
- biz_content.update(kwargs)
- query = {
- 'biz_content': biz_content
- }
- if notify_url:
- query.update({'notify_url': notify_url})
- data = self.build_body("alipay.trade.app.pay", **query)
- return self.sign_data(data)
- def api_alipay_trade_page_pay(self, subject, out_trade_no, total_amount,
- return_url = None, notify_url = None, **kwargs):
- biz_content = {
- "subject": subject,
- "out_trade_no": out_trade_no,
- "total_amount": total_amount,
- "product_code": "FAST_INSTANT_TRADE_PAY"
- }
- biz_content.update(kwargs)
- query = {
- 'biz_content': biz_content
- }
- if return_url:
- query.update({'return_url': return_url})
- if notify_url:
- query.update({'notify_url': notify_url})
- data = self.build_body("alipay.trade.page.pay", **query)
- return self.sign_data(data)
- def api_alipay_trade_query(self, out_trade_no = None, trade_no = None):
- """
- response = {
- "alipay_trade_query_response": {
- "trade_no": "2017032121001004070200176844",
- "code": "10000",
- "invoice_amount": "20.00",
- "open_id": "20880072506750308812798160715407",
- "fund_bill_list": [
- {
- "amount": "20.00",
- "fund_channel": "ALIPAYACCOUNT"
- }
- ],
- "buyer_logon_id": "csq***@sandbox.com",
- "send_pay_date": "2017-03-21 13:29:17",
- "receipt_amount": "20.00",
- "out_trade_no": "out_trade_no15",
- "buyer_pay_amount": "20.00",
- "buyer_user_id": "2088102169481075",
- "msg": "Success",
- "point_amount": "0.00",
- "trade_status": "TRADE_SUCCESS",
- "total_amount": "20.00"
- },
- "sign": ""
- }
- """
- assert (out_trade_no is not None) or (trade_no is not None), \
- "Both trade_no and out_trade_no are None"
- biz_content = {}
- if out_trade_no:
- biz_content["out_trade_no"] = out_trade_no
- if trade_no:
- biz_content["trade_no"] = trade_no
- data = self.build_body("alipay.trade.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(raw_string, "alipay_trade_query_response")
- def api_alipay_trade_pay(
- self, out_trade_no, scene, auth_code, subject, notify_url = None, **kwargs
- ):
- """
- eg:
- self.api_alipay_trade_pay(
- out_trade_no,
- "bar_code/wave_code",
- auth_code,
- subject,
- total_amount=12,
- discountable_amount=10
- )
- failed response = {
- "alipay_trade_pay_response": {
- "code": "40004",
- "msg": "Business Failed",
- "sub_code": "ACQ.INVALID_PARAMETER",
- "sub_msg": "",
- "buyer_pay_amount": "0.00",
- "invoice_amount": "0.00",
- "point_amount": "0.00",
- "receipt_amount": "0.00"
- },
- "sign": ""
- }
- succeeded response =
- {
- "alipay_trade_pay_response": {
- "trade_no": "2017032121001004070200176846",
- "code": "10000",
- "invoice_amount": "20.00",
- "open_id": "20880072506750308812798160715407",
- "fund_bill_list": [
- {
- "amount": "20.00",
- "fund_channel": "ALIPAYACCOUNT"
- }
- ],
- "buyer_logon_id": "csq***@sandbox.com",
- "receipt_amount": "20.00",
- "out_trade_no": "out_trade_no18",
- "buyer_pay_amount": "20.00",
- "buyer_user_id": "2088102169481075",
- "msg": "Success",
- "point_amount": "0.00",
- "gmt_payment": "2017-03-21 15:07:29",
- "total_amount": "20.00"
- },
- "sign": ""
- }
- """
- assert scene in ("bar_code", "wave_code"), 'scene not in ("bar_code", "wave_code")'
- biz_content = {
- "out_trade_no": out_trade_no,
- "scene": scene,
- "auth_code": auth_code,
- "subject": subject
- }
- biz_content.update(**kwargs)
- query = {
- 'biz_content': biz_content
- }
- if notify_url:
- query.update({
- 'notify_url': notify_url
- })
- data = self.build_body("alipay.trade.pay", **query)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(raw_string, "alipay_trade_pay_response")
- def api_alipay_trade_refund(self, out_trade_no, out_request_no, refund_amount, refund_reason):
- biz_content = {
- "out_trade_no": out_trade_no,
- "refund_amount": refund_amount,
- "out_request_no": out_request_no,
- 'refund_reason': refund_reason
- }
- data = self.build_body("alipay.trade.refund", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(raw_string, "alipay_trade_refund_response")
- def api_alipay_trade_cancel(self, out_trade_no = None, trade_no = None):
- """
- response = {
- "alipay_trade_cancel_response": {
- "msg": "Success",
- "out_trade_no": "out_trade_no15",
- "code": "10000",
- "retry_flag": "N"
- }
- }
- """
- assert (out_trade_no is not None) or (trade_no is not None), \
- "Both trade_no and out_trade_no are None"
- biz_content = {}
- if out_trade_no:
- biz_content["out_trade_no"] = out_trade_no
- if trade_no:
- biz_content["trade_no"] = trade_no
- data = self.build_body("alipay.trade.cancel", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(raw_string, "alipay_trade_cancel_response")
- def api_alipay_trade_precreate(self, out_trade_no, total_amount, subject, **kwargs):
- """
- success response = {
- "alipay_trade_precreate_response": {
- "msg": "Success",
- "out_trade_no": "out_trade_no17",
- "code": "10000",
- "qr_code": "https://qr.alipay.com/bax03431ljhokirwl38f00a7"
- },
- "sign": ""
- }
- failed response = {
- "alipay_trade_precreate_response": {
- "msg": "Business Failed",
- "sub_code": "ACQ.TOTAL_FEE_EXCEED",
- "code": "40004",
- "sub_msg": "订单金额超过限额"
- },
- "sign": ""
- }
- """
- biz_content = {
- "out_trade_no": out_trade_no,
- "total_amount": total_amount,
- "subject": subject
- }
- biz_content.update(**kwargs)
- data = self.build_body("alipay.trade.precreate", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(raw_string, "alipay_trade_precreate_response")
- def api_alipay_fund_trans_toaccount_transfer(
- self, out_biz_no, payee_type, payee_account, amount, **kwargs
- ):
- assert payee_type in ("ALIPAY_USERID", "ALIPAY_LOGONID"), "unknown payee type"
- biz_content = {
- "out_biz_no": out_biz_no,
- "payee_type": payee_type,
- "payee_account": payee_account,
- "amount": amount
- }
- biz_content.update(kwargs)
- data = self.build_body("alipay.fund.trans.toaccount.transfer", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_trans_toaccount_transfer_response"
- )
- def api_alipay_fund_trans_uni_transfer(
- self, out_biz_no, trans_amount, payee_info, order_title, product_code = 'TRANS_BANKCARD_NO_PWD',
- biz_scene = 'DIRECT_TRANSFER', **kwargs):
- biz_content = {
- "out_biz_no": out_biz_no,
- "trans_amount": trans_amount,
- "product_code": product_code,
- "biz_scene": biz_scene,
- "payee_info": payee_info,
- "order_title": order_title
- }
- biz_content.update(kwargs)
- data = self.build_body("alipay.fund.trans.uni.transfer", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_trans_uni_transfer_response"
- )
- def api_alipay_fund_trans_common_query(self, out_biz_no, product_code = 'TRANS_BANKCARD_NO_PWD',
- biz_scene = 'DIRECT_TRANSFER', **kwargs):
- biz_content = {
- "out_biz_no": out_biz_no,
- "product_code": product_code,
- "biz_scene": biz_scene
- }
- biz_content.update(kwargs)
- data = self.build_body("alipay.fund.trans.common.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_trans_common_query_response"
- )
- def api_alipay_trade_create(self, out_trade_no, total_amount, subject, notify_url = None, **kwargs):
- biz_content = {
- "out_trade_no": out_trade_no,
- "total_amount": total_amount,
- "subject": subject
- }
- biz_content.update(**kwargs)
- query = {
- 'biz_content': biz_content
- }
- if notify_url:
- query.update({
- 'notify_url': notify_url
- })
- data = self.build_body("alipay.trade.create", **query)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_trade_create_response"
- )
- def api_alipay_fund_trans_order_query(self, out_biz_no = None, order_id = None):
- if out_biz_no is None and order_id is None:
- raise Exception("Both out_biz_no and order_id are None!")
- biz_content = {}
- if out_biz_no:
- biz_content["out_biz_no"] = out_biz_no
- if order_id:
- biz_content["order_id"] = order_id
- data = self.build_body("alipay.fund.trans.order.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_trans_order_query_response"
- )
- def api_alipay_system_oauth_token(self, auth_code, refresh_token = None):
- """
- :param auth_code:
- :param refresh_token:
- :return: dict
- """
- if refresh_token:
- query = {
- "grant_type": "refresh_token",
- "refresh_token": refresh_token
- }
- else:
- query = {
- "grant_type": "authorization_code",
- "code": auth_code
- }
- data = self.build_body("alipay.system.oauth.token", **query)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_system_oauth_token_response"
- )
- def api_alipay_trade_refund_order_query(self, out_trade_no, out_request_no, query_options=None):
- # type:(str, str, dict) -> dict
- biz_content = {
- "out_trade_no": out_trade_no,
- "out_request_no": out_request_no,
- }
- if query_options:
- biz_content["query_options"] = query_options
- data = self.build_body("alipay.trade.fastpay.refund.query", biz_content=biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_trade_fastpay_refund_query_response"
- )
- def query_auth(self, sub_merchant_id):
- biz_content = {
- "sub_merchant_id": sub_merchant_id
- }
- data = self.build_body("alipay.merchant.indirect.smidbind.query", biz_content=biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return
- def api_alipay_data_dataservice_bill_downloadurl_query(self, bill_type = 'trade', bill_date = None):
- """
- :return: dict
- """
- #: 注意,最多只能取昨天的数据,支付宝没有说明这一点
- if bill_date is None:
- bill_date = (datetime.datetime.now() - datetime.timedelta(days = 1)).strftime('%Y-%m-%d')
- biz_content = {
- 'bill_type': bill_type,
- 'bill_date': bill_date
- }
- data = self.build_body("alipay.data.dataservice.bill.downloadurl.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_data_dataservice_bill_downloadurl_query_response"
- )
- def api_alipay_user_info_share(self, auth_token):
- """
- :param auth_token:
- :return: dict
- """
- data = self.build_body("alipay.user.info.share", auth_token = auth_token)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_user_info_share_response"
- )
- def _verify_and_return_sync_response(self, raw_string, response_type):
- """
- return data if verification succeeded, else raise exception
- """
- logger.debug('Response from Alipay API: %s', raw_string)
- response = json.loads(raw_string)
- result = response[response_type]
- sign = response["sign"]
- # locate string to be signed
- raw_string = self._get_string_to_be_signed(
- raw_string, response_type
- )
- if not self._verify(raw_string, sign):
- raise AliException(
- errCode = AliErrorCode.MY_ERROR_SIGNATURE,
- errMsg = u'签名错误',
- client = self)
- return result
- def _get_string_to_be_signed(self, raw_string, response_type):
- """
- https://doc.open.alipay.com/docs/doc.htm?docType=1&articleId=106120
- 从同步返回的接口里面找到待签名的字符串
- """
- left_index = 0
- right_index = 0
- index = raw_string.find(response_type)
- left_index = raw_string.find("{", index)
- index = left_index + 1
- balance = -1
- while balance < 0 and index < len(raw_string) - 1:
- index_a = raw_string.find("{", index)
- index_b = raw_string.find("}", index)
- # 右括号没找到, 退出
- if index_b == -1:
- break
- right_index = index_b
- # 左括号没找到,移动到右括号的位置
- if index_a == -1:
- index = index_b + 1
- balance += 1
- # 左括号出现在有括号之前,移动到左括号的位置
- elif index_a > index_b:
- balance += 1
- index = index_b + 1
- # 左括号出现在右括号之后, 移动到右括号的位置
- else:
- balance -= 1
- index = index_a + 1
- return raw_string[left_index: right_index + 1]
- def upload_image(self, imageType, imageContent):
- """
- 上传图片
- """
- path = "ant.merchant.expand.indirect.image.upload"
- query = {
- "image_type": imageType,
- "image_content": imageContent
- }
- data = self.build_body(path)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method="get", url=url, data=query)
- return self._verify_and_return_sync_response(
- raw_string, "ant_merchant_expand_indirect_image_upload_response"
- )
- def api_alipay_user_agreement_page_sign(
- self, personal_product_code, sign_scene, access_params,
- product_code, external_agreement_no, third_party_type, external_logon_id,
- sign_validity_period = None, notify_url = None):
- biz_content = {
- "personal_product_code": personal_product_code,
- "sign_scene": sign_scene,
- "access_params": access_params,
- "product_code": product_code,
- "external_agreement_no": external_agreement_no,
- "third_party_type": third_party_type
- }
- if external_logon_id:
- biz_content.update({
- "external_logon_id": external_logon_id
- })
- if sign_validity_period:
- biz_content.update({
- 'sign_validity_period': sign_validity_period
- })
- data = self.build_body("alipay.user.agreement.page.sign", notify_url = notify_url, biz_content = biz_content)
- return self._gateway + "?" + self.sign_data(data)
- def api_alipay_user_agreement_query(
- self, personal_product_code, sign_scene, product_code, external_agreement_no):
- biz_content = {
- "personal_product_code": personal_product_code,
- "sign_scene": sign_scene,
- "product_code": product_code,
- "external_agreement_no": external_agreement_no
- }
- data = self.build_body("alipay.user.agreement.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = "get", url = url)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_user_agreement_query_response"
- )
- def api_alipay_user_agreement_unsign(
- self, personal_product_code, sign_scene, external_agreement_no):
- biz_content = {
- "personal_product_code": personal_product_code,
- "sign_scene": sign_scene,
- "external_agreement_no": external_agreement_no
- }
- data = self.build_body("alipay.user.agreement.unsign", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = "get", url = url)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_user_agreement_unsign_response"
- )
- def api_alipay_fund_accountbook_create(self, merchant_user_id, merchant_user_type, scene_code, ext_info):
- biz_content = {
- "merchant_user_id": merchant_user_id,
- "merchant_user_type": merchant_user_type,
- "scene_code": scene_code,
- "ext_info": ext_info
- }
- data = self.build_body("alipay.fund.accountbook.create", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = "get", url = url)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_accountbook_create_response"
- )
- def api_alipay_fund_accountbook_query(self, account_book_id, scene_code, ext_info):
- biz_content = {
- "account_book_id": account_book_id,
- "scene_code": scene_code,
- "ext_info": ext_info
- }
- data = self.build_body("alipay.fund.accountbook.query", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = "get", url = url)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_fund_accountbook_query_response"
- )
- def api_alipay_fund_trans_page_pay(self, agreement_no, account_book_id, out_biz_no, trans_amount, time_expire, remark = u'转账', order_title = u'代发专项充值'):
- biz_content = {
- 'out_biz_no': out_biz_no,
- 'trans_amount': trans_amount,
- 'product_code': 'FUND_ACCOUNT_BOOK',
- 'biz_scene': 'SATF_DEPOSIT',
- 'remark': remark,
- 'order_title': order_title,
- 'time_expire':time_expire,
- 'payee_info': {
- 'identity_type': 'ACCOUNT_BOOK_ID',
- 'identity': account_book_id,
- 'ext_info': json.dumps({
- 'agreement_no':agreement_no
- })
- }
- }
- data = self.build_body("alipay.fund.trans.page.pay", biz_content = biz_content)
- url = self._gateway + "?" + self.sign_data(data)
- return url
- # raw_string = self._request(method = "get", url = url)
- # return raw_string
- # return self._verify_and_return_sync_response(
- # raw_string, "alipay_fund_accountbook_query_response"
- # )
- class AliPay(BaseAliPay):
- def __init__(self, appid,
- app_private_key_string,
- public_key_string,
- aes_encrypt_key = None,
- sign_type = "RSA2",
- debug = False,
- timeout = 15):
- super(AliPay, self).__init__(
- appid = appid,
- app_private_key_string = app_private_key_string,
- public_key_string = public_key_string,
- aes_encrypt_key = aes_encrypt_key,
- sign_type = sign_type,
- debug = debug,
- timeout = timeout)
- class DCAliPay(AliPay):
- """
- 数字证书 (digital certificate) 版本
- """
- def __init__(
- self,
- appid,
- app_public_key_cert_string,
- public_key_cert_string,
- root_cert_string,
- app_private_key_string = None,
- aes_encrypt_key = None,
- sign_type = "RSA2",
- debug = False,
- timeout = 15):
- """
- 初始化
- DCAlipay(
- appid='',
- app_private_key_string='',
- app_public_key_cert_string='',
- alipay_public_key_cert_sring='',
- aplipay_root_cert_string='',
- )
- """
- self._app_public_key_cert_string = app_public_key_cert_string
- self._alipay_public_key_cert_string = public_key_cert_string
- self._alipay_root_cert_string = root_cert_string
- public_key_string = self.load_alipay_public_key_string()
- super(DCAliPay, self).__init__(
- appid = appid,
- app_private_key_string = app_private_key_string,
- public_key_string = public_key_string,
- aes_encrypt_key = aes_encrypt_key,
- sign_type = sign_type,
- debug = debug,
- timeout = timeout)
- def api_alipay_open_app_alipaycert_download(self, alipay_cert_sn):
- """
- 下载支付宝证书
- 验签使用,支付宝公钥证书无感知升级机制
- """
- biz_content = {
- "alipay_cert_sn": alipay_cert_sn
- }
- data = self.build_body("alipay.open.app.alipaycert.download", biz_content = biz_content)
- return self.sign_data(data)
- def build_body(self, method, append_auth_token = False, **query):
- data = super(DCAliPay, self).build_body(method, append_auth_token, **query)
- data["app_cert_sn"] = self.app_cert_sn
- data["alipay_root_cert_sn"] = self.alipay_root_cert_sn
- return data
- def load_alipay_public_key_string(self):
- cert = OpenSSL.crypto.load_certificate(
- OpenSSL.crypto.FILETYPE_PEM, self._alipay_public_key_cert_string
- )
- return OpenSSL.crypto.dump_publickey(
- OpenSSL.crypto.FILETYPE_PEM, cert.get_pubkey()
- ).decode("utf-8")
- @staticmethod
- def get_cert_sn(cert):
- """
- 获取证书 SN 算法
- """
- cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
- certIssue = cert.get_issuer()
- name = 'CN={},OU={},O={},C={}'.format(certIssue.CN, certIssue.OU, certIssue.O, certIssue.C)
- string = name + str(cert.get_serial_number())
- return hashlib.md5(string.encode()).hexdigest()
- @staticmethod
- def read_pem_cert_chain(certContent):
- """解析根证书"""
- # 根证书中,每个 cert 中间有两个回车间隔
- items = [i for i in certContent.split('\n\n') if i]
- load_cert = partial(OpenSSL.crypto.load_certificate, OpenSSL.crypto.FILETYPE_PEM)
- return [load_cert(c) for c in items]
- @staticmethod
- def get_root_cert_sn(rootCert):
- """ 根证书 SN 算法"""
- certs = DCAliPay.read_pem_cert_chain(rootCert)
- rootCertSN = None
- for cert in certs:
- try:
- sigAlg = cert.get_signature_algorithm()
- except ValueError:
- continue
- if sigAlg in CryptoAlgSet:
- certIssue = cert.get_issuer()
- name = 'CN={},OU={},O={},C={}'.format(
- certIssue.CN, certIssue.OU, certIssue.O, certIssue.C
- )
- string = name + str(cert.get_serial_number())
- certSN = hashlib.md5(string.encode()).hexdigest()
- if not rootCertSN:
- rootCertSN = certSN
- else:
- rootCertSN = rootCertSN + '_' + certSN
- return rootCertSN
- @property
- def app_cert_sn(self):
- if not hasattr(self, "_app_cert_sn"):
- self._app_cert_sn = self.get_cert_sn(self._app_public_key_cert_string)
- return getattr(self, "_app_cert_sn")
- @property
- def alipay_root_cert_sn(self):
- if not hasattr(self, "_alipay_root_cert_sn"):
- self._alipay_root_cert_sn = self.get_root_cert_sn(self._alipay_root_cert_string)
- return getattr(self, "_alipay_root_cert_sn")
- class ISVAliPay(BaseAliPay):
- def __init__(self, appid, app_private_key_string, public_key_string, sign_type = "RSA2", app_auth_token = None,
- app_auth_code = None, debug = False):
- if not app_auth_token and not app_auth_code:
- raise Exception("Both app_auth_code and app_auth_token are None !!!")
- self._app_auth_token = app_auth_token
- self._app_auth_code = app_auth_code
- super(ISVAliPay, self).__init__(appid, app_private_key_string, public_key_string, sign_type, debug)
- @property
- def app_auth_token(self):
- # 没有则换取token
- if not self._app_auth_token:
- result = self.api_alipay_open_auth_token_app(self._app_auth_code)
- self._app_auth_token = result.get("app_auth_token", None)
- if not self._app_auth_token:
- raise Exception("Get auth token by auth code failed: {}".format(self._app_auth_code))
- return self._app_auth_token
- def build_body(self, method, biz_content, return_url = None, append_auth_token = True):
- return super(ISVAliPay, self).build_body(method, biz_content, return_url, append_auth_token)
- def api_alipay_open_auth_token_app(self, refresh_token = None):
- """
- response = {
- "code": "10000",
- "msg": "Success",
- "app_auth_token": "201708BB28623ce3d10f4f62875e9ef5cbeebX07",
- "app_refresh_token": "201708BB108a270d8bb6409890d16175a04a7X07",
- "auth_app_id": "appid",
- "expires_in": 31536000,
- "re_expires_in": 32140800,
- "user_id": "2088xxxxx
- }
- """
- if refresh_token:
- biz_content = {
- "grant_type": "refresh_token",
- "refresh_token": refresh_token
- }
- else:
- biz_content = {
- "grant_type": "authorization_code",
- "code": self._app_auth_code
- }
- data = self.build_body(
- "alipay.open.auth.token.app",
- biz_content,
- append_auth_token = False
- )
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_open_auth_token_app_response"
- )
- def api_alipay_open_auth_token_app_query(self):
- biz_content = {
- "app_auth_token": self.app_auth_token
- }
- data = self.build_body(
- "alipay.open.auth.token.app.query",
- biz_content,
- append_auth_token = False
- )
- url = self._gateway + "?" + self.sign_data(data)
- raw_string = self._request(method = 'get', url = url, timeout = self.timeout)
- return self._verify_and_return_sync_response(
- raw_string, "alipay_open_auth_token_app_query_response"
- )
|