# -*- coding: utf-8 -*- # !/usr/bin/env python import threading import os,sys,time PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..') sys.path.insert(0, PROJECT_ROOT) os.environ.update({"DJANGO_SETTINGS_MODULE": "configs.testing"}) import django django.setup() from apps.web.device.models import Device from apps.web.core.helpers import ActionDeviceBuilder from apps.web.constant import Const from apps.web.core.mqtt_client import MqttClient import threading import uuid,datetime from django.conf import settings import simplejson as json dev = Device.get_dev('865650042312401') box = ActionDeviceBuilder.create_action_device(dev) class DeviceTaskListoner(threading.Thread): def __init__(self, orderNos=[]): super(DeviceTaskListoner, self).__init__() self.orderNoDict = {} for orderNo in orderNos: self.orderNoDict[str(orderNo)] = [] def register_order_event(self,orderNo,funCode): value = {'funCode':funCode,'time':datetime.datetime.now().strftime(Const.DATETIME_FMT)} if not self.orderNoDict.has_key(orderNo): self.orderNoDict[str(orderNo)] = [value] else: self.orderNoDict[str(orderNo)].append(value) def check_order(self,orderNo,funCode): events = self.orderNoDict.get(str(orderNo),None) if events is None: return False for event in events: if event['funCode'] == funCode: return True return False def run(self): try: mqttc = MqttClient(client_id='webapp_' + str(uuid.uuid1())) mqttc.on_message = self.on_message mqttc.on_connect = self.on_connect # 连接到现网 mqttc.connect(settings.MQTT_HOSTNAME, settings.MQTT_PORT, 60) mqttc.loop_forever() finally: mqttc.disconnect() mqttc.close() def on_connect(self,client, userdata, flags, rc): # 订阅设备的120响应事件,用于检查 device_topic = 'server/865650042312401/100' client.subscribe(device_topic, qos = Const.MQTT_QOS) print('subscribe topic(%s) success' % device_topic) def on_message(self,mqttc, obj, msg): msgDict = json.loads(bytes.decode(msg.payload)) print('receive msg = %s' % msgDict) dev = Device.get_dev(msgDict['IMEI']) # type: DeviceDict if dev is None or (not dev.has_key('devType')): return if msgDict['data'].has_key('order'): self.register_order_event(msgDict['data']['order']['id'], msgDict['data']['fun_code']) print u'-----------------------测试查询所有端口状态' def test_main(): listoner = DeviceTaskListoner() listoner.setDaemon(True) listoner.start() #前置条件:0:未接负载;1:接负载;2:无继电器 portStatus = [1,0,0,0,0,0,0,0,0,0] #关闭所有端口,检查端口状态是否空闲 print u'关闭所有端口,检查端口状态是否空闲' for ii in range(10): box.stop_charging_port(ii+1) box.lock_unlock_port(ii+1,False) time.sleep(5) result = box.get_port_status_from_dev() for ii in range(10): if portStatus[ii] in [0,1] and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口状态错误' elif portStatus[ii] == 2 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口状态错误' # 分别启动每一路端口,检查端口是否正常 print u'分别启动每一路端口,检查端口是否正常' orderNos = [] for ii in range(10): devInfo = box.start_device({'time':3,'unit':u'分钟','coins':1},'aaaaaa',{'chargeIndex':str(ii + 1)}) orderNos.append(devInfo['consumeOrderNo']) time.sleep(120) result = box.get_port_status_from_dev() for ii in range(10): if portStatus[ii] == 0 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口状态错误' elif portStatus[ii] == 1 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_WORKING: print u'端口状态错误' elif portStatus[ii] == 2 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口状态错误' # 等待充电结束后,获取端口状态,检查端口是否正常 print u'等待充电结束后,获取端口状态,检查端口是否正常' time.sleep(180) result = box.get_port_status_from_dev() for ii in range(10): if portStatus[ii] in [0,1] and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口状态错误' elif portStatus[ii] == 2 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口状态错误' print u'检查订单的事件是否收齐了' for orderNo in orderNos: result = listoner.check_order(orderNo, 32) if not result: print u'没有收到创建成功的事件!,orderNo=%s' % orderNo result = listoner.check_order(orderNo, 34) if not result: print u'没有收到结束的事件!,orderNo=%s' % orderNo # 禁止端口,检查端口状态 print u'禁止端口,检查端口状态' for ii in range(10): box.lock_unlock_port(ii+1) time.sleep(5) result = box.get_port_status_from_dev() for ii in range(10): if result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_FORBIDDEN: print u'端口状态错误' # 解禁端口,检查端口状态 print u'解禁端口,检查端口状态' for ii in range(10): box.lock_unlock_port(ii+1,False) time.sleep(5) result = box.get_port_status_from_dev() for ii in range(10): if portStatus[ii] in [0,1] and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口状态错误' elif portStatus[ii] == 2 and result.get(str(ii+1))['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口状态错误' print u'------------------------查询端口详细信息' print u'关闭所有端口,检查端口信息' for ii in range(10): box.stop_charging_port(ii+1) time.sleep(5) for ii in range(10): result = box.get_port_info(ii+1) if portStatus[ii] in [0,1]: if result['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口信息错误' if result['power'] != 0: print u'端口功率信息错误' elif portStatus[ii] == 2: if result['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口信息错误' if result['power'] != 0: print u'端口功率信息错误' #分别启动每一路端口,检查端口是否正常 print u'分别启动每一路端口,检查端口是否正常' orderNoList = [] for ii in range(10): devInfo = box.start_device({'time':5,'unit':u'分钟','coins':1},'aaaaaa',{'chargeIndex':str(ii + 1)}) orderNoList.append(devInfo['consumeOrderNo']) time.sleep(1) for ii in range(10): result = box.get_port_info(ii+1) if portStatus[ii] in [1]: if result['status'] != Const.DEV_WORK_STATUS_WORKING or result['power'] <= 0 : print u'端口信息错误' #等待充电结束后,获取端口信息,检查端口是否正常 print u'等待充电结束后,获取端口信息,检查端口是否正常' time.sleep(280) for ii in range(10): result = box.get_port_info(ii+1) if portStatus[ii] in [0,1]: if result['status'] != Const.DEV_WORK_STATUS_IDLE or result['power'] > 0: print u'端口信息错误' else: if result['status'] != Const.DEV_WORK_STATUS_FAULT or result['power'] > 0: print u'端口信息错误' #禁止端口,检查端口信息 print u'禁止端口,检查端口信息' for ii in range(10): box.lock_unlock_port(ii+1) time.sleep(120) for ii in range(10): result = box.get_port_info(ii+1) if portStatus[ii] in [0,1]: if result['status'] != Const.DEV_WORK_STATUS_FORBIDDEN or result['power'] > 0: print u'端口信息错误' else: if result['status'] != Const.DEV_WORK_STATUS_FORBIDDEN or result['power'] > 0: print u'端口信息错误' for ii in range(10): box.lock_unlock_port(ii+1,False) print u'------------------------远程停止充电' print u'先启动端口,并每个端口付款多次,然后远程停止,再检查检查端口状态' orderNos = [] for ii in range(10): for jj in range(19): try: devInfo = box.start_device({'time':3,'unit':u'分钟','coins':1},'aaaaaa',{'chargeIndex':str(ii + 1)}) except Exception,e: continue orderNos.append(devInfo['consumeOrderNo']) for ii in range(10): box.stop_charging_port(ii+1) time.sleep(30) print u'检查订单的事件是否收齐了' for orderNo in orderNos: result = listoner.check_order(orderNo, 32) if not result: print u'没有收到创建成功的事件!,orderNo=%s' % orderNo result = listoner.check_order(orderNo, 34) if not result: print u'没有收到结束的事件!,orderNo=%s' % orderNo for ii in range(10): result = box.get_port_info(ii+1) if portStatus[ii] in [0,1]: if result['status'] != Const.DEV_WORK_STATUS_IDLE: print u'端口信息错误' elif portStatus[ii] == 2: if result['status'] != Const.DEV_WORK_STATUS_FAULT: print u'端口信息错误' print u'------------------------清除投币总数' print u'将投币统计清零,然后检查统计数据是否为0' box.clear_dev_feecount() result = box.get_dev_consume_count() if result['coinFee'] != 0: print u'清零统计数据不对' print u'每个端口注入多个订单,检查每个订单是否都逐步执行完毕' time.sleep(180) orderNos = [] for ii in range(10): for jj in range(3): devInfo = box.start_device({'time':1,'unit':u'分钟','coins':1},'aaaaaa',{'chargeIndex':str(ii + 1)}) orderNos.append(devInfo['consumeOrderNo']) time.sleep(240) print u'检查每个端口下排队的订单的事件是否收齐了' for orderNo in orderNos: result = listoner.check_order(orderNo, 32) if not result: print u'没有收到创建成功的事件!,orderNo=%s' % orderNo result = listoner.check_order(orderNo, 34) if not result: print u'没有收到结束的事件!,orderNo=%s' % orderNo while True: try: test_main() except Exception,e: print(u'错误 错误 错误exception =%s' % e) continue print 'finished'