__init__.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import logging
  4. import time
  5. from django.core.handlers.wsgi import WSGIRequest
  6. from django.http import HttpResponse
  7. from django.utils.module_loading import import_string
  8. from typing import TYPE_CHECKING, cast
  9. from apilib.systypes import StrEnum, Singleton
  10. from apps import lockCache
  11. from apps.web.core import PayAppType
  12. from apps.web.core.payment import PaymentGateway
  13. from apps.web.exceptions import UserServerException
  14. if TYPE_CHECKING:
  15. from apps.web.dealer.models import DealerRechargeRecord, RefundDealerRechargeRecord
  16. from apps.web.user.models import RechargeRecord, RefundMoneyRecord
  17. from typing import Union, Optional, Callable, Dict
  18. import six
  19. RechargeRecordT = Union[DealerRechargeRecord, RechargeRecord]
  20. RefundRecordT = Union[RefundDealerRechargeRecord, RefundMoneyRecord]
  21. from apps.web.core.payment.type_checking import PaymentGatewayT
  22. logger = logging.getLogger(__name__)
class PayNotifyAction(StrEnum):
    # Outcome that a poller's ``action_of_pay`` reports for a queried trade.
    # PayRecordPoller.start branches on these values as noted per member.

    # Neither NoHandle nor Unknown: start() goes on to mark the order done,
    # update the record and invoke the post-pay callback.
    NeedHandle = 'needHandle'
    # start() returns immediately without touching the record.
    NoHandle = 'noHandle'
    # start() skips the rest of this attempt and polls again.
    Unknown = 'unknown'
  27. class OrderCacheMgr(object):
  28. KEY = '{}-{}-lock'
  29. def __init__(self, record_or_id, cls_name = None):
  30. # type: (Union[RechargeRecordT, basestring], Optional[str])->None
  31. if isinstance(record_or_id, basestring):
  32. assert cls_name, 'cls_name must not be none if record is string'
  33. self.key = self.KEY.format(cls_name, record_or_id)
  34. else:
  35. assert not cls_name, 'cls_name must be none if record is object'
  36. self.key = self.KEY.format(record_or_id.__class__.__name__, str(record_or_id.id))
  37. def initial(self):
  38. try:
  39. return lockCache.set(self.key, '0', 25 * 3600)
  40. except Exception as e:
  41. logger.exception(e)
  42. return 0
  43. def check_and_set_done(self):
  44. # type: ()->bool
  45. new_value = lockCache.incr(self.key)
  46. if int(new_value) >= 2:
  47. return True
  48. else:
  49. return False
  50. def has_done(self):
  51. # type: ()->bool
  52. new_value = lockCache.get(self.key, None)
  53. if not new_value:
  54. return False
  55. if int(new_value) >= 1:
  56. return True
  57. else:
  58. return False
  59. class PayRecordPoller(object):
  60. def __init__(self, record_id, interval, total_count, record_cls, post_pay):
  61. # type: (str, int, int, six.class_types, Callable)->None
  62. self.record_id = record_id
  63. self.interval = interval
  64. self.total_count = total_count
  65. self.record_cls = record_cls
  66. self.post_pay = post_pay
  67. def update_record(self, record, **payload):
  68. # type: (RechargeRecordT, Dict)->bool
  69. raise NotImplementedError('sub class must implement update_record')
  70. def action_of_pay(self, payment_gateway, record):
  71. # type: (PaymentGateway, RechargeRecordT)->(PayNotifyAction, str)
  72. raise NotImplementedError('sub class must implement action_of_pay')
  73. def start(self):
  74. logger.debug('poll record<id={}> pay status.'.format(self.record_id))
  75. count = 0
  76. order_cache_mgr = OrderCacheMgr(self.record_id, self.record_cls.__name__)
  77. if order_cache_mgr.has_done():
  78. logger.debug('record<id={}> has done.'.format(self.record_id))
  79. return
  80. record = self.record_cls.objects(id = self.record_id).first() # type: RechargeRecordT
  81. if not record:
  82. logger.error('record<id={}> is not exist.'.format(self.record_id))
  83. return
  84. if record.is_success:
  85. logger.error('record<id={},order={}> is success.'.format(self.record_id, record.orderNo))
  86. return
  87. payment_gateway = PaymentGateway.from_gateway_key(
  88. record.my_gateway,
  89. record.pay_gateway_key,
  90. record.pay_app_type) # type: PaymentGatewayT
  91. while count < self.total_count:
  92. try:
  93. if count != 0 and order_cache_mgr.has_done():
  94. logger.debug('record<id={},order={}> has done.'.format(self.record_id, record.orderNo))
  95. return
  96. action, query_trade_result = self.action_of_pay(payment_gateway, record)
  97. logger.debug(
  98. 'record<id={},order={}> trade info: trade result = {}, action = {}.'.format(
  99. self.record_id, record.orderNo, str(query_trade_result), action))
  100. if action == PayNotifyAction.NoHandle:
  101. return
  102. if action == PayNotifyAction.Unknown:
  103. continue
  104. try:
  105. if order_cache_mgr.check_and_set_done():
  106. logger.debug(
  107. 'record<id={},order={}> check has done.'.format(self.record_id, record.orderNo))
  108. return
  109. except Exception as e:
  110. logger.error(
  111. 'record<id={},order={}> cache key is not exist or exception. exception = {}, key = {}'.format(
  112. self.record_id, record.orderNo, str(e), order_cache_mgr.key))
  113. modified = self.update_record(record, **query_trade_result)
  114. if not modified:
  115. logger.debug(
  116. 'record<id={},order={}> has been modified.'.format(self.record_id, record.orderNo))
  117. return
  118. return self.post_pay(record)
  119. except Exception as e:
  120. logger.exception(
  121. 'record<id={},order={}> poll trade status failure. e = {}'.format(self.record_id, record.orderNo,
  122. str(e)))
  123. finally:
  124. count = count + 1
  125. time.sleep(self.interval)
  126. class PayNotifier(object):
  127. def __init__(self, request, record_cls_factory):
  128. # type: (WSGIRequest, callable)->None
  129. self.payload = self.parse_request(request)
  130. self.record_cls = record_cls_factory(self.out_trade_no)
  131. def parse_request(self, request):
  132. # type: (WSGIRequest)->dict
  133. raise NotImplementedError('sub class must implement update_record')
  134. @property
  135. def out_trade_no(self):
  136. raise AttributeError('no out_trade_no attr.')
  137. def update_record(self, record, **payload):
  138. # type: (RechargeRecordT, Dict)->bool
  139. raise NotImplementedError('sub class must implement update_record')
  140. def do(self, post_pay):
  141. # type: (callable)->HttpResponse
  142. raise NotImplementedError('sub class must implement do')
  143. class PayPullUpFactoryIntf(object):
  144. @classmethod
  145. def create(cls, payment_gateway, record, **payload):
  146. # type: (PaymentGatewayT, RechargeRecordT, dict)->PayPullUp
  147. raise NotImplementedError('sub class must implement create')
  148. class PayPullUp(object):
  149. def __init__(self, openId, payment_gateway, record, **payload):
  150. # type: (basestring, PaymentGatewayT, RechargeRecordT, dict)->None
  151. self.openId = openId
  152. self.record = record
  153. self.payment_gateway = payment_gateway
  154. self.payload = payload
  155. @classmethod
  156. def create(cls, factory_cls, payment_gateway, record, **payload):
  157. # type: (cast(PayPullUpFactoryIntf, None), PaymentGatewayT, RechargeRecordT, dict)->PayPullUp
  158. return factory_cls.create(payment_gateway, record, **payload)
  159. def do(self):
  160. # type: ()->HttpResponse
  161. raise NotImplementedError('sub class must implement do')
  162. class PayManager(Singleton):
  163. def __init__(self):
  164. super(PayManager, self).__init__()
  165. self.map_dict = {
  166. PayAppType.WECHAT: {
  167. 'poller': import_string('apps.web.common.transaction.pay.wechat.WechatPayRecordPoller'),
  168. 'notifier': import_string('apps.web.common.transaction.pay.wechat.WechatPayNotifier'),
  169. 'pullup': import_string('apps.web.common.transaction.pay.wechat.WechatPullUp'),
  170. },
  171. PayAppType.ALIPAY: {
  172. 'poller': import_string('apps.web.common.transaction.pay.alipay.AliPayRecordPoller'),
  173. 'notifier': import_string('apps.web.common.transaction.pay.alipay.AliPayNotifier'),
  174. 'pullup': import_string('apps.web.common.transaction.pay.alipay.AlipayPullUp')
  175. }
  176. }
  177. def get_poller(self, pay_app_type):
  178. assert pay_app_type in self.map_dict, 'not register pay app type'
  179. return self.map_dict[pay_app_type]['poller']
  180. def get_notifier(self, pay_app_type):
  181. assert pay_app_type in self.map_dict, 'not register pay app type'
  182. return self.map_dict[pay_app_type]['notifier']
  183. def get_pull_up(self, pay_app_type):
  184. if pay_app_type not in self.map_dict:
  185. raise UserServerException(u'第三方支付配置错误,请联系平台客服(1002)')
  186. return self.map_dict[pay_app_type]['pullup']