# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import json import logging import time from decimal import Decimal import re from typing import TYPE_CHECKING from apilib.monetary import VirtualCoin from apilib.utils_datetime import to_datetime from apps.web.common.proxy import ClientConsumeModelProxy from apps.web.constant import Const, DeviceCmdCode from apps.web.core.adapter.base import SmartBox from apps.web.core.exceptions import ServiceException from apps.web.core.networking import MessageSender from apps.web.device.models import Device, Group from apps.web.user.models import ConsumeRecord, MyUser from apps.web.user.models import ServiceProgress from taskmanager.mediator import task_caller logger = logging.getLogger(__name__) if TYPE_CHECKING: pass class GeZiGuiBox(SmartBox): def __init__(self, device): super(GeZiGuiBox, self).__init__(device) # 解析获取设备信息的返回报文 def analyze_event_data(self, data): pass def _check_package(self, port, package): """ 获取设备启动的发送数据 根据设备的当前模式以及套餐获取 :param package: :return: """ sizeInfo = [{"port": str(_), "size": "small"} for _ in xrange(1, 25)] sizeInfo = self.device.get("otherConf", dict()).get("sizeInfo") or sizeInfo res = filter(lambda x: x["port"] == port, sizeInfo) result = False if not res: raise ServiceException({"result": 2, "description": u"未找到{}号该柜门".format(port)}) name = package.get("name") size = res[0].get("size") if size == "large" and u"大" in name: result = True elif size == "medium" and u"中" in name: result = True elif size == "small" and u"小" in name: result = True else: pass if result == False: raise ServiceException({"result": 2, "description": u"未找到{}号该柜门".format(port)}) def log_obj(self, obj): if isinstance(obj, dict): for k, v in obj.items(): if isinstance(v, object): obj[k] = str(v) if isinstance(obj, list) or isinstance(obj, tuple) or isinstance(obj, set): obj = map(lambda x: str(x) if isinstance(x, object) else x, obj) if isinstance(obj, unicode): obj = str(obj) # print("\33[33m" + json.dumps(obj,ensure_ascii=True,encoding="utf-8") + "\33[0m") return "\33[33m" + json.dumps(obj, ensure_ascii=False, encoding="utf-8") + "\33[0m" def disable_app_device(self, switch=True): # type:(bool) -> None otherConf = self.device.get("otherConf", {}) otherConf["disableDevice"] = switch Device.objects.filter(devNo=self.device["devNo"]).update(otherConf=otherConf) Device.invalid_device_cache(self.device["devNo"]) @staticmethod def encode_str(data, length=2, ratio=1.0, base=16): # type:(str,int,float,int) -> str if not isinstance(data, Decimal): data = Decimal(data) if not isinstance(length, str): length = str(length) if not isinstance(ratio, Decimal): ratio = Decimal(ratio) end = "X" if base == 16 else "d" encodeStr = "%." + length + end encodeStr = encodeStr % (data * ratio) return encodeStr @staticmethod def decode_str(data, ratio=1.0, base=16): # type:(str,float,int) -> str """ ratio:比率单位转换 """ if not isinstance(data, str): data = str(data) return "%.10g" % (int(data, base) * ratio) @staticmethod def reverse_hex(data): # type:(str) -> str if not isinstance(data, str): raise TypeError return "".join(list(reversed(re.findall(r".{2}", data)))) def decode_long_hex_to_list(self, data, split=2, ratio=1.0, base=16, decode=True): # type:(str,int,float,int) -> list """ return: list """ if len(data) % split != 0: raise Exception("Invalid data") pattern = r".{%s}" % split hex_list = re.findall(pattern, data) if decode: hex_list = map(lambda x: self.decode_str(x, ratio=ratio, base=base), hex_list) return hex_list else: return hex_list @staticmethod def check_params_range(params, minData=None, maxData=None, desc=""): # type:(str,float,float,str) -> str """ 检查参数,返回字符串参数 """ if params is None: raise ServiceException({"result": 2, "description": u"参数错误."}) if not isinstance(params, Decimal): params = Decimal(params) if not minData and maxData: if not isinstance(maxData, Decimal): maxData = Decimal(maxData) if params <= maxData: return "%g" % params else: raise ServiceException({"result": 2, "description": u"%s超出可选范围,可选最大值为%g" % (desc, maxData)}) if not maxData and minData: if not isinstance(minData, Decimal): minData = Decimal(minData) if minData <= params: return "%g" % params else: raise ServiceException({"result": 2, "description": u"%s超出可选范围,可选最小值为%g" % (desc, minData)}) if not minData and not maxData: return "%g" % params else: if not isinstance(minData, Decimal): minData = Decimal(minData) if not isinstance(maxData, Decimal): maxData = Decimal(maxData) if minData <= params <= maxData: return "%g" % params else: raise ServiceException( {"result": 2, "description": u"%s参数超出可选范围,可取范围为%g-%g" % (desc, minData, maxData)}) def send_mqtt(self, funCode, data, cmd=DeviceCmdCode.OPERATE_DEV_SYNC): """ 发送mqtt 指令210 返回data """ if not isinstance(funCode, str): funCode = str(funCode) if not isinstance(data, str): data = str(data) result = MessageSender.send(self.device, cmd, {"IMEI": self.device["devNo"], "funCode": funCode, "data": data}, timeout=10) if "rst" in result and result["rst"] != 0: if result["rst"] == -1: raise ServiceException( {"result": 2, "description": u"该设备正在玩命找网络,请您稍候再试", "rst": -1}) elif result["rst"] == 1: raise ServiceException( {"result": 2, "description": u"该设备忙,无响应,请您稍候再试。也可能是您的设备版本过低,暂时不支持此功能", "rst": 1}) else: if cmd in [DeviceCmdCode.OPERATE_DEV_NO_RESPONSE, DeviceCmdCode.PASSTHROUGH_OPERATE_DEV_NO_RESPONSE]: return else: return result def do_update_configs(self, updateDict): dev = Device.objects.get(devNo=self.device.devNo) deviceConfigs = dev.otherConf.get("deviceConfigs", {}) deviceConfigs.update(updateDict) dev.otherConf['deviceConfigs'] = deviceConfigs dev.save() Device.invalid_device_cache(self.device.devNo) def start_device(self, package, openId, attachParas): portStr = attachParas.get("chargeIndex") if portStr is None: raise ServiceException({"result": 2, "description": u"未知端口"}) self._check_package(portStr, package) ctrInfo = Device.get_dev_control_cache(self._device["devNo"]) lineInfo = ctrInfo.get(portStr) if not lineInfo or lineInfo.get("status", 2) == Const.DEV_WORK_STATUS_IDLE: orderNo = attachParas.get("orderNo") delay = 1 * 60 data = self.encode_str(portStr) data += "FFFF" data += self.encode_str(delay) # 发送指令 result = self.send_mqtt(funCode="02", data=data) user = MyUser.objects.filter(openId=openId, groupId=self.device["groupId"]).first() portDict = { "status": Const.DEV_WORK_STATUS_WORKING, "isStart": True, "openId": openId, "orderNo": orderNo, "startTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "package": package, "devNo": self.device.devNo, "nickName": user.nickname, 'consumeRecordId': str(attachParas['consumeRecordId']) } Device.update_dev_control_cache(self._device["devNo"], {str(portStr): portDict}) otherConf = self.device.get("otherConf") or dict() result["consumeOrderNo"] = orderNo result["servicedInfo"] = {"chargeType": otherConf.get("chargeType", "time")} result["finishedTime"] = int(time.time()) + 7 * 24 * 60 * 60 return result def get_default_port_nums(self): sizeInfo = self.device["otherConf"].get("sizeInfo", []) return len(sizeInfo) or 24 def check_dev_status(self, attachParas=None): """ 控制缓存里面查找有没有使用该端口使用情况 没有则返回 有则返回raise ServiceException({"result": 2, "description": u"当前洗衣机%s,请稍候使用" % info["statusInfo"]}) attachParas:{"category": "chargeIndex", "chargeIndex": "2", "voltage": None, "batteryType": None, "waterIndex": None, "powerGear": None} """ logger.info(self.log_obj("GZG_ do_____check_dev_status,attachParas: {}".format(attachParas))) chargeIndex = attachParas.get("chargeIndex") if not chargeIndex: raise ServiceException({"result": 2, "description": u"当前柜没法使用"}) ctrInfo = Device.get_dev_control_cache(self.device.devNo) lineInfo = ctrInfo.get(chargeIndex) if not lineInfo: logger.debug("get null control cache from {}".format(repr(self.device))) return elif lineInfo.get("status", 0) == Const.DEV_WORK_STATUS_IDLE: return elif lineInfo.get("status", 0) == Const.DEV_WORK_STATUS_WORKING: # 第二次 扫码 点击了 startAction 准备拉起支付 if 'consumeRecordId' in lineInfo: order = ClientConsumeModelProxy.get_one( shard_filter = {'ownerId': self.device.ownerId}, id = lineInfo.get("consumeRecordId")) # type: ConsumeRecord else: # 升级兼容deprecated order = ConsumeRecord.objects.filter(orderNo = lineInfo.get("orderNo")).first() # type: ConsumeRecord if order: # 防止用户点一次之后取消又来点 做一个订单状态的修正 if order.is_waitPay(): order.update(status="end") else: raise ServiceException({"result": 2, "description": u"当前%s号柜,已有人使用稍候使用" % chargeIndex}) else: raise ServiceException({"result": 2, "description": u"当前%s号柜,已有人使用稍候使用" % chargeIndex}) def check_order_state(self, openId): """ 通过 openId 以及设备来鉴别 订单 :param openId: :return: """ dealerId = self.device.ownerId devTypeCode = self.device.devType.get("code") # TODO: 性能问题 orders = ConsumeRecord.objects.filter(ownerId=dealerId, openId=openId, devTypeCode=devTypeCode, status__ne="finished", isNormal=True).order_by('-id') if not orders: return else: orders.update(status="running") return orders[0] def deal_order_money(self, order): # type: (ConsumeRecord) -> ConsumeRecord """ 传入order对象,给订单动态计算时间,使用情况,费用 主要做三个处理. 1 计算时间 价格 2 更新ConsumeRecord 3 更新ServiceProgress """ order = self.update_order_info(order) money = order.money # type: VirtualCoin port = order.attachParas.get("chargeIndex") spQueryDict = { "open_id": order.openId, "device_imei": order.devNo, "port": int(port), "isFinished": False } sp = ServiceProgress.objects.filter(**spQueryDict).first() sp.consumeOrder.update({"needPayMoney": str(money)}) sp.save() return order def update_order_info(self, order): """ 获取用户当前需要扣除的金额 """ if order.status == "finished": return order ctrInfo = Device.get_dev_control_cache(order.devNo) port = order.attachParas.get("chargeIndex") lineInfo = ctrInfo.get(port) startTime = lineInfo.get("startTime") startTime = to_datetime(startTime) nowTime = datetime.datetime.now() if order.finishedTime: old_finish_time = order.finishedTime pay_order_time = round(((nowTime - old_finish_time).total_seconds() / 60.0)) if pay_order_time > 3: # 如果付款时间大于3分钟,时间继续,订单费用刷新,否则订单金额不变 usedTime = int(round(((nowTime - startTime).total_seconds() / 60.0))) finishedTime = nowTime else: usedTime = order.servicedInfo.get("duration", 0) finishedTime = order.finishedTime else: usedTime = int(round(((nowTime - startTime).total_seconds() / 60.0))) finishedTime = nowTime packageId = order.attachParas.get("packageId") washConfig = self.device.get("washConfig", dict()) # new package = washConfig.get(packageId) base_price = package.get("basePrice", 0) timePrice = package.get("price", 0) money = VirtualCoin(base_price) + VirtualCoin(float(timePrice) * float(usedTime) / 60.0) # money = VirtualCoin(0.01) consumeDict = { "port": order.attachParas.get("chargeIndex"), "duration": usedTime, # "finishedTime": finishedTime.strftime("%Y-%m-%d %H:%M:%S"), "spendMoney": str(money), } order.servicedInfo = consumeDict order.finishedTime = finishedTime order.money = money order.coin = money if usedTime < self.device["otherConf"].get("refundProtectionTime", 0): order.money = VirtualCoin(0) order.coin = VirtualCoin(0) order.save() order.reload() return order def finished_order_open_door(self, port): """ 发送开门指令后 延时上报如果门没关 推送给经销商 """ portHex = self.encode_str(port, 2) delay = self.device.get("otherConf", {}).get("delay_check_close_door", 3) delayHex = self.encode_str(delay, length=4) data = portHex + "FFFF" + delayHex self.send_mqtt(funCode="02", data=data) def get_port_status(self, force=False): if force: pass devCache = Device.get_dev_control_cache(self._device["devNo"]) allPorts = self.get_default_port_nums() if allPorts is None: raise ServiceException({"result": 2, "description": u"充电端口信息获取失败"}) # 获取显示端口的数量 客户要求可以设置 显示给用户多少个端口 showPortNum = self._device.get("otherConf", {}).get("actualPortNum", 24) statusDict = dict() showStatusDict = dict() for portNum in xrange(allPorts): portStr = str(portNum + 1) tempDict = devCache.get(portStr, {}) if "status" in tempDict: statusDict[portStr] = {"status": tempDict["status"]} elif "isStart" in tempDict: if tempDict["isStart"]: statusDict[portStr] = {"status": Const.DEV_WORK_STATUS_WORKING} else: statusDict[portStr] = {"status": Const.DEV_WORK_STATUS_IDLE} else: statusDict[portStr] = {"status": Const.DEV_WORK_STATUS_IDLE} if int(portStr) <= int(showPortNum): showStatusDict[portStr] = {"status": statusDict[portStr]["status"]} allPorts, usedPorts, usePorts = self.get_port_static_info(statusDict) portsDict = {"allPorts": allPorts, "usedPorts": usedPorts, "usePorts": usePorts} Device.update_dev_control_cache(self._device["devNo"], portsDict) return showStatusDict def get_port_info(self, port): # type:(str) -> dict """ 获取单个端口状态 funcCode : 发送: 返回: """ ctrInfo = Device.get_dev_control_cache(self.device.devNo) return ctrInfo.get(port) def get_port_status_from_dev(self): """ funCode : CC """ portList = [] allnums = self.get_default_port_nums() ctrInfo = Device.get_dev_control_cache(self.device.devNo) for key in xrange(1,allnums+1): lineInfo = ctrInfo.get(str(key)) if not lineInfo: item = { "index": str(key), "status": "idle", } elif "isStart" in lineInfo or lineInfo.get("status") == Const.DEV_WORK_STATUS_WORKING: item = { "index": str(key), "status": "busy", "nickName": lineInfo.get("nickName"), "startTime": lineInfo.get("startTime"), } else: item = { "index": str(key), "status": "idle", } portList.append(item) return portList def _notify_user_service_over(self, managerialOpenId, port, usedTime, money, type=None): group = Group.get_group(self.device.get("groupId")) adminTel = self.device["otherConf"].get("deviceConfigs", {}).get("adminTel") serviceText = u"温馨提示,本次寄存服务已完成! \\n请带走您的行李物品,关好箱门\\n祝您生活愉快!!!" if type == "error": serviceText = u"检测到当前柜门无法正常开启\\n请联系管理员:{}".format(adminTel) if type == "dealer": serviceText = u"检测到当前柜门无法正常开启\\n由管理员远程开门,联系管理员:{}".format(adminTel) notifyData = { "title": u"\\n\\n设备编号:\\t\\t{logicalCode}-{port}号柜门\\n\\n服务地址:\\t\\t{group}\\n\\n使用时长:\\t\\t{chargeTime}分钟\\n\\n本单消费:\\t\\t{money}".format( logicalCode=self._device["logicalCode"], port=port, group=group["address"], chargeTime=usedTime, money=str(money), ), "service": serviceText, "finishTime": datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "remark": u"谢谢您的支持" } task_caller( func_name="report_to_user_via_wechat", openId=managerialOpenId, dealerId=self.device.get("ownerId"), templateName="service_complete", **notifyData ) @property def isHaveStopEvent(self): return False def isHaveCallback(self): return True def do_callback(self, order): # type: (ConsumeRecord) -> None consumeDict = order.servicedInfo port = order.attachParas.get("chargeIndex") logger.info(self.log_obj( "receive finished callback,order={}, devNo={} openId={}, port={}, money={}".format(order.orderNo, order.devNo, order.openId, port, order.money))) openId = order.openId user = MyUser.objects.filter(openId=openId, groupId=self.device.get("groupId")).first() usedTime = consumeDict.get("duration", 0) # 重连四次 try: self.finished_order_open_door(port) except Exception: self._notify_user_service_over(user.managerialOpenId, port, usedTime, order.money, type="error") order.update(errorDesc=u"开门失败,请联系管理员") logger.info("do finished callback is fail") return ServiceProgress.update_progress_and_consume_rcd( self._device["ownerId"], { "open_id": order.openId, "device_imei": self.device["devNo"], "port": int(port), "isFinished": False }, consumeDict ) self._notify_user_service_over(user.managerialOpenId, port, usedTime, order.money) Device.clear_port_control_cache(self.device.devNo, port) logger.info(self.log_obj("callback finished callback is over!!")) def test(self, port="1"): # type:(str) -> None """ 打开某个柜门 funcCode : 02 """ portHex = self.encode_str(port, 2) data = portHex + "FFFF" self.send_mqtt(funCode="02", data=data) def set_device_function(self, request, lastSetConf): print request if 'initDev' in request.POST: self._do_init_dev() def set_device_function_param(self, requestBody, lastSetConf): locakDelay = requestBody.POST.get("lockDelay") adminPhone = requestBody.POST.get("adminTel") refundProtectionTime = requestBody.POST.get("refundProtectionTime") configs = lastSetConf or dict() if locakDelay: configs.update({"lockDelay": locakDelay}) if adminPhone: configs.update({"adminTel": adminPhone}) if refundProtectionTime: configs.update({"refundProtectionTime": refundProtectionTime}) self.do_update_configs(configs) def get_dev_setting(self): deviceConfigs = self.device["otherConf"].get("deviceConfigs", {}) adminTel = deviceConfigs.get('adminTel', '') lockDelay = deviceConfigs.get('lockDelay', 0) return {'adminTel':adminTel, 'lockDelay':lockDelay} def unlockPort(self, port): """ 经销商开门 如果里面有东西,订单信息里面需要记录是由经销商开门 """ ctrInfo = Device.get_dev_control_cache(self.device.devNo) lineInfo = ctrInfo.get(port) # TODO 经销商面板端口开启未使用柜门 if not lineInfo or lineInfo.get("status", 2) == Const.DEV_WORK_STATUS_IDLE: portHex = self.encode_str(port) data = portHex + "FFFF" self.send_mqtt(funCode="02", data=data) # 用户联系经销商开门 else: openId = lineInfo.get("openId") orderNo = lineInfo.get("orderNo") now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S") logger.info(self.log_obj( "Door opened by dealer user_openId={},dealerId={},orderNo={},openedTime={}".format(openId, self.device[ "ownerId"], orderNo, now)) ) portHex = self.encode_str(port) data = portHex + "FFFF" try: self.send_mqtt(funCode = "02", data = data) except ServiceException as e: logger.info(self.log_obj("open door is fail")) raise ServiceException( {"result": 2, "description": e.result.get("description"), "rst": -1}) order = ConsumeRecord.objects.get(orderNo=orderNo) if order.status != "finished": order.isNormal = False order.desc = "该笔订单异常故障未付款,由经销商开门" order.coin = VirtualCoin(0) order.money = VirtualCoin(0) order.status = "finished" order.save() user = MyUser.objects.filter(openId=openId, groupId=self.device.get("groupId")).first() usedTime = order.servicedInfo.get("duration", 0) self._notify_user_service_over(user.managerialOpenId, port, usedTime, order.money, type="dealer") spQueryDict = { "open_id": openId, "device_imei": self.device["devNo"], "port": int(port), "isFinished": False } sp = ServiceProgress.objects.filter(**spQueryDict).first() if sp: logger.info(self.log_obj("now finished service progress,id={}".format(str(sp.id)))) sp.isFinished = True sp.consumeOrder.update({"adminOpenedTime": now}) sp.save() Device.clear_port_control_cache(self.device.devNo, port) def _do_init_dev(self): pass