jdaggre.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. import traceback
  6. import simplejson as json
  7. from django.conf import settings
  8. from django.http import HttpResponse
  9. from typing import TYPE_CHECKING, Dict, Optional
  10. from apilib.utils_json import JsonResponse
  11. from apilib.utils_url import add_query
  12. from apps.web.common.transaction.pay import PayNotifier, OrderCacheMgr, PayRecordPoller, PayNotifyAction, PayPullUp
  13. from apps.web.constant import PollRecordDefine
  14. from apps.web.core.models import JDAggrePayApp
  15. from apps.web.core.payment import PaymentGateway, WithdrawGateway
  16. from apps.web.core.utils import async_operation, JsonOkResponse
  17. from apps.web.exceptions import UserServerException
  18. from library.jd import JDAggrePay
  19. from library.jd.exceptions import JDPayException, JDCommuException, JDValidationError
  20. from library.jd.pay import PiType, JDJosPay
  21. from taskmanager.mediator import task_caller
  22. if TYPE_CHECKING:
  23. from apps.web.common.transaction.pay import RechargeRecordT
  24. from apps.web.core.payment.jdaggre import JDAggrePaymentGateway
  25. logger = logging.getLogger(__name__)
  26. def update_record(record, **payload):
  27. # type: (RechargeRecordT, Dict)->bool
  28. if 'payFinishTime' in payload:
  29. import arrow
  30. finished_time = arrow.get(payload['payFinishTime'], 'YYYYMMDDHHmmss', tzinfo = settings.TIME_ZONE).naive
  31. else:
  32. finished_time = datetime.datetime.now()
  33. wxOrderNo = ''
  34. if 'channelNoSeq' in payload:
  35. wxOrderNo = payload['channelNoSeq']
  36. elif 'transactionId' in payload:
  37. wxOrderNo = payload['transactionId']
  38. elif 'tradeNo' in payload:
  39. wxOrderNo = payload['tradeNo']
  40. else:
  41. logger.error('no any trade no info. orderNo = {}'.format(record.orderNo))
  42. return record.succeed(wxOrderNo = wxOrderNo,
  43. transactionId = payload.get('transactionId', ''),
  44. finishedTime = finished_time,
  45. **{
  46. 'extraInfo.tradeNo': payload.get('tradeNo', ''),
  47. 'extraInfo.notify_time': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
  48. })
  49. class JDAggrePayReordPoller(PayRecordPoller):
  50. def update_record(self, record, **payload):
  51. # type: (RechargeRecordT, Dict)->bool
  52. return update_record(record, **payload)
  53. def action_of_pay(self, payment_gateway, record):
  54. # type: (JDAggrePaymentGateway, RechargeRecordT)->(PayNotifyAction, str)
  55. try:
  56. result = payment_gateway.api_trade_query(out_trade_no = record.orderNo)
  57. if 'payStatus' not in result:
  58. return PayNotifyAction.Unknown, result
  59. if result['payStatus'] == 'FINISH':
  60. return PayNotifyAction.NeedHandle, result
  61. if result['payStatus'] in ['CLOSE', 'REFUND']:
  62. return PayNotifyAction.NoHandle, result
  63. return PayNotifyAction.Unknown, result
  64. except (JDCommuException, JDPayException) as e:
  65. logger.error(str(e))
  66. return PayNotifyAction.Unknown, {}
  67. except Exception, e:
  68. logger.exception(e)
  69. return PayNotifyAction.Unknown, {}
  70. class JDAggrePayNotifier(PayNotifier):
  71. def __init__(self, request, record_cls_factory):
  72. self.orginal = json.loads(request.body) # type: Dict
  73. logger.debug('received jdaggre pay notify(encrypt): {}'.format(str(self.orginal)))
  74. super(JDAggrePayNotifier, self).__init__(request = request, record_cls_factory = record_cls_factory)
  75. def parse_request(self, request):
  76. app = JDAggrePayApp.objects(merchant_no = self.orginal['merchantNo']).first() # type: JDAggrePayApp
  77. client = JDAggrePay(app.merchant_no, app.desKey, app.saltMd5Key, app.debug) # type: JDAggrePay
  78. return client.decrypt_response(self.orginal)
  79. @property
  80. def out_trade_no(self):
  81. return self.payload['outTradeNo']
  82. def verify(self, payment_gateway, record, payload):
  83. # type: (JDAggrePaymentGateway, RechargeRecordT, Dict)->None
  84. notifier_fen = int(payload['amount'])
  85. if record.fen_total_fee != notifier_fen:
  86. raise JDPayException(
  87. errmsg = u'无效的total_fee',
  88. lvalue = record.fen_total_fee,
  89. rvalue = notifier_fen,
  90. client = payment_gateway.client)
  91. def update_record(self, record, **payload):
  92. # type: (RechargeRecordT, Dict)->bool
  93. return update_record(record, **payload)
  94. def reply(self):
  95. return HttpResponse('ok')
  96. def do(self, post_pay):
  97. # type: (callable)->JsonResponse
  98. try:
  99. decrypt_data = self.payload
  100. logger.debug('received jdaggre pay notify(decrypt): {}'.format(str(decrypt_data)))
  101. order_no = self.out_trade_no
  102. record = self.record_cls.get_record(order_no = order_no) # type: Optional[RechargeRecordT]
  103. if not record:
  104. logger.error('no such record<orderNo={}>'.format(order_no))
  105. return self.reply()
  106. if decrypt_data['resultCode'] != 'SUCCESS':
  107. logger.debug('record<id={},orderNo={}> do failure'.format(str(record.id), order_no))
  108. return self.reply()
  109. if record.is_success():
  110. logger.error(
  111. 'record<id={},orderNo={}> has been finished.'.format(str(record.id), order_no))
  112. return self.reply()
  113. pay_gateway = PaymentGateway.from_gateway_key(
  114. record.my_gateway,
  115. record.pay_gateway_key,
  116. record.pay_app_type) # type: JDAggrePaymentGateway
  117. if pay_gateway.pi_type != decrypt_data['piType']:
  118. raise JDValidationError(tips = 'invalid pi type',
  119. lvalue = pay_gateway.pi_type,
  120. rvalue = decrypt_data['piType'])
  121. # 校验参数
  122. self.verify(pay_gateway, record, decrypt_data)
  123. if decrypt_data['payStatus'] != 'FINISH':
  124. return self.reply()
  125. order_cache_mgr = OrderCacheMgr(record)
  126. try:
  127. if order_cache_mgr.check_and_set_done():
  128. logger.debug('{} has been done because cache in notify.'.format(repr(record)))
  129. return self.reply()
  130. except Exception as e:
  131. logger.error(
  132. 'cache key is not exist or exception. exception = {}, key = {}'.format(str(e), order_cache_mgr.key))
  133. modified = self.update_record(record, **decrypt_data)
  134. if not modified:
  135. # 如果没有任何记录被更新则说明已经处理了
  136. logger.debug('{} has been done because db in notify'.format(repr(record)))
  137. return self.reply()
  138. logger.info('{} successfully confirmed'.format(repr(record)))
  139. async_operation(post_pay, record = record)
  140. return self.reply()
  141. except JDCommuException as e:
  142. logger.error(str(e))
  143. except JDPayException as e:
  144. logger.error(str(e))
  145. return self.reply()
  146. except Exception as e:
  147. logger.exception(e)
  148. class JDJosPayNotifier(JDAggrePayNotifier):
  149. """
  150. 京东引流支付的 回调解析器 拆出来的原因是 回调的时候字段有明显区别 而且解密报文的方式不一样
  151. 主要用于解决字段不一致的问题以及 加密app获取方式不一致的问题
  152. 和拉取不通的地方在于 回调回来的merchantNo 直接 就是需要的app 不需要再次转换了
  153. """
  154. def do(self, post_pay):
  155. """
  156. 解析出来的参数和 京东聚合支付的参数有些不一致 这个地方 引用京东聚合解析之前 优先改变参数的字段名称
  157. :return:
  158. """
  159. self.payload['piType'] = PiType.JDPAY
  160. self.payload["channelNoSeq"] = self.payload['tradeNo']
  161. return super(JDJosPayNotifier, self).do(post_pay)
  162. class JDJosPayRecordPoller(JDAggrePayReordPoller):
  163. def action_of_pay(self, payment_gateway, record): # type: (JDAggrePaymentGateway, RechargeRecordT)->(PayNotifyAction, str)
  164. """
  165. 查询订单
  166. :param payment_gateway: 支付的网关
  167. :param record: 消费记录
  168. :return:
  169. """
  170. payApp = payment_gateway.app
  171. josPayApp = payApp.josPayApp.fetch() # type: JDAggrePayApp
  172. # 构建出 jOS 云支付的 app 用于加密解密
  173. # 这个商户号是 品牌注册的时候的商户号,一般来说是固定值
  174. # 解析 通知的报文
  175. client = JDJosPay(
  176. josPayApp.merchant_no,
  177. josPayApp.desKey,
  178. josPayApp.saltMd5Key,
  179. josPayApp.systemId,
  180. josPayApp.debug
  181. )
  182. # 将门店的相关信息 加载到payApp 里面 需要对门店信息参数进行组装
  183. setattr(client, "shopInfo", josPayApp.shopInfo)
  184. # 调用新构建的 这个 client 进行查询
  185. try:
  186. result = client.api_trade_query(record.orderNo, record.wxOrderNo)
  187. # 对result进行字段转换 保证一致
  188. result["channelNoSeq"] = result["tradeNo"]
  189. if 'payStatus' not in result:
  190. return PayNotifyAction.Unknown, result
  191. if result['payStatus'] == 'FINISH':
  192. return PayNotifyAction.NeedHandle, result
  193. if result['payStatus'] in ['CLOSE', 'REFUND']:
  194. return PayNotifyAction.NoHandle, result
  195. return PayNotifyAction.Unknown, result
  196. except (JDCommuException, JDPayException) as e:
  197. logger.error(str(e))
  198. return PayNotifyAction.Unknown, {}
  199. except Exception, e:
  200. logger.exception(e)
  201. return PayNotifyAction.Unknown, {}
  202. class JDAggrePayPullUp(PayPullUp):
  203. """
  204. 京东聚合支付获取支付参数。目前只作为商户支付
  205. """
  206. def do(self): # type: ()->HttpResponse
  207. OrderCacheMgr(self.record).initial()
  208. try:
  209. if WithdrawGateway.is_ledger(self.record.withdraw_source_key): # 资金池分账的方式
  210. ledger_type, ledger_fee_assume, ledger_list = None, None, None
  211. else: # 商户分账的方式
  212. ledger_type, ledger_fee_assume, ledger_list = self.payment_gateway.bill_split_rule(
  213. self.record.partition_map)
  214. pay_info = self.payment_gateway.unified_order(
  215. out_trade_no = self.record.orderNo,
  216. money = self.record.money,
  217. notify_url = self.payload.get('notifyUrl'),
  218. subject = self.record.subject,
  219. expire = 300,
  220. attach = {'dealerId': self.record.ownerId},
  221. openId = self.openId,
  222. billSplitList = ledger_list,
  223. **{'imei': self.payload.get('imei')})
  224. except Exception as e:
  225. logger.exception(e)
  226. self.record.fail(description = u'拉起支付发生异常', extraInfo__traceback = traceback.format_exc())
  227. raise UserServerException(u'拉起支付发生异常')
  228. else:
  229. logger.debug('jd unified order pay info = {}'.format(str(pay_info)))
  230. pi_type = self.payment_gateway.pi_type
  231. if pi_type == PiType.ALIPAY:
  232. response = JsonOkResponse(payload = {
  233. 'tradeNO': pay_info['tradeNO'],
  234. 'outTradeNo': self.record.orderNo,
  235. 'adShow': self.payload['showAd']
  236. })
  237. elif pi_type == PiType.WX:
  238. pay_info.update({
  239. 'outTradeNo': self.record.orderNo,
  240. 'golden': True,
  241. 'adShow': self.payload['showAd']
  242. })
  243. response = JsonOkResponse(payload = pay_info)
  244. elif pi_type == PiType.JDPAY:
  245. front_url = self.payload.get('front_url', None)
  246. if front_url:
  247. pay_url = add_query(pay_info, {
  248. 'callbackUrl': front_url
  249. })
  250. else:
  251. pay_url = pay_info
  252. if not pay_url:
  253. self.record.fail(description = u'调起支付失败,请刷新后重试(1001)')
  254. raise UserServerException(u'调起支付失败,请刷新后重试(1001)')
  255. else:
  256. response = JsonResponse({
  257. 'result': 1,
  258. 'description': 'SUCCESS',
  259. 'payload': {
  260. 'payUrl': pay_url
  261. }})
  262. else:
  263. self.record.fail(description = u'调起支付失败,请刷新后重试(1002)')
  264. raise UserServerException(u'调起支付失败,请刷新后重试(1002)')
  265. task_caller('poll_user_recharge_record',
  266. delay = PollRecordDefine.DELAY_BEFORE,
  267. expires = PollRecordDefine.TASK_EXPIRES,
  268. pay_app_type = self.payment_gateway.pay_app_type,
  269. record_id = str(self.record.id),
  270. interval = PollRecordDefine.WAIT_EACH_ROUND,
  271. total_count = PollRecordDefine.TOTAL_ROUNDS)
  272. return response