# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""JD aggregate pay / JD JOS pay: notify handling, record polling and pay pull-up."""

import datetime
import logging
import traceback

import simplejson as json
from django.conf import settings
from django.http import HttpResponse
from typing import TYPE_CHECKING, Dict, Optional

from apilib.utils_json import JsonResponse
from apilib.utils_url import add_query
from apps.web.common.transaction.pay import PayNotifier, OrderCacheMgr, PayRecordPoller, PayNotifyAction, PayPullUp
from apps.web.constant import PollRecordDefine
from apps.web.core.models import JDAggrePayApp
from apps.web.core.payment import PaymentGateway, WithdrawGateway
from apps.web.core.utils import async_operation, JsonOkResponse
from apps.web.exceptions import UserServerException
from library.jd import JDAggrePay
from library.jd.exceptions import JDPayException, JDCommuException, JDValidationError
from library.jd.pay import PiType, JDJosPay
from taskmanager.mediator import task_caller

if TYPE_CHECKING:
    from apps.web.common.transaction.pay import RechargeRecordT
    from apps.web.core.payment.jdaggre import JDAggrePaymentGateway

logger = logging.getLogger(__name__)


def update_record(record, **payload):
    # type: (RechargeRecordT, Dict)->bool
    """Mark ``record`` as succeeded using the fields of a JD pay result.

    :param record: the recharge record to update.
    :param payload: pay result fields. ``payFinishTime`` (``YYYYMMDDHHmmss``)
        is used as the finished time when present, otherwise the current time.
        The channel order number is taken from the first available key among
        ``channelNoSeq``, ``transactionId`` and ``tradeNo``.
    :return: whatever ``record.succeed`` returns; callers in this module treat
        a falsy value as "nothing was updated".
    """
    if 'payFinishTime' in payload:
        import arrow
        finished_time = arrow.get(
            payload['payFinishTime'], 'YYYYMMDDHHmmss', tzinfo = settings.TIME_ZONE).naive
    else:
        finished_time = datetime.datetime.now()

    wxOrderNo = ''
    if 'channelNoSeq' in payload:
        wxOrderNo = payload['channelNoSeq']
    elif 'transactionId' in payload:
        wxOrderNo = payload['transactionId']
    elif 'tradeNo' in payload:
        wxOrderNo = payload['tradeNo']
    else:
        # Not fatal: the record is still marked as succeeded with an empty order number.
        logger.error('no any trade no info. orderNo = {}'.format(record.orderNo))

    return record.succeed(
        wxOrderNo = wxOrderNo,
        transactionId = payload.get('transactionId', ''),
        finishedTime = finished_time,
        **{
            'extraInfo.tradeNo': payload.get('tradeNo', ''),
            'extraInfo.notify_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        })


def _action_of_pay_status(result):
    """Map the ``payStatus`` of a trade query result to a ``(PayNotifyAction, result)`` pair."""
    if 'payStatus' not in result:
        return PayNotifyAction.Unknown, result

    pay_status = result['payStatus']
    if pay_status == 'FINISH':
        return PayNotifyAction.NeedHandle, result

    if pay_status in ['CLOSE', 'REFUND']:
        return PayNotifyAction.NoHandle, result

    return PayNotifyAction.Unknown, result


class JDAggrePayReordPoller(PayRecordPoller):
    """Poller that queries JD aggregate pay for the state of a recharge record."""

    def update_record(self, record, **payload):
        # type: (RechargeRecordT, Dict)->bool
        """Delegate to the module-level :func:`update_record`."""
        return update_record(record, **payload)

    def action_of_pay(self, payment_gateway, record):
        # type: (JDAggrePaymentGateway, RechargeRecordT)->(PayNotifyAction, str)
        """Query the order through the gateway and decide what to do with it.

        :return: ``(action, result)``; ``result`` is the query result, or ``{}``
            when the query raised.
        """
        try:
            result = payment_gateway.api_trade_query(out_trade_no = record.orderNo)
            return _action_of_pay_status(result)
        except (JDCommuException, JDPayException) as e:
            logger.error(str(e))
            return PayNotifyAction.Unknown, {}
        except Exception as e:
            logger.exception(e)
            return PayNotifyAction.Unknown, {}


class JDAggrePayNotifier(PayNotifier):
    """Handler for the (encrypted) pay notify callback of JD aggregate pay."""

    def __init__(self, request, record_cls_factory):
        # The raw body is parsed first because parse_request() reads self.orginal.
        self.orginal = json.loads(request.body)  # type: Dict
        logger.debug('received jdaggre pay notify(encrypt): {}'.format(str(self.orginal)))

        super(JDAggrePayNotifier, self).__init__(request = request, record_cls_factory = record_cls_factory)

    def parse_request(self, request):
        """Decrypt the notify body with the app matching its ``merchantNo``."""
        # NOTE(review): ``app`` is None when no app matches the merchant number,
        # which makes the next line raise AttributeError - confirm this is acceptable.
        app = JDAggrePayApp.objects(merchant_no = self.orginal['merchantNo']).first()  # type: JDAggrePayApp
        client = JDAggrePay(app.merchant_no, app.desKey, app.saltMd5Key, app.debug)  # type: JDAggrePay
        return client.decrypt_response(self.orginal)

    @property
    def out_trade_no(self):
        """Order number (``outTradeNo``) carried by the decrypted payload."""
        return self.payload['outTradeNo']

    def verify(self, payment_gateway, record, payload):
        # type: (JDAggrePaymentGateway, RechargeRecordT, Dict)->None
        """Check that the notified amount equals the record's total fee.

        :raises JDPayException: when the two amounts differ.
        """
        notifier_fen = int(payload['amount'])
        if record.fen_total_fee != notifier_fen:
            raise JDPayException(
                errmsg = u'无效的total_fee',
                lvalue = record.fen_total_fee,
                rvalue = notifier_fen,
                client = payment_gateway.client)

    def update_record(self, record, **payload):
        # type: (RechargeRecordT, Dict)->bool
        """Delegate to the module-level :func:`update_record`."""
        return update_record(record, **payload)

    def reply(self):
        """Acknowledge the notify with a plain ``ok`` response."""
        return HttpResponse('ok')

    def do(self, post_pay):
        # type: (callable)->JsonResponse
        """Process the notify: validate it, mark the record paid and schedule ``post_pay``.

        :param post_pay: callable handed to ``async_operation`` with ``record``
            once the record has been confirmed.
        :return: ``self.reply()`` on every handled path, including
            ``JDPayException``. On ``JDCommuException`` or any other unexpected
            exception nothing is returned (``None``).
        """
        # NOTE(review): the None return on JDCommuException / unexpected errors is
        # presumably intended to withhold the acknowledgement - confirm.
        try:
            decrypt_data = self.payload
            logger.debug('received jdaggre pay notify(decrypt): {}'.format(str(decrypt_data)))

            order_no = self.out_trade_no

            record = self.record_cls.get_record(order_no = order_no)  # type: Optional[RechargeRecordT]
            if not record:
                logger.error('no such record. orderNo = {}'.format(order_no))
                return self.reply()

            if decrypt_data['resultCode'] != 'SUCCESS':
                logger.debug(
                    'record do failure. id = {}, orderNo = {}'.format(str(record.id), order_no))
                return self.reply()

            if record.is_success():
                logger.error(
                    'record has been finished. id = {}, orderNo = {}'.format(str(record.id), order_no))
                return self.reply()

            pay_gateway = PaymentGateway.from_gateway_key(
                record.my_gateway,
                record.pay_gateway_key,
                record.pay_app_type)  # type: JDAggrePaymentGateway

            if pay_gateway.pi_type != decrypt_data['piType']:
                raise JDValidationError(tips = 'invalid pi type',
                                        lvalue = pay_gateway.pi_type,
                                        rvalue = decrypt_data['piType'])

            # Validate the notified parameters.
            self.verify(pay_gateway, record, decrypt_data)

            if decrypt_data['payStatus'] != 'FINISH':
                return self.reply()

            order_cache_mgr = OrderCacheMgr(record)
            try:
                if order_cache_mgr.check_and_set_done():
                    logger.debug('{} has been done because cache in notify.'.format(repr(record)))
                    return self.reply()
            except Exception as e:
                # Best effort: a cache failure must not block the database update below.
                logger.error(
                    'cache key is not exist or exception. exception = {}, key = {}'.format(
                        str(e), order_cache_mgr.key))

            modified = self.update_record(record, **decrypt_data)
            if not modified:
                # Nothing was updated, so the record has already been handled.
                logger.debug('{} has been done because db in notify'.format(repr(record)))
                return self.reply()

            logger.info('{} successfully confirmed'.format(repr(record)))

            async_operation(post_pay, record = record)

            return self.reply()
        except JDCommuException as e:
            logger.error(str(e))
        except JDPayException as e:
            logger.error(str(e))
            return self.reply()
        except Exception as e:
            logger.exception(e)


class JDJosPayNotifier(JDAggrePayNotifier):
    """
    Notify parser for JD traffic-referral (JOS) pay.

    Split out because the callback fields differ noticeably from JD aggregate
    pay and the message is decrypted differently. It mainly resolves the field
    mismatch and the different way of obtaining the encryption app. Unlike
    polling, the ``merchantNo`` in the callback already identifies the required
    app, so no further conversion is needed.
    """

    def do(self, post_pay):
        """
        The parsed fields differ somewhat from JD aggregate pay, so rename them
        before delegating to the aggregate pay handling.

        :return: the result of ``JDAggrePayNotifier.do``.
        """
        self.payload['piType'] = PiType.JDPAY
        self.payload["channelNoSeq"] = self.payload['tradeNo']
        return super(JDJosPayNotifier, self).do(post_pay)


class JDJosPayRecordPoller(JDAggrePayReordPoller):
    """Poller that queries JD JOS pay for the state of a recharge record."""

    def action_of_pay(self, payment_gateway, record):
        # type: (JDAggrePaymentGateway, RechargeRecordT)->(PayNotifyAction, str)
        """
        Query the order.

        :param payment_gateway: the payment gateway.
        :param record: the recharge record.
        :return: ``(action, result)``; ``result`` is the query result, or ``{}``
            when the query raised.
        """
        payApp = payment_gateway.app
        josPayApp = payApp.josPayApp.fetch()  # type: JDAggrePayApp

        # Build the JOS cloud pay client used for encryption and decryption.
        # The merchant number is the one registered for the brand and is
        # generally a fixed value.
        client = JDJosPay(
            josPayApp.merchant_no,
            josPayApp.desKey,
            josPayApp.saltMd5Key,
            josPayApp.systemId,
            josPayApp.debug
        )
        # Attach the shop information to the client; it is needed to assemble
        # the shop parameters.
        setattr(client, "shopInfo", josPayApp.shopInfo)

        # Query with the newly built client.
        try:
            result = client.api_trade_query(record.orderNo, record.wxOrderNo)
            # Rename the field so the result matches the aggregate pay format.
            # A missing ``tradeNo`` raises KeyError and ends up in the generic
            # handler below.
            result["channelNoSeq"] = result["tradeNo"]
            return _action_of_pay_status(result)
        except (JDCommuException, JDPayException) as e:
            logger.error(str(e))
            return PayNotifyAction.Unknown, {}
        except Exception as e:
            logger.exception(e)
            return PayNotifyAction.Unknown, {}


class JDAggrePayPullUp(PayPullUp):
    """
    Obtain the pay parameters for JD aggregate pay. Currently only used for merchant pay.
    """

    def do(self):
        # type: ()->HttpResponse
        """Place the unified order, schedule record polling and build the response.

        :raises UserServerException: when placing the order fails, when no pay
            URL is available for JD pay, or when the gateway's pi type is not
            supported. The record is marked as failed in each case.
        """
        OrderCacheMgr(self.record).initial()

        try:
            if WithdrawGateway.is_ledger(self.record.withdraw_source_key):
                # Fund-pool ledger mode: no split list is sent.
                ledger_type, ledger_fee_assume, ledger_list = None, None, None
            else:
                # Merchant ledger mode.
                ledger_type, ledger_fee_assume, ledger_list = self.payment_gateway.bill_split_rule(
                    self.record.partition_map)

            pay_info = self.payment_gateway.unified_order(
                out_trade_no = self.record.orderNo,
                money = self.record.money,
                notify_url = self.payload.get('notifyUrl'),
                subject = self.record.subject,
                expire = 300,
                attach = {'dealerId': self.record.ownerId},
                openId = self.openId,
                billSplitList = ledger_list,
                **{'imei': self.payload.get('imei')})
        except Exception as e:
            logger.exception(e)
            self.record.fail(description = u'拉起支付发生异常', extraInfo__traceback = traceback.format_exc())
            raise UserServerException(u'拉起支付发生异常')
        else:
            logger.debug('jd unified order pay info = {}'.format(str(pay_info)))

            pi_type = self.payment_gateway.pi_type

            if pi_type == PiType.ALIPAY:
                response = JsonOkResponse(payload = {
                    'tradeNO': pay_info['tradeNO'],
                    'outTradeNo': self.record.orderNo,
                    'adShow': self.payload['showAd']
                })
            elif pi_type == PiType.WX:
                pay_info.update({
                    'outTradeNo': self.record.orderNo,
                    'golden': True,
                    'adShow': self.payload['showAd']
                })
                response = JsonOkResponse(payload = pay_info)
            elif pi_type == PiType.JDPAY:
                # For JD pay, pay_info itself is used as the pay URL.
                front_url = self.payload.get('front_url', None)
                if front_url:
                    pay_url = add_query(pay_info, {
                        'callbackUrl': front_url
                    })
                else:
                    pay_url = pay_info

                if not pay_url:
                    self.record.fail(description = u'调起支付失败,请刷新后重试(1001)')
                    raise UserServerException(u'调起支付失败,请刷新后重试(1001)')
                else:
                    response = JsonResponse({
                        'result': 1,
                        'description': 'SUCCESS',
                        'payload': {
                            'payUrl': pay_url
                        }})
            else:
                self.record.fail(description = u'调起支付失败,请刷新后重试(1002)')
                raise UserServerException(u'调起支付失败,请刷新后重试(1002)')

            # Schedule polling of the record state.
            task_caller('poll_user_recharge_record',
                        delay = PollRecordDefine.DELAY_BEFORE,
                        expires = PollRecordDefine.TASK_EXPIRES,
                        pay_app_type = self.payment_gateway.pay_app_type,
                        record_id = str(self.record.id),
                        interval = PollRecordDefine.WAIT_EACH_ROUND,
                        total_count = PollRecordDefine.TOTAL_ROUNDS)

            return response