# -*- coding: utf-8 -*- # !/usr/bin/env python """ 整个流程: MyWorkEvent 1 刷空后收到查询事件--> 2 根据卡号在本地查询卡信息(没有则创建) --> 3 根据卡信息找到卡片是否存在未付款订单 --> 4 清理付款这些订单 --> 5 清理后订单后,收到余额,返回给模块 MyWorkEvent 1 收到上报事件请求扣款(洗衣机已经启动) --> 2 创建订单(默认非正常订单) --> 3 洗衣机上报状态更新订单状态(扣款成功, 修改订单为正常,添加付款信息)--> 4 核销该上报事件 白名单:serviceCache.set(yc-card-card_no_hex,1) 白名单固定卡 757996699 不走查询 不会走扣费 会生存订单! 测试卡号:757996699 日志前缀:caiyi_ """ import datetime import json import logging import time from functools import wraps from arrow import Arrow from django.conf import settings from mongoengine import Q from typing import TYPE_CHECKING from apilib.monetary import VirtualCoin, RMB from apilib.utils_string import cn from apps import serviceCache from apps.web.common.models import TempValues from apps.web.constant import DeviceCmdCode, ErrorCode, DeviceErrorCodeDesc, START_DEVICE_STATUS, Const from apps.web.common.transaction import UserConsumeSubType from apps.web.core.adapter.base import reverse_hex from apps.web.core.exceptions import ServiceException from apps.web.core.networking import MessageSender from apps.web.dealer.models import Dealer from apps.web.device.models import Group, Device from apps.web.eventer.base import AckEvent, WorkEvent from apps.web.eventer import EventBuilder from apps.web.user.models import ConsumeRecord, MyUser, Card, ServiceProgress from apps.web.south_intf.yuchuanApi import YuChuanApi from apps.web.user.utils import freeze_user_balance, clear_frozen_user_balance from apps.web.utils import set_start_key_status if TYPE_CHECKING: pass logger = logging.getLogger(__name__) # 学校服务器地址 # url = 'http://119.136.18.14:9002' # termId = '2' # 测试服务器地址 # url = 'http://120.237.110.50:7622', # termId = '104', # 配置 config_dic = { # 接口 'url': 'http://119.136.18.14:9002', 'systemName': 'schooladmin', 'termId': '101', 'bagCode': "1", 'roomNumb': '12345', # 中间库 'HOST': "119.136.18.14", 'PART': '1433', 'USER': 'yczn', 'PASSWORD': 'yczn123', 'DATABASE': 'q1_base', 'TABLE': 'USER_CARDINFO' } class Tools: @staticmethod def pass_same_event(exp = 30): def warpper(func): @wraps(func) def inner(self, *args, **kwargs): if serviceCache.get("yc_%s" % self.event_data['IMEI']): logger.info('\33[36m caiyi_相同查询事件上报 pass掉 \33[0m') return serviceCache.set("yc_%s" % self.event_data['IMEI'], 1, exp) try: func(self, *args, **kwargs) except Exception as e: pass serviceCache.delete("yc_%s" % self.event_data['IMEI']) return inner return warpper class builder(EventBuilder): def __getEvent__(self, device_event): if 'funCode' in device_event and device_event['funCode'] == '11': return MyWorkEvent(self.deviceAdapter, device_event) else: if 'order_id' in device_event: if device_event['order_type'] in ['com_start', 'pulse_start']: return MyNetPayAckEvent(self.deviceAdapter, device_event) if device_event['order_type'] == 'id_start': return MyIDStartAckEvent(self.deviceAdapter, device_event) class MyWorkEvent(WorkEvent): # 保存用户到本地,没有余额信息(用card_no占用openId字段为用户以后扫码用户绑定时候直接获取根据卡号获取卡对象) def save_user_info(self, user_info, devNo): """ user_info = {"factoryFixId": 2869422464, "subsidyValue": 0, "bankCardNo": "", "creditValue": 11.27, "orderingValue": 0, "mainDeputyType": 1, "cardNo": 101, "userXm": "测试人员", "userNumb": "1001", "userId": "20050814503501000008", "consumeValue": 11.27, "cardStatusId": 1, "waterValue": 10.0, "cardId": "20050814505701000002", "repastValue": 0, "identityNumber": "", "xfPwd": "MTIzNDU2"} """ cardNo = str(user_info.get('factoryFixId')) # 物理卡号 cardName = str(user_info.get('userXm')) # 持卡人 campus_user_num = str(user_info.get('userNumb')) # 存入本地服务器 card = Card(cardNo = cardNo, cardName = cardName, remarks = '宇川一卡通', agentId = self.dealer.agentId, openId = cardNo, dealerId = str(self.dealer.id), cardType = 'YuChuanCard', isHaveBalance=False, devNo=devNo, nickName=campus_user_num) card.attachParas = {'campus_user_num': campus_user_num, 'card_dealer': "宇川智能平台一卡通", "unpay_order": []} card.save() def get_old_order_by_card(self, card): # type:# (object)->Card dealers = Dealer.objects.filter(agentId = self.dealer.agentId) dealers = [str(dealer.id) for dealer in dealers] orders = ConsumeRecord.objects.filter(Q(openId__in = [card.openId, card.cardNo]) & Q(ownerId__in = dealers), remarks = 'YuChuanCard', isNormal = False, status__ne = 'finished') return orders # @Tools.pass_same_event(5) def do(self, **args): yc = YuChuanApi(**config_dic) factoryFixId_hex = str(self.event_data['cardNo']) factoryFixId = str(int(reverse_hex(factoryFixId_hex), 16)) devNo = str(self.event_data.get('IMEI')) logger.info( '\33[32m caiyi_received data:{},cardNo:{} \33[0m'.format(json.dumps(self.event_data, ), factoryFixId)) # 测试白名单 test_card = serviceCache.get('yc-card-{}'.format(factoryFixId_hex)) or TempValues.get('yc-card-{}'.format(factoryFixId_hex)) or TempValues.get('yc-all-free') if test_card or factoryFixId_hex == '667ACCE5': factoryFixId = '757996699' logger.debug('\33[32m caiyi_yc_test_card,get_balance \33[0m') price = self.device['otherConf'].get('cardPrice', 1) packages = self.device['washConfig'] package = max(packages.values(), key=lambda x: x["coins"]) max_coin = int(package.get("coins")) each_coin = int(self.device['otherConf'].get('eachCoin', 1)) payload = {'balance': RMB.yuan_to_fen(9999), 'price': int(price * 100), 'funCode': '11', 'cardNo': factoryFixId_hex, 'max_coin': max_coin, "driver_type": "aux_uart", "time": 15,'each_coin':each_coin} MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload) logger.info("\33[32m caiyi_yc_test_card_payload:%s\33[0m" % json.dumps(payload, ensure_ascii = False, encoding = 'utf-8')) return balance = RMB(0) payload = serviceCache.get('yuchuanyikatong_%s_%s' % (self.device.logicalCode, factoryFixId)) if payload: MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload) logger.info( '\33[32m caiyi_payload(cache):%s\33[0m' % json.dumps(payload, encoding = 'utf-8', ensure_ascii = False)) return # TODO 清理订单,请求余额 # 先查询卡,如果没有就创建卡信息 card = Card.objects.filter(cardNo = factoryFixId, cardType = 'YuChuanCard').first() if not card: user_info = yc.get_user_info(factoryFixId) # 如果宇川服务器上找不到卡号,则卡不对,不返回数据 if not user_info: logger.info( '\33[32m caiyi_The server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format(factoryFixId, factoryFixId_hex)) return self.save_user_info(user_info,devNo) balance = RMB(user_info.get('consumeValue')) else: # 如果有卡,查看之前的订单是否都付款成功 unpay_ordet_list = card.attachParas.get('unpay_order') if unpay_ordet_list: unpay_orders = ConsumeRecord.objects.filter(id__in = unpay_ordet_list) userNumb = card.attachParas.get('campus_user_num') for order in unpay_orders: data = { "orderSerial": str(order.orderNo), "userNumb": str(userNumb), "factoryFixId": str(factoryFixId), "consumeValue": str(order.coin), } # 需要测试确认扣费成功 res = yc.pay_order(**data) if not res: logger.info( '\33[32m caiyi_Clear order is error,cardNo:{},orderNo:{},msg:{} \33[0m'.format(factoryFixId, order.orderNo, json.dumps( res, encoding = 'utf-8', ensure_ascii = False))) continue if res['success'] == False: logger.info( "\33[32m caiyi_Clear order failed,cardNo:{},orderNo:{},msg:{} \33[0m".format(factoryFixId, order.orderNo, json.dumps(res, encoding = 'utf-8', ensure_ascii = False))) continue if res['success'] == True: order.paymentInfo['coins'] = str(order.coin) order.attachParas['coins'] = str(order.coin) order.attachParas['isCleanOrder'] = True order.status = 'finished' order.isNormal = True order.save() unpay_ordet_list.remove(str(order.id)) balance = res.get('data').get('consumeValue') logger.info( "\33[32m caiyi_Clear order success,coin:{},orderNo:{},msg:{}\33[0m".format(order.coin, order.orderNo, json.dumps(res, encoding = 'utf-8', ensure_ascii = False))) # 将清单后的ordet_set塞回去 card.attachParas['unpay_order'] = unpay_ordet_list card.save() # 如果还有未付订单,直接返回,无法使用!!! if unpay_ordet_list: logger.info( '\33[32m caiyi_card has unpay orders!!!,cardNo:{},cardNo_hex:{} \33[0m'.format(factoryFixId, factoryFixId)) return # 没有欠费订单 else: user_info = yc.get_user_info(factoryFixId) if not user_info: logger.info( '\33[32m caiyi_The YC_server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format( factoryFixId, factoryFixId_hex)) return # 更新一次用户学号信息,方便查询 card.attachParas['campus_user_num'] = user_info.get('userNumb') card.save() balance = RMB(user_info.get('consumeValue')) # 添加自定义定价 price = int(self.device['otherConf'].get('cardPrice', 1)) # 判断balance 是否能完成本次支付 if balance < RMB(price): logger.info( '\33[32m caiyi_The balance on the card is insufficient. balance:{} price:{} cardNo:{},cardNo_heX:{} \33[0m'.format( balance, price, factoryFixId, factoryFixId_hex)) return packages = self.device['washConfig'] package = max(packages.values(), key = lambda x: x["coins"]) max_coin = int(package.get("coins")) time_value = int(package.get("time", 15)) # 下多少个脉冲 each_coin = int(self.device['otherConf'].get('eachCoin', 1)) payload = {'balance': RMB.yuan_to_fen(balance), 'price': price * 100, 'funCode': '11', 'cardNo': factoryFixId_hex, 'max_coin': int(max_coin), "driver_type": "aux_uart", "time": time_value,'each_coin':each_coin} serviceCache.set('yuchuanyikatong_%s_%s' % (self.device.logicalCode, factoryFixId), payload, 10) MessageSender.send_no_wait(self.device, DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, payload) logger.info("\33[32m caiyi_payload:%s\33[0m" % json.dumps(payload, ensure_ascii = False, encoding = 'utf-8')) class MyIDStartAckEvent(AckEvent): def get_or_create_order(self, order_id, card_no, rmb, campus_user_num): packages = self.device['washConfig'] package = packages.get(str(int(rmb)), {}) order = ConsumeRecord.objects(startKey = str(order_id)).first() if not order: attach_paras = { 'card_dealer': 'YuChuanCard', 'coins': str(VirtualCoin(0).mongo_amount), 'card_no': card_no, 'campus_user_num': campus_user_num, 'isCleanOrder': False} order = ConsumeRecord.new_one( order_no = ConsumeRecord.make_no(card_no, UserConsumeSubType.CARD), user = MyUser(openId = card_no, nickname = ''), device = self.device, group = Group.get_group(self.device.groupId), package = package, # TODO: 补充刷卡单价配置 attach_paras = attach_paras, pay_info = { 'via': 'card', 'coins': str(VirtualCoin(0).mongo_amount), }, start_key = str(order_id)) order.coin = rmb order.remarks = 'YuChuanCard' order.save() return order def do_impl(self, **args): yc = YuChuanApi(**config_dic) order_id = self.event_data['order_id'] card_no_hex = str(self.event_data['cardNo']) card_no = str(int(reverse_hex(card_no_hex), 16)) devNo = str(self.event_data.get('IMEI')) logger.info('\33[32m caiyi_received data:{},cardNo:{} \33[0m'.format(json.dumps(self.event_data, ), card_no)) # 测试白名单 test_card = serviceCache.get('yc-card-{}'.format(card_no_hex)) or TempValues.get('yc-card-{}'.format(card_no_hex)) or TempValues.get('yc-all-free') if test_card: card_no = '757996699' logger.debug('caiyi_yc_test_card,to pay!!!') status = self.event_data['status'] duration = self.event_data.get('duration', 0) # 本次的使用时间,单位秒 只有状态为running的时候才会出现 rmb = RMB.fen_to_yuan(self.event_data['fee']) # 扣除的费用, 分为单位,转换为元 # 查询该卡片 card = Card.objects.filter(cardNo = card_no, cardType = 'YuChuanCard').first() if not card: # 先查询余额的时候已经做过卡片核实,如果查不到该卡片,说明有问题 logger.info('\33[32m caiyi_The server did not find the user cardNo:{},cardNo_hex:{}\33[0m'.format(card_no, card_no_hex)) raise ServiceException({'result': 2, 'description': u"没有找到该卡片"}) userNumb = card.attachParas.get('campus_user_num') # 同一卡号20分钟以内的running单直接返回不处理 last_order = serviceCache.get('yc-dev<{}>-card<{}>'.format(self.device.devNo, card_no_hex)) if last_order: logger.info('\33[32m Repeat orders within 20 minutes \33[0m') return order = self.get_or_create_order(order_id, card_no, rmb, userNumb) # type: ConsumeRecord order.servicedInfo = { 'CODE': '{}-{}'.format(card.cardName, userNumb), 'cardNo': card_no } # 卡片记录最后一次刷卡的设备 card.devNo = devNo card.save() # 白名单订单处理 if test_card: order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)}) order.attachParas.update({'coins': str(rmb)}) order.status = self.event_data['status'] order.isNormal = True order.save() logger.info( "\33[32m caiyi_order is finished, cardNo:%s,cardNo_hex:%s,coin:%s,orderNo:%s,\33[0m" % ( card_no, card_no_hex, order.coin, order.orderNo,)) return if status == 'running' or status == 'finished': if order.status == 'created': factoryFixId = str(card_no) pay_order_model = { "orderSerial": str(order.orderNo), "userNumb": userNumb, "factoryFixId": factoryFixId, "consumeValue": str(order.coin), } n = 0 unpay_ordet_list = card.attachParas.get('unpay_order') while n < 5: res = yc.pay_order(**pay_order_model) if res['success'] == True: order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)}) order.attachParas.update({'coins': str(rmb)}) order.status = self.event_data['status'] order.isNormal = True order.save() logger.info("\33[32m caiyi_order is finished, coin:%s,orderNo:%s,msg:%s\33[0m" % ( order.coin, order.orderNo, json.dumps(res, encoding = 'utf-8', ensure_ascii = False) )) break if res['success'] == False: # 保存不在未支付的订单号 if str(order.id) not in unpay_ordet_list: unpay_ordet_list.append(str(order.id)) card.attachParas['unpay_order'] = unpay_ordet_list card.save() logger.info( "caiyi_pay order fail! devNo:%s,orderNo:%s,coin:%s,msg:%s" % (order.devNo, order.orderNo, order.coin, json.dumps(res, encoding = 'utf-8', ensure_ascii = False))) break time.sleep(1) n += 1 if n == 5: unpay_ordet_list.append(str(order.id)) card.attachParas['unpay_order'] = unpay_ordet_list card.save() logger.info( "\33[32m caiyi_pay order fail! cardNo:%s,orderNo:%s,coin:%s\33[0m" % (factoryFixId, order.coin, order.orderNo)) elif order.status == 'running' or order.status == 'finished': # 如果该笔订单显示不正常(默认值为False) if order.isNormal is False: pay_order_model = { "orderSerial": str(order.orderNo), "userNumb": userNumb, "factoryFixId": str(card_no), "consumeValue": str(order.coin), } res = yc.pay_order(**pay_order_model) if res['success'] != True: logger.info("\33[32m caiyi_pay order fail! cardNo:%s,orderNo:%s,coin:%s,msg:%s\33[0m" % ( card_no, order.coin, order.orderNo, json.dumps(res, encoding = 'utf-8', ensure_ascii = False))) return if res['success'] == True: order.paymentInfo.update({'coins': str(rmb), 'duration': str(int(duration) / 60)}) order.attachParas.update({'coins': str(rmb)}) order.status = self.event_data['status'] order.isNormal = True order.save() logger.info("\33[32m caiyi_order is finished,cardNo:%s,coin:%s,orderNo:%s,msg:%s\33[0m" % ( card_no, order.coin, order.orderNo, json.dumps(res, encoding = 'utf-8', ensure_ascii = False),)) # 记录该设备 20分钟内 再次有付款单 不处理 serviceCache.set('yc-dev<{}>-card<{}>'.format(self.device.devNo, card_no_hex), order.orderNo, 60 * 20) # 该笔订单已经扣费成功了,直接更改状态 else: if order.status != 'finished': order.status = status order.finishedTime = datetime.datetime.now() order.save() logger.info("\33[32m caiyi_order is finished,cardNo:%s,coin:%s,orderNo:%s\33[0m" % ( card_no, order.coin, order.orderNo,)) serviceCache.delete('yuchuanyikatong_%s' % (card_no)) # 删除payload class MyNetPayAckEvent(AckEvent): def deal_running_event(self, order): # type: (ConsumeRecord)->None def do_running_order(order, result): # type: (ConsumeRecord, dict)->None if order.status in ['running', 'finished']: logger.debug('order<{}> no need to ack. this has done.'.format(repr(order))) return if order.status == 'timeout': freeze_user_balance(self.device, Group.get_group(order.groupId), order) err_desc = u'事件上报成功,补充扣款' else: err_desc = '' order.isNormal = True order.status = 'running' order.startTime = datetime.datetime.fromtimestamp(result['sts']) order.save() do_running_order(order, self.event_data) start_time = Arrow.fromdatetime(order.startTime, tzinfo = settings.TIME_ZONE) cache_info = { 'startTime': start_time.format('YYYY-MM-DD HH:mm:ss'), 'status': Const.DEV_WORK_STATUS_WORKING, 'openId': order.openId, 'orderNo': order.orderNo, 'coins': order.coin, 'money': order.money, 'estimatedTs': int(start_time.timestamp + order.my_package.estimated_duraion), 'unit': order.my_package.unit, 'needKind': order.my_package.need_kind, 'needValue': order.my_package.need_value } Device.update_dev_control_cache(self.device.devNo, cache_info) ServiceProgress.new_progress_for_order(order = order, device = self.device, cache_info = cache_info) def deal_finished_event(self, order): # type: (ConsumeRecord)->None def do_finished_order(order, result): if order.status == 'finished': logger.debug('order<{}> no need to ack. this has done.'.format(repr(order))) return if order.status == 'running': order.status = 'finished' order.finishedTime = datetime.datetime.fromtimestamp(result['fts']) order.save() else: if order.status == 'timeout': freeze_user_balance(self.device, Group.get_group(order.groupId), order) err_desc = u'事件上报成功,补充扣款' else: err_desc = '' order.isNormal = True order.status = 'finished' order.errorDesc = err_desc order.startTime = datetime.datetime.fromtimestamp(result['sts']) order.finishedTime = datetime.datetime.fromtimestamp(result['fts']) order.save() clear_frozen_user_balance(self.device, order, round(self.event_data['duration'] / 60.0, 2), 0, RMB(0)) if order.status == 'finished': logger.debug('order<{}> has finished.'.format(repr(order))) return do_finished_order(order, self.event_data) current_cache_info = Device.get_dev_control_cache(self.device.devNo) if current_cache_info and order.orderNo == current_cache_info.get('orderNo', None): Device.invalid_device_control_cache(self.device.devNo) ServiceProgress.objects(open_id = order.openId, device_imei = self.device.devNo, port = int(order.used_port), consumeOrder__orderNo = order.orderNo).update_one( upsert = False, **{ 'isFinished': True, 'finished_time': Arrow.fromdatetime(order.finishedTime, tzinfo = settings.TIME_ZONE).timestamp, 'expireAt': datetime.datetime.now() }) def do_impl(self, **args): order_id = self.event_data['order_id'] order = ConsumeRecord.objects(ownerId = self.device.ownerId, orderNo = order_id).first() # type: ConsumeRecord if not order: logger.debug('order is not exist.'.format(self.event_data['order_id'])) return if order.status == 'finished': logger.debug('order<{}> has been fished.'.format(repr(order))) return if self.event_data['rst'] == ErrorCode.DEVICE_CONN_FAIL: logger.error('order<{}> timeout.'.format(repr(order))) if order.status == 'created': order.isNormal = False order.status = 'timeout' order.errorDesc = DeviceErrorCodeDesc.get(ErrorCode.DEVICE_CONN_FAIL) set_start_key_status(start_key = order.startKey, state = START_DEVICE_STATUS.TIMEOUT, reason = cn(DeviceErrorCodeDesc.get(ErrorCode.DEVICE_CONN_FAIL))) return if self.event_data['rst'] != ErrorCode.DEVICE_SUCCESS: logger.error('order<{}> failure.'.format(repr(order))) if 'errorDesc' in self.event_data: error_desc = self.event_data['errorDesc'] else: error_desc = DeviceErrorCodeDesc.get(self.event_data['rst']) order.status = 'finished' order.isNormal = False order.finishedTime = datetime.datetime.fromtimestamp(self.event_data['fts']) order.errorDesc = error_desc order.save() set_start_key_status(start_key = order.startKey, state = START_DEVICE_STATUS.FAILURE, reason = cn(error_desc)) return set_start_key_status(start_key = order.startKey, state = START_DEVICE_STATUS.FINISHED, order_id = str(order.id)) if self.event_data['status'] == 'running': return self.deal_running_event(order) if self.event_data['status'] == 'finished': return self.deal_finished_event(order)