12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057 |
- #!/usr/bin/env python
- # coding: utf-8
- """
- __init__.py
- ~~~~~~~~~~
- python-alipay-sdk
- """
- import base64
- import hashlib
- import datetime
- import logging
- import sys
- from functools import partial
- import json
- import OpenSSL
- import requests
- import six
- from Crypto.Signature import PKCS1_v1_5
- from Crypto.Hash import SHA, SHA256
- from Crypto.PublicKey import RSA
- from library import to_binary, to_text
- from .compat import quote_plus, decodebytes, encodebytes, b
- from .exceptions import AliPayGatewayException, AliException, AliValidationError, AliPayServiceException, \
- AliPayNetworkException
- from .base import AliErrorCode
# Signature algorithm names accepted when computing the root certificate SN
# (see DCAliPay.get_root_cert_sn, which skips certificates signed with
# anything outside this tuple).
CryptoAlgSet = (
    b'rsaEncryption',
    b'md2WithRSAEncryption',
    b'md5WithRSAEncryption',
    b'sha1WithRSAEncryption',
    b'sha256WithRSAEncryption',
    b'sha384WithRSAEncryption',
    b'sha512WithRSAEncryption'
)

# True when the interpreter's major version is 3.
PY3 = sys.version_info[0] == 3

from Crypto.Cipher import AES

# Cipher block size used for padding and for the all-zero IV.
BLOCK_SIZE = AES.block_size


def pad(s, length):
    """Append PKCS#7-style padding to ``s``.

    ``length`` is the length used to work out how many padding characters are
    needed; each padding character is ``chr(count)``.
    """
    fill = BLOCK_SIZE - length % BLOCK_SIZE
    return s + fill * chr(fill)


def unpad(s):
    """Strip the padding whose length is encoded in the last element of ``s``."""
    last = s[len(s) - 1:]
    return s[:-ord(last)]


logger = logging.getLogger(__name__)
def encrypt_content(content, encrypt_type, encrypt_key, charset):
    """Encrypt ``content`` with the algorithm named by ``encrypt_type``.

    Only "AES" (compared case-insensitively) is handled; it is delegated to
    ``aes_encrypt_content``. Any other value raises ``Exception``.
    """
    if encrypt_type.upper() != "AES":
        raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type)
    return aes_encrypt_content(content, encrypt_key, charset)
def aes_encrypt_content(content, encrypt_key, charset):
    """AES-CBC encrypt ``content`` and return it base64-encoded.

    :param content: plaintext; text is encoded with ``charset`` first, byte
        strings are used as they are.
    :param encrypt_key: base64-encoded AES key.
    :param charset: charset used to encode the plaintext and, on Python 3,
        to decode the base64 result into text.
    :return: base64 ciphertext (text on Python 3, byte string on Python 2).
    """
    # The cipher works on bytes: encode once, then pad the *encoded* data so
    # the pad length matches what is actually encrypted (multi-byte
    # characters make the text length and byte length differ).
    if not isinstance(content, bytes):
        content = content.encode(charset)
    fill = BLOCK_SIZE - len(content) % BLOCK_SIZE
    padded_content = content + bytes(bytearray([fill]) * fill)
    # All-zero IV; must be bytes, a text string is rejected on Python 3.
    iv = b'\0' * BLOCK_SIZE
    cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv)
    encrypted_content = base64.b64encode(cryptor.encrypt(padded_content))
    if PY3:
        encrypted_content = encrypted_content.decode(charset)
    return encrypted_content
def decrypt_content(encrypted_content, encrypt_type, encrypt_key, charset):
    """Decrypt ``encrypted_content`` with the algorithm named by ``encrypt_type``.

    Only "AES" (compared case-insensitively) is handled; it is delegated to
    ``aes_decrypt_content``. Any other value raises ``Exception``.
    """
    if encrypt_type.upper() != "AES":
        raise Exception("当前不支持该算法类型encrypt_type=" + encrypt_type)
    return aes_decrypt_content(encrypted_content, encrypt_key, charset)
def aes_decrypt_content(encrypted_content, encrypt_key, charset):
    """Decrypt base64-encoded AES-CBC ciphertext.

    :param encrypted_content: base64 ciphertext.
    :param encrypt_key: base64-encoded AES key.
    :param charset: charset used to decode the plaintext on Python 3.
    :return: plaintext with padding removed (text on Python 3, byte string
        on Python 2).
    """
    cipher_bytes = base64.b64decode(encrypted_content)
    # All-zero IV; must be bytes, a text string is rejected on Python 3.
    iv = b'\0' * BLOCK_SIZE
    cryptor = AES.new(base64.b64decode(encrypt_key), AES.MODE_CBC, iv)
    content = unpad(cryptor.decrypt(cipher_bytes))
    if PY3:
        content = content.decode(charset)
    return content
class BaseAliPay(object):
    """Signing, verification and gateway helpers shared by the Alipay clients.

    Outgoing parameters are signed with the application's RSA private key and
    synchronous responses are verified with the Alipay RSA public key. When
    ``aes_encrypt_key`` is set, ``biz_content`` is AES-encrypted in
    ``build_body`` before signing.
    """

    def __str__(self):
        _repr = '{kclass}(appid: {appid})'.format(kclass=self.__class__.__name__, appid=self.appid)
        if six.PY2:
            return to_binary(_repr)
        return to_text(_repr)

    def __repr__(self):
        return str(self)

    @property
    def appid(self):
        """Application id given at construction."""
        return self._appid

    @property
    def sign_type(self):
        """Signature type, "RSA" or "RSA2"."""
        return self._sign_type

    @property
    def app_auth_token(self):
        """Token added by ``build_body`` when ``append_auth_token`` is set; none here."""
        return None

    def __init__(self, appid,
                 app_private_key_string,
                 public_key_string,
                 aes_encrypt_key=None,
                 sign_type="RSA2",
                 debug=False,
                 timeout=15):
        """
        Set up keys, signature type, gateway URL and timeout.

        Example:
            alipay = AliPay(
                appid="",
                app_private_key_string="",
                public_key_string="",
                sign_type="RSA2"
            )

        :param appid: application id, sent as ``app_id``.
        :param app_private_key_string: RSA private key text (``RSA.importKey``).
        :param public_key_string: Alipay RSA public key text (``RSA.importKey``).
        :param aes_encrypt_key: optional base64 AES key; when set,
            ``biz_content`` is encrypted in ``build_body``.
        :param sign_type: "RSA" (SHA-1 digest) or "RSA2" (SHA-256 digest).
        :param debug: exactly ``True`` selects the openapi.alipaydev.com gateway.
        :param timeout: timeout passed to ``requests`` for gateway calls.
        :raises AliException: when ``sign_type`` is not supported.
        """
        self._appid = appid
        self.app_private_key = RSA.importKey(app_private_key_string)
        self.alipay_public_key = RSA.importKey(public_key_string)
        self.aes_encrypt_key = aes_encrypt_key
        if sign_type not in ("RSA", "RSA2"):
            raise AliException(errCode=AliErrorCode.MY_INVALID_PARAMETER,
                               errMsg='Unsupported sign type {}'.format(sign_type),
                               client=self)
        self._sign_type = sign_type
        if debug is True:
            self._gateway = "https://openapi.alipaydev.com/gateway.do"
        else:
            self._gateway = "https://openapi.alipay.com/gateway.do"
        self.timeout = timeout

    def _sign(self, raw_string):
        """
        Sign ``raw_string`` with the application private key and return the
        base64 signature as text with newlines removed.

        Ways to reproduce a signature when debugging:
        Method 1
            key = rsa.PrivateKey.load_pkcs1(open(self._app_private_key_path).read())
            sign = rsa.sign(unsigned_string.encode("utf8"), key, "SHA-1")
            # base64-encode, convert to unicode and strip newlines
            sign = base64.encodebytes(sign).decode("utf8").replace("\n", "")
        Method 2
            key = RSA.importKey(open(self._app_private_key_path).read())
            signer = PKCS1_v1_5.new(key)
            signature = signer.sign(SHA.new(unsigned_string.encode("utf8")))
            # base64-encode, convert to unicode and strip newlines
            sign = base64.encodebytes(signature).decode("utf8").replace("\n", "")
        Method 3
            echo "abc" | openssl sha1 -sign alipay.key | openssl base64
        """
        hasher = SHA if self._sign_type == "RSA" else SHA256
        signer = PKCS1_v1_5.new(self.app_private_key)
        signature = signer.sign(hasher.new(b(raw_string)))
        # base64-encode, convert to text and strip the newlines base64 inserts
        return encodebytes(signature).decode("utf-8").replace("\n", "")

    def _ordered_data(self, data):
        """Return ``data``'s items sorted by key.

        Dict values are replaced *in ``data`` itself* by their compact JSON
        encoding, so the caller's mapping is modified.
        """
        # Only existing keys are reassigned, so iterating while assigning is safe.
        for k, v in data.items():
            if isinstance(v, dict):
                data[k] = json.dumps(v, separators=(',', ':'))
        return sorted(data.items())

    def build_body(self, method, append_auth_token=False, **query):
        """Build the parameter dict for ``method``.

        :param method: Alipay API method name, e.g. "alipay.trade.query".
        :param append_auth_token: when true, add ``self.app_auth_token``.
        :param query: extra parameters; they override the common ones.
        :return: dict of request parameters (not yet signed).
        """
        data = {
            "app_id": self._appid,
            "method": method,
            "charset": "utf-8",
            "sign_type": self._sign_type,
            "timestamp": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
            "version": "1.0"
        }
        if append_auth_token:
            data["app_auth_token"] = self.app_auth_token
        data.update(query)
        if self.aes_encrypt_key and 'biz_content' in data:
            # NOTE(review): no ``encrypt_type`` parameter is added alongside the
            # encrypted payload - confirm whether the gateway requires it.
            biz_content = data.pop('biz_content')
            data['biz_content'] = encrypt_content(
                content=json.dumps(biz_content,
                                   ensure_ascii=False,
                                   sort_keys=True,
                                   separators=(',', ':')),
                encrypt_type='AES',
                encrypt_key=self.aes_encrypt_key,
                charset='utf-8')
        return data

    def _verify(self, raw_content, signature):
        """Return True when base64 ``signature`` matches ``raw_content`` under the Alipay public key."""
        digest = SHA.new() if self._sign_type == "RSA" else SHA256.new()
        digest.update(raw_content.encode("utf-8"))
        raw_signature = decodebytes(signature.encode("utf-8"))
        verifier = PKCS1_v1_5.new(self.alipay_public_key)
        return bool(verifier.verify(digest, raw_signature))

    def sign_data(self, data):
        """Sign ``data`` and return it as a URL query string including ``sign``.

        ``data`` is modified: any existing "sign" entry is removed and dict
        values are JSON-encoded (see ``_ordered_data``). The signature is
        computed over the unquoted ``k=v`` pairs; values are URL-quoted only
        in the returned string.
        """
        data.pop("sign", None)
        ordered_items = self._ordered_data(data)
        raw_string = "&".join("{}={}".format(k, v) for k, v in ordered_items)
        sign = self._sign(raw_string)
        signed_items = ordered_items + [('sign', sign)]
        return "&".join("{}={}".format(k, quote_plus(v)) for k, v in signed_items)

    def verify(self, data, signature, pop_sign_type=True):
        """Verify ``signature`` over the sorted ``k=v`` pairs of ``data``.

        :param data: parameters received; modified in place (``sign_type`` is
            removed when ``pop_sign_type`` is true, dict values are JSON-encoded).
        :param signature: base64 signature text.
        :param pop_sign_type: remove ``sign_type`` from ``data`` before
            building the message instead of leaving it in.
        :return: True when the signature is valid.
        :raises AliValidationError: when ``data`` carries a ``sign_type``
            different from this client's.
        """
        if "sign_type" in data:
            if pop_sign_type:
                sign_type = data.pop("sign_type")
            else:
                sign_type = data.get("sign_type")
            if sign_type != self._sign_type:
                raise AliValidationError(
                    errmsg='invalid sign type',
                    lvalue=self._sign_type,
                    rvalue=sign_type,
                    client=self)
        # message to verify: the sorted items joined as k=v pairs
        unsigned_items = self._ordered_data(data)
        message = "&".join(u"{}={}".format(k, v) for k, v in unsigned_items)
        return self._verify(message, signature)

    def api(self, api_name, **kwargs):
        """
        Look up the method implementing ``api_name`` and return it (bound,
        not called):

            alipay.api("alipay.trade.page.pay") is alipay.api_alipay_trade_page_pay

        ``kwargs`` are accepted for compatibility but are not used here; pass
        them when calling the returned method.

        :raises AttributeError: when no ``api_<name>`` method exists.
        """
        api_name = api_name.replace(".", "_")
        key = "api_" + api_name
        if hasattr(self, key):
            return getattr(self, key)
        raise AttributeError("Unknown attribute " + api_name)

    def _request(self, method, url, headers=None, **kwargs):
        """Send an HTTP request and return the response body decoded as UTF-8.

        :param method: HTTP method name.
        :param url: full URL, including the signed query string.
        :param headers: optional extra headers; ``Content-Type`` is always
            set to ``application/json``. The caller's dict is not modified.
        :param kwargs: passed to ``requests``; ``timeout`` defaults to
            ``self.timeout``.
        :raises AliPayNetworkException: on an HTTP error status.
        """
        logger.debug("Calculate Signature URL: %s", url)
        # Copy so the caller's headers dict is never mutated.
        headers = dict(headers) if headers else {}
        headers['Content-Type'] = 'application/json'
        kwargs.setdefault('timeout', self.timeout)
        logger.debug('Request to Alipay API: %s %s\n%s', method, url, kwargs)
        with requests.sessions.Session() as session:
            res = session.request(
                url=url,
                method=method,
                headers=headers,
                **kwargs)
            try:
                res.raise_for_status()
            except requests.RequestException as reqe:
                # Exceptions have no ``.message`` on Python 3; use str().
                raise AliPayNetworkException(
                    errCode=res.status_code,
                    errMsg=str(reqe),
                    client=self,
                    request=reqe.request,
                    response=reqe.response
                )
            # ``res.text`` is already text and has no ``.decode`` on Python 3;
            # decode the raw bytes explicitly instead.
            return res.content.decode('utf-8')

    def _call_sync_api(self, data, response_type):
        """Sign ``data``, GET the gateway and return the verified ``response_type`` payload."""
        url = self._gateway + "?" + self.sign_data(data)
        raw_string = self._request(method='get', url=url, timeout=self.timeout)
        return self._verify_and_return_sync_response(raw_string, response_type)

    def api_alipay_trade_wap_pay(
        self, subject, out_trade_no, total_amount,
        return_url=None, notify_url=None, **kwargs
    ):
        """Return the signed query string for ``alipay.trade.wap.pay`` (no request is sent)."""
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "QUICK_WAP_PAY"
        }
        biz_content.update(kwargs)
        query = {'biz_content': biz_content}
        if return_url:
            query['return_url'] = return_url
        if notify_url:
            query['notify_url'] = notify_url
        data = self.build_body("alipay.trade.wap.pay", **query)
        return self.sign_data(data)

    def api_alipay_trade_app_pay(
        self, subject, out_trade_no, total_amount, notify_url=None, **kwargs
    ):
        """Return the signed query string for ``alipay.trade.app.pay`` (no request is sent)."""
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "QUICK_MSECURITY_PAY"
        }
        biz_content.update(kwargs)
        query = {'biz_content': biz_content}
        if notify_url:
            query['notify_url'] = notify_url
        data = self.build_body("alipay.trade.app.pay", **query)
        return self.sign_data(data)

    def api_alipay_trade_page_pay(self, subject, out_trade_no, total_amount,
                                  return_url=None, notify_url=None, **kwargs):
        """Return the signed query string for ``alipay.trade.page.pay`` (no request is sent)."""
        biz_content = {
            "subject": subject,
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "product_code": "FAST_INSTANT_TRADE_PAY"
        }
        biz_content.update(kwargs)
        query = {'biz_content': biz_content}
        if return_url:
            query['return_url'] = return_url
        if notify_url:
            query['notify_url'] = notify_url
        data = self.build_body("alipay.trade.page.pay", **query)
        return self.sign_data(data)

    def api_alipay_trade_query(self, out_trade_no=None, trade_no=None):
        """
        Call ``alipay.trade.query``; at least one identifier is required.

        response = {
          "alipay_trade_query_response": {
            "trade_no": "2017032121001004070200176844",
            "code": "10000",
            "invoice_amount": "20.00",
            "open_id": "20880072506750308812798160715407",
            "fund_bill_list": [
              {
                "amount": "20.00",
                "fund_channel": "ALIPAYACCOUNT"
              }
            ],
            "buyer_logon_id": "csq***@sandbox.com",
            "send_pay_date": "2017-03-21 13:29:17",
            "receipt_amount": "20.00",
            "out_trade_no": "out_trade_no15",
            "buyer_pay_amount": "20.00",
            "buyer_user_id": "2088102169481075",
            "msg": "Success",
            "point_amount": "0.00",
            "trade_status": "TRADE_SUCCESS",
            "total_amount": "20.00"
          },
          "sign": ""
        }
        """
        assert (out_trade_no is not None) or (trade_no is not None), \
            "Both trade_no and out_trade_no are None"
        biz_content = {}
        if out_trade_no:
            biz_content["out_trade_no"] = out_trade_no
        if trade_no:
            biz_content["trade_no"] = trade_no
        data = self.build_body("alipay.trade.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_trade_query_response")

    def api_alipay_trade_pay(
        self, out_trade_no, scene, auth_code, subject, notify_url=None, **kwargs
    ):
        """
        Call ``alipay.trade.pay``; ``scene`` must be "bar_code" or "wave_code".

        eg:
            self.api_alipay_trade_pay(
                out_trade_no,
                "bar_code/wave_code",
                auth_code,
                subject,
                total_amount=12,
                discountable_amount=10
            )
        failed response = {
            "alipay_trade_pay_response": {
                "code": "40004",
                "msg": "Business Failed",
                "sub_code": "ACQ.INVALID_PARAMETER",
                "sub_msg": "",
                "buyer_pay_amount": "0.00",
                "invoice_amount": "0.00",
                "point_amount": "0.00",
                "receipt_amount": "0.00"
            },
            "sign": ""
        }
        succeeded response =
          {
            "alipay_trade_pay_response": {
              "trade_no": "2017032121001004070200176846",
              "code": "10000",
              "invoice_amount": "20.00",
              "open_id": "20880072506750308812798160715407",
              "fund_bill_list": [
                {
                  "amount": "20.00",
                  "fund_channel": "ALIPAYACCOUNT"
                }
              ],
              "buyer_logon_id": "csq***@sandbox.com",
              "receipt_amount": "20.00",
              "out_trade_no": "out_trade_no18",
              "buyer_pay_amount": "20.00",
              "buyer_user_id": "2088102169481075",
              "msg": "Success",
              "point_amount": "0.00",
              "gmt_payment": "2017-03-21 15:07:29",
              "total_amount": "20.00"
            },
            "sign": ""
          }
        """
        assert scene in ("bar_code", "wave_code"), 'scene not in ("bar_code", "wave_code")'
        biz_content = {
            "out_trade_no": out_trade_no,
            "scene": scene,
            "auth_code": auth_code,
            "subject": subject
        }
        biz_content.update(**kwargs)
        query = {'biz_content': biz_content}
        if notify_url:
            query['notify_url'] = notify_url
        data = self.build_body("alipay.trade.pay", **query)
        return self._call_sync_api(data, "alipay_trade_pay_response")

    def api_alipay_trade_refund(self, out_trade_no, out_request_no, refund_amount, refund_reason):
        """Call ``alipay.trade.refund`` and return the verified response payload."""
        biz_content = {
            "out_trade_no": out_trade_no,
            "refund_amount": refund_amount,
            "out_request_no": out_request_no,
            'refund_reason': refund_reason
        }
        data = self.build_body("alipay.trade.refund", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_trade_refund_response")

    def api_alipay_trade_cancel(self, out_trade_no=None, trade_no=None):
        """
        Call ``alipay.trade.cancel``; at least one identifier is required.

        response = {
        "alipay_trade_cancel_response": {
            "msg": "Success",
            "out_trade_no": "out_trade_no15",
            "code": "10000",
            "retry_flag": "N"
          }
        }
        """
        assert (out_trade_no is not None) or (trade_no is not None), \
            "Both trade_no and out_trade_no are None"
        biz_content = {}
        if out_trade_no:
            biz_content["out_trade_no"] = out_trade_no
        if trade_no:
            biz_content["trade_no"] = trade_no
        data = self.build_body("alipay.trade.cancel", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_trade_cancel_response")

    def api_alipay_trade_precreate(self, out_trade_no, total_amount, subject, **kwargs):
        """
        Call ``alipay.trade.precreate`` and return the verified response payload.

        success response  = {
          "alipay_trade_precreate_response": {
            "msg": "Success",
            "out_trade_no": "out_trade_no17",
            "code": "10000",
            "qr_code": "https://qr.alipay.com/bax03431ljhokirwl38f00a7"
          },
          "sign": ""
        }
        failed response = {
          "alipay_trade_precreate_response": {
            "msg": "Business Failed",
            "sub_code": "ACQ.TOTAL_FEE_EXCEED",
            "code": "40004",
            "sub_msg": "订单金额超过限额"
          },
          "sign": ""
        }
        """
        biz_content = {
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "subject": subject
        }
        biz_content.update(**kwargs)
        data = self.build_body("alipay.trade.precreate", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_trade_precreate_response")

    def api_alipay_fund_trans_toaccount_transfer(
        self, out_biz_no, payee_type, payee_account, amount, **kwargs
    ):
        """Call ``alipay.fund.trans.toaccount.transfer``; ``payee_type`` must be a known type."""
        assert payee_type in ("ALIPAY_USERID", "ALIPAY_LOGONID"), "unknown payee type"
        biz_content = {
            "out_biz_no": out_biz_no,
            "payee_type": payee_type,
            "payee_account": payee_account,
            "amount": amount
        }
        biz_content.update(kwargs)
        data = self.build_body("alipay.fund.trans.toaccount.transfer", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_fund_trans_toaccount_transfer_response")

    def api_alipay_fund_trans_uni_transfer(
            self, out_biz_no, trans_amount, payee_info, order_title, product_code='TRANS_BANKCARD_NO_PWD',
            biz_scene='DIRECT_TRANSFER', **kwargs):
        """Call ``alipay.fund.trans.uni.transfer`` and return the verified response payload."""
        biz_content = {
            "out_biz_no": out_biz_no,
            "trans_amount": trans_amount,
            "product_code": product_code,
            "biz_scene": biz_scene,
            "payee_info": payee_info,
            "order_title": order_title
        }
        biz_content.update(kwargs)
        data = self.build_body("alipay.fund.trans.uni.transfer", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_fund_trans_uni_transfer_response")

    def api_alipay_fund_trans_common_query(self, out_biz_no, product_code='TRANS_BANKCARD_NO_PWD',
                                           biz_scene='DIRECT_TRANSFER', **kwargs):
        """Call ``alipay.fund.trans.common.query`` and return the verified response payload."""
        biz_content = {
            "out_biz_no": out_biz_no,
            "product_code": product_code,
            "biz_scene": biz_scene
        }
        biz_content.update(kwargs)
        data = self.build_body("alipay.fund.trans.common.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_fund_trans_common_query_response")

    def api_alipay_trade_create(self, out_trade_no, total_amount, subject, notify_url=None, **kwargs):
        """Call ``alipay.trade.create`` and return the verified response payload."""
        biz_content = {
            "out_trade_no": out_trade_no,
            "total_amount": total_amount,
            "subject": subject
        }
        biz_content.update(**kwargs)
        query = {'biz_content': biz_content}
        if notify_url:
            query['notify_url'] = notify_url
        data = self.build_body("alipay.trade.create", **query)
        return self._call_sync_api(data, "alipay_trade_create_response")

    def api_alipay_fund_trans_order_query(self, out_biz_no=None, order_id=None):
        """Call ``alipay.fund.trans.order.query``; at least one identifier is required."""
        if out_biz_no is None and order_id is None:
            raise Exception("Both out_biz_no and order_id are None!")
        biz_content = {}
        if out_biz_no:
            biz_content["out_biz_no"] = out_biz_no
        if order_id:
            biz_content["order_id"] = order_id
        data = self.build_body("alipay.fund.trans.order.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_fund_trans_order_query_response")

    def api_alipay_system_oauth_token(self, auth_code, refresh_token=None):
        """
        Call ``alipay.system.oauth.token``.

        :param auth_code: authorization code, used when no refresh token is given
        :param refresh_token: when set, a "refresh_token" grant is requested instead
        :return: dict
        """
        if refresh_token:
            query = {
                "grant_type": "refresh_token",
                "refresh_token": refresh_token
            }
        else:
            query = {
                "grant_type": "authorization_code",
                "code": auth_code
            }
        data = self.build_body("alipay.system.oauth.token", **query)
        return self._call_sync_api(data, "alipay_system_oauth_token_response")

    def api_alipay_trade_refund_order_query(self, out_trade_no, out_request_no, query_options=None):
        # type:(str, str, dict) -> dict
        """Call ``alipay.trade.fastpay.refund.query`` and return the verified response payload."""
        biz_content = {
            "out_trade_no": out_trade_no,
            "out_request_no": out_request_no,
        }
        if query_options:
            biz_content["query_options"] = query_options
        data = self.build_body("alipay.trade.fastpay.refund.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_trade_fastpay_refund_query_response")

    def query_auth(self, sub_merchant_id):
        """Call ``alipay.merchant.indirect.smidbind.query`` for ``sub_merchant_id``.

        The verified response payload is returned (it used to be fetched and
        then discarded). The response key follows the same
        ``<method with underscores>_response`` pattern as the other calls.
        """
        biz_content = {
            "sub_merchant_id": sub_merchant_id
        }
        data = self.build_body("alipay.merchant.indirect.smidbind.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_merchant_indirect_smidbind_query_response")

    def api_alipay_data_dataservice_bill_downloadurl_query(self, bill_type='trade', bill_date=None):
        """
        Call ``alipay.data.dataservice.bill.downloadurl.query``.

        :param bill_type: bill type, "trade" by default
        :param bill_date: "YYYY-MM-DD"; defaults to yesterday
        :return: dict
        """
        #: Note: the most recent bill that can be fetched is yesterday's;
        #: Alipay does not document this.
        if bill_date is None:
            bill_date = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
        biz_content = {
            'bill_type': bill_type,
            'bill_date': bill_date
        }
        data = self.build_body("alipay.data.dataservice.bill.downloadurl.query", biz_content=biz_content)
        return self._call_sync_api(data, "alipay_data_dataservice_bill_downloadurl_query_response")

    def api_alipay_user_info_share(self, auth_token):
        """
        Call ``alipay.user.info.share``.

        :param auth_token: sent as the ``auth_token`` request parameter
        :return: dict
        """
        data = self.build_body("alipay.user.info.share", auth_token=auth_token)
        return self._call_sync_api(data, "alipay_user_info_share_response")

    def _verify_and_return_sync_response(self, raw_string, response_type):
        """
        Return ``response[response_type]`` if the response signature verifies.

        :param raw_string: raw JSON text returned by the gateway.
        :param response_type: key of the payload, e.g. "alipay_trade_query_response".
        :raises AliException: when the signature does not match.
        :raises KeyError: when the payload key or "sign" is missing.
        """
        logger.debug('Response from Alipay API: %s', raw_string)
        response = json.loads(raw_string)
        result = response[response_type]
        sign = response["sign"]
        # The signature covers the payload exactly as it appears in the raw
        # text, so cut it out of the raw string rather than re-serializing.
        signed_part = self._get_string_to_be_signed(raw_string, response_type)
        if not self._verify(signed_part, sign):
            raise AliException(
                errCode=AliErrorCode.MY_ERROR_SIGNATURE,
                errMsg=u'签名错误',
                client=self)
        return result

    def _get_string_to_be_signed(self, raw_string, response_type):
        """
        https://doc.open.alipay.com/docs/doc.htm?docType=1&articleId=106120
        Locate, inside a synchronous response, the substring that was signed:
        the brace-delimited object following ``response_type``.

        Braces are matched by counting only; braces inside JSON string
        values are not treated specially.
        """
        right_index = 0
        index = raw_string.find(response_type)
        left_index = raw_string.find("{", index)
        index = left_index + 1
        # balance is -1 per unclosed "{"; 0 means the opening brace is closed.
        balance = -1
        while balance < 0 and index < len(raw_string) - 1:
            index_a = raw_string.find("{", index)
            index_b = raw_string.find("}", index)
            # no closing brace left: stop
            if index_b == -1:
                break
            right_index = index_b
            # no further opening brace: this "}" closes one level
            if index_a == -1:
                index = index_b + 1
                balance += 1
            # the next "{" lies beyond the next "}": this "}" closes one level
            elif index_a > index_b:
                balance += 1
                index = index_b + 1
            # the next "{" comes first: descend one level
            else:
                balance -= 1
                index = index_a + 1
        return raw_string[left_index: right_index + 1]

    def upload_image(self, imageType, imageContent):
        """
        Upload an image via ``ant.merchant.expand.indirect.image.upload``.

        NOTE(review): the image fields are sent as the body of a GET request
        and are not part of the signed parameters - confirm this matches what
        the gateway expects.
        """
        path = "ant.merchant.expand.indirect.image.upload"
        query = {
            "image_type": imageType,
            "image_content": imageContent
        }
        data = self.build_body(path)
        url = self._gateway + "?" + self.sign_data(data)
        raw_string = self._request(method="get", url=url, data=query)
        return self._verify_and_return_sync_response(
            raw_string, "ant_merchant_expand_indirect_image_upload_response"
        )
class AliPay(BaseAliPay):
    """Plain-key client; construction is forwarded unchanged to ``BaseAliPay``."""

    def __init__(self, appid,
                 app_private_key_string,
                 public_key_string,
                 aes_encrypt_key=None,
                 sign_type="RSA2",
                 debug=False,
                 timeout=15):
        """Accept the same arguments as ``BaseAliPay.__init__`` and pass them on by keyword."""
        forwarded = dict(
            appid=appid,
            app_private_key_string=app_private_key_string,
            public_key_string=public_key_string,
            aes_encrypt_key=aes_encrypt_key,
            sign_type=sign_type,
            debug=debug,
            timeout=timeout,
        )
        super(AliPay, self).__init__(**forwarded)
class DCAliPay(AliPay):
    """
    Digital certificate (DC) variant of the client.

    The Alipay public key is extracted from the Alipay public-key
    certificate, and every request body additionally carries the application
    certificate SN and the root certificate SN.
    """

    def __init__(
            self,
            appid,
            app_public_key_cert_string,
            public_key_cert_string,
            root_cert_string,
            app_private_key_string=None,
            aes_encrypt_key=None,
            sign_type="RSA2",
            debug=False,
            timeout=15):
        """
        Initialise the client.

        DCAliPay(
            appid='',
            app_public_key_cert_string='',
            public_key_cert_string='',
            root_cert_string='',
            app_private_key_string='',
        )

        :param appid: application id.
        :param app_public_key_cert_string: PEM application public-key certificate.
        :param public_key_cert_string: PEM Alipay public-key certificate.
        :param root_cert_string: PEM Alipay root certificate chain.
        :param app_private_key_string: application RSA private key text; it is
            passed straight to the base initialiser.
        :param aes_encrypt_key: see ``BaseAliPay.__init__``.
        :param sign_type: see ``BaseAliPay.__init__``.
        :param debug: see ``BaseAliPay.__init__``.
        :param timeout: see ``BaseAliPay.__init__``.
        """
        # The certificates must be stored first: the public key handed to the
        # base initialiser is extracted from one of them.
        self._app_public_key_cert_string = app_public_key_cert_string
        self._alipay_public_key_cert_string = public_key_cert_string
        self._alipay_root_cert_string = root_cert_string
        public_key_string = self.load_alipay_public_key_string()
        super(DCAliPay, self).__init__(
            appid=appid,
            app_private_key_string=app_private_key_string,
            public_key_string=public_key_string,
            aes_encrypt_key=aes_encrypt_key,
            sign_type=sign_type,
            debug=debug,
            timeout=timeout)

    def api_alipay_open_app_alipaycert_download(self, alipay_cert_sn):
        """
        Build the signed query string for ``alipay.open.app.alipaycert.download``
        (download of the Alipay certificate used for signature verification,
        part of the transparent certificate-upgrade mechanism).

        No request is sent here; only the signed query string is returned.
        """
        biz_content = {
            "alipay_cert_sn": alipay_cert_sn
        }
        data = self.build_body("alipay.open.app.alipaycert.download", biz_content=biz_content)
        return self.sign_data(data)

    def build_body(self, method, append_auth_token=False, **query):
        """Build the base request body and add both certificate SNs to it."""
        data = super(DCAliPay, self).build_body(method, append_auth_token, **query)
        data["app_cert_sn"] = self.app_cert_sn
        data["alipay_root_cert_sn"] = self.alipay_root_cert_sn
        return data

    def load_alipay_public_key_string(self):
        """Return the PEM public key contained in the Alipay public-key certificate."""
        cert = OpenSSL.crypto.load_certificate(
            OpenSSL.crypto.FILETYPE_PEM, self._alipay_public_key_cert_string
        )
        return OpenSSL.crypto.dump_publickey(
            OpenSSL.crypto.FILETYPE_PEM, cert.get_pubkey()
        ).decode("utf-8")

    @staticmethod
    def _issuer_serial_digest(cert):
        """Return the MD5 hex digest of ``CN=..,OU=..,O=..,C=..`` of the issuer plus the serial number."""
        issuer = cert.get_issuer()
        name = 'CN={},OU={},O={},C={}'.format(issuer.CN, issuer.OU, issuer.O, issuer.C)
        string = name + str(cert.get_serial_number())
        return hashlib.md5(string.encode()).hexdigest()

    @staticmethod
    def get_cert_sn(cert):
        """
        Compute the SN of a single PEM certificate.

        :param cert: PEM certificate text.
        :return: hex digest string.
        """
        loaded = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, cert)
        return DCAliPay._issuer_serial_digest(loaded)

    @staticmethod
    def read_pem_cert_chain(certContent):
        """Parse a PEM bundle into a list of loaded certificates.

        Each ``BEGIN CERTIFICATE`` / ``END CERTIFICATE`` block is extracted by
        its markers, so the result does not depend on how the blocks are
        separated (blank line, single newline, CRLF, trailing whitespace).
        """
        import re

        # Python 3 bytes input: decode so the text pattern below applies.
        if isinstance(certContent, bytes) and not isinstance(certContent, str):
            certContent = certContent.decode("utf-8")
        items = re.findall(
            r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
            certContent,
            re.DOTALL,
        )
        load_cert = partial(OpenSSL.crypto.load_certificate, OpenSSL.crypto.FILETYPE_PEM)
        return [load_cert(c) for c in items]

    @staticmethod
    def get_root_cert_sn(rootCert):
        """Compute the root certificate SN.

        The SNs of all certificates in the chain whose signature algorithm is
        listed in ``CryptoAlgSet`` are joined with "_". Certificates whose
        algorithm cannot be read are skipped.

        :return: the joined SN string, or ``None`` when no certificate qualifies.
        """
        sns = []
        for cert in DCAliPay.read_pem_cert_chain(rootCert):
            try:
                sigAlg = cert.get_signature_algorithm()
            except ValueError:
                continue
            if sigAlg in CryptoAlgSet:
                sns.append(DCAliPay._issuer_serial_digest(cert))
        return '_'.join(sns) if sns else None

    @property
    def app_cert_sn(self):
        """SN of the application certificate, computed on first access and cached."""
        if not hasattr(self, "_app_cert_sn"):
            self._app_cert_sn = self.get_cert_sn(self._app_public_key_cert_string)
        return getattr(self, "_app_cert_sn")

    @property
    def alipay_root_cert_sn(self):
        """SN of the root certificate chain, computed on first access and cached."""
        if not hasattr(self, "_alipay_root_cert_sn"):
            self._alipay_root_cert_sn = self.get_root_cert_sn(self._alipay_root_cert_string)
        return getattr(self, "_alipay_root_cert_sn")
class ISVAliPay(BaseAliPay):
    """Client acting on behalf of a merchant through an ``app_auth_token``.

    Either an existing ``app_auth_token`` or an ``app_auth_code`` (exchanged
    for a token on first use) must be supplied. Request bodies built by this
    class carry the token unless ``append_auth_token=False`` is passed.
    """

    def __init__(self, appid, app_private_key_string, public_key_string, sign_type="RSA2", app_auth_token=None,
                 app_auth_code=None, debug=False, aes_encrypt_key=None, timeout=15):
        """
        :param appid: application id.
        :param app_private_key_string: application RSA private key text.
        :param public_key_string: Alipay RSA public key text.
        :param sign_type: "RSA" or "RSA2".
        :param app_auth_token: existing authorization token, if any.
        :param app_auth_code: authorization code used to obtain a token when
            no token is given.
        :param debug: see ``BaseAliPay.__init__``.
        :param aes_encrypt_key: see ``BaseAliPay.__init__``.
        :param timeout: see ``BaseAliPay.__init__``.
        :raises Exception: when neither token nor code is supplied.
        """
        if not app_auth_token and not app_auth_code:
            raise Exception("Both app_auth_code and app_auth_token are None !!!")
        self._app_auth_token = app_auth_token
        self._app_auth_code = app_auth_code
        # Forward by keyword: the base signature has ``aes_encrypt_key``
        # before ``sign_type``, so positional forwarding shifted every
        # argument after ``public_key_string`` by one.
        super(ISVAliPay, self).__init__(
            appid=appid,
            app_private_key_string=app_private_key_string,
            public_key_string=public_key_string,
            aes_encrypt_key=aes_encrypt_key,
            sign_type=sign_type,
            debug=debug,
            timeout=timeout)

    @property
    def app_auth_token(self):
        """Authorization token; exchanged from the auth code on first access."""
        # no token yet: exchange the authorization code for one
        if not self._app_auth_token:
            # No argument here: the method's only parameter is a refresh
            # token, and the auth code is read from ``self._app_auth_code``.
            result = self.api_alipay_open_auth_token_app()
            self._app_auth_token = result.get("app_auth_token", None)
        if not self._app_auth_token:
            raise Exception("Get auth token by auth code failed: {}".format(self._app_auth_code))
        return self._app_auth_token

    def build_body(self, method, biz_content=None, return_url=None, append_auth_token=True, **query):
        """Build a request body, appending the auth token by default.

        :param method: Alipay API method name.
        :param biz_content: business payload; omitted from the body when ``None``.
        :param return_url: optional return URL; omitted when ``None``.
        :param append_auth_token: add ``self.app_auth_token`` to the body.
        :param query: any further request parameters (e.g. ``notify_url``),
            as passed by the inherited ``api_*`` methods.
        """
        if biz_content is not None:
            query["biz_content"] = biz_content
        if return_url is not None:
            query["return_url"] = return_url
        return super(ISVAliPay, self).build_body(
            method, append_auth_token=append_auth_token, **query)

    def api_alipay_open_auth_token_app(self, refresh_token=None):
        """
        Call ``alipay.open.auth.token.app``: exchange the stored auth code for
        a token, or refresh a token when ``refresh_token`` is given.

        response = {
          "code": "10000",
          "msg": "Success",
          "app_auth_token": "201708BB28623ce3d10f4f62875e9ef5cbeebX07",
          "app_refresh_token": "201708BB108a270d8bb6409890d16175a04a7X07",
          "auth_app_id": "appid",
          "expires_in": 31536000,
          "re_expires_in": 32140800,
          "user_id": "2088xxxxx"
        }
        """
        if refresh_token:
            biz_content = {
                "grant_type": "refresh_token",
                "refresh_token": refresh_token
            }
        else:
            biz_content = {
                "grant_type": "authorization_code",
                "code": self._app_auth_code
            }
        data = self.build_body(
            "alipay.open.auth.token.app",
            biz_content,
            append_auth_token=False
        )
        url = self._gateway + "?" + self.sign_data(data)
        raw_string = self._request(method='get', url=url, timeout=self.timeout)
        return self._verify_and_return_sync_response(
            raw_string, "alipay_open_auth_token_app_response"
        )

    def api_alipay_open_auth_token_app_query(self):
        """Call ``alipay.open.auth.token.app.query`` for the current auth token."""
        biz_content = {
            "app_auth_token": self.app_auth_token
        }
        data = self.build_body(
            "alipay.open.auth.token.app.query",
            biz_content,
            append_auth_token=False
        )
        url = self._gateway + "?" + self.sign_data(data)
        raw_string = self._request(method='get', url=url, timeout=self.timeout)
        return self._verify_and_return_sync_response(
            raw_string, "alipay_open_auth_token_app_query_response"
        )
|