commonPulse.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import datetime
  4. import logging
  5. from typing import TYPE_CHECKING
  6. from apilib.monetary import VirtualCoin
  7. from apps.web.constant import ErrorCode, Const, DeviceErrorCodeDesc
  8. from apps.web.device.models import Group, Device
  9. from apps.web.eventer import EventBuilder
  10. from apps.web.eventer.base import AckEvent
  11. from apps.web.user.models import ConsumeRecord
  12. from apps.web.user.utils import freeze_user_balance, clear_frozen_user_balance, recover_user_frozen_balance
  13. if TYPE_CHECKING:
  14. pass
  15. logger = logging.getLogger(__name__)
  class builder(EventBuilder):
      """Event builder that wraps pulse-start order reports in a ``PulseAckEvent``."""

      def __getEvent__(self, device_event):
          # type: (dict) -> PulseAckEvent
          """Create the ack event for a reported device event.

          :param device_event: mapping reported by the device; it must contain
              ``order_id`` and ``order_type``.
          :return: a ``PulseAckEvent`` bound to ``self.deviceAdapter`` when
              ``order_type`` equals ``'pulse_start'``; otherwise ``None`` after
              an error has been logged.
          """
          required_keys = ('order_id', 'order_type')
          if any(key not in device_event for key in required_keys):
              logger.error('order info is not complete.')
              return None

          order_type = device_event['order_type']
          if order_type == 'pulse_start':
              return PulseAckEvent(self.deviceAdapter, device_event)

          # Any other order type is not handled by this builder.
          logger.error('order type<{}> is not match.'.format(order_type))
          return None
  class PulseAckEvent(AckEvent):
      """
      Ack-event handler for pulse devices.

      The status of a pulse-device order can only be ``created``, ``timeout``
      or ``finished``.
      """

      def do_impl(self, **args):
          """Settle the order referenced by the reported event.

          Reads ``order_id``, ``port`` and ``rst`` from ``self.event_data``
          (the success path additionally reads ``sts``). The event is ignored,
          with a log entry, when:

          * no ``ConsumeRecord`` matches the device owner and the order number;
          * the order status is already ``'finished'``;
          * the reported port differs from ``order.used_port``.

          Otherwise the order is settled as a success when ``rst`` equals
          ``ErrorCode.DEVICE_SUCCESS`` and as a failure for any other value.

          :param args: extra keyword arguments; not used by this handler.
          :return: ``None``.
          """
          order_id = self.event_data['order_id']
          order = ConsumeRecord.objects(ownerId=self.device.ownerId,
                                        orderNo=order_id).first()  # type: ConsumeRecord
          if not order:
              logger.debug('order<no={}> is not exist.'.format(order_id))
              return

          if order.status == 'finished':
              logger.debug('order<{}> has been finished.'.format(repr(order)))
              return

          if order.used_port != self.event_data['port']:
              logger.error('port is not equal. {} != {}'.format(self.event_data['port'], order.used_port))
              return

          if self.event_data['rst'] == ErrorCode.DEVICE_SUCCESS:
              self._settle_started_order(order)
          else:
              self._settle_failed_order(order)

      def _settle_started_order(self, order):
          # type: (ConsumeRecord) -> None
          """Charge the user and mark ``order`` as normally finished."""
          # NOTE(review): update() mutates whatever object
          # ``initial_consume_dict`` returns -- confirm it is a fresh dict.
          consume_dict = order.my_package.initial_consume_dict
          consume_dict.update({'refundedMoney': 0})

          if order.status == 'timeout':
              # Supplementary charge: freeze the balance again before it is
              # cleared below (presumably it was released when the order
              # timed out -- TODO confirm).
              freeze_user_balance(self.device, Group.get_group(order.groupId), order)
              err_desc = u'事件上报成功,补充扣款'
          else:
              err_desc = 'success'

          clear_frozen_user_balance(self.device, order, consume_dict['duration'], 0, VirtualCoin(0))

          # NOTE(review): assumes ``sts`` is a Unix timestamp in seconds and
          # ``duration`` is expressed in minutes -- confirm with the protocol.
          start_ts = self.event_data['sts']
          finished_ts = start_ts + consume_dict['duration'] * 60

          # Mark the order as completed.
          order.status = 'finished'
          order.isNormal = True
          order.errorDesc = err_desc
          order.startTime = datetime.datetime.fromtimestamp(start_ts)
          order.finishedTime = datetime.datetime.fromtimestamp(finished_ts)
          order.save()

          # Update the device control cache. Best effort: any error is logged
          # and must not prevent the statistics update below.
          try:
              control_cache = Device.get_dev_control_cache(self.device.devNo)
              start_time_text = order.startTime.strftime(Const.DATETIME_FMT)
              cache_is_stale = (
                  not control_cache
                  or 'startTime' not in control_cache
                  or control_cache['startTime'] < start_time_text
              )
              if cache_is_stale:
                  Device.update_dev_control_cache(self.device.devNo,
                                                  {
                                                      'status': Const.DEV_WORK_STATUS_WORKING,
                                                      'startTime': start_time_text,
                                                      'finishedTime': finished_ts
                                                  })
          except Exception as e:
              # Hand the argument to logging instead of formatting eagerly, so
              # a formatting problem cannot raise inside this handler.
              logger.exception('error = %s', e)

          # Add statistics.
          order.update_service_info(consume_dict)

      def _settle_failed_order(self, order):
          # type: (ConsumeRecord) -> None
          """Release the frozen balance and mark ``order`` as abnormally finished."""
          logger.error('order<{}> failure.'.format(repr(order)))

          # NOTE(review): this also runs for orders whose status is 'timeout';
          # confirm that recovering the frozen balance is safe in that state.
          recover_user_frozen_balance(self.device, Group.get_group(order.groupId), order)

          order.status = 'finished'
          order.isNormal = False
          order.finishedTime = datetime.datetime.now()
          order.errorDesc = DeviceErrorCodeDesc.get(self.event_data['rst'])
          order.save()

          # Nothing was consumed; the refunded amount is derived from the
          # order's coin value.
          consume_dict = {
              'duration': 0,
              'refundedMoney': VirtualCoin(order.coin).mongo_amount
          }
          order.update_service_info(consume_dict)