12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- import logging
- import time
- import simplejson as json
- from django.conf import settings
- from django.http import HttpResponse
- from pymongo.errors import DuplicateKeyError
- from typing import TYPE_CHECKING
- from apilib.utils_datetime import get_zero_time, get_tomorrow_zero_time, \
- to_datetime
- from apilib.utils_json import JsonResponse
- from apps.web.common.models import OperatorLog
- from apps.web.constant import Const, DeviceCmdCode
- from apps.web.core import ROLE
- from apps.web.core.helpers import ActionDeviceBuilder
- from apps.web.core.adapter.base import fill_2_hexByte
- from apps.web.core.exceptions import ServiceException, TestError
- from apps.web.exceptions import UserServerException
- from apps.web.core.networking import MessageSender
- from apps.web.core.utils import JsonErrorResponse, JsonOkResponse
- from apps.web.device.define import DeviceChannelType
- from apps.web.device.models import Device, CheckDevice
- from apps.web.device.models import DeviceDict
- from apps.web.helpers import get_wx_config
- from apps.web.services.bluetooth.service import ActionBtDeviceBuilder
- from apps.web.test.models import TestRecord, TestStats
- from apps.web.utils import trace_call, error_tolerate, tester_login, permission_required, \
- LimitAttemptsManager
- if TYPE_CHECKING:
- from django.core.handlers.wsgi import WSGIRequest
- from apps.web.test.models import Tester
- logger = logging.getLogger(__name__)
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'入库失败,请重试'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def bindCode(request):
    """Bind a module IMEI to a logical code on behalf of the current tester.

    Query parameters:
        logicalCode: the logical code to bind.
        imei: the raw scanned IMEI text; may be a plain IMEI or a scanner
            payload containing ``DATA_MATRIX``, ``MAC:`` or ``;`` separators.

    Returns a JsonResponse with ``result`` 1 on success and 0 otherwise.
    """
    curr_user = request.user
    if not curr_user.has_privilage('bind'):
        return JsonResponse({'result': 0, 'description': u'您无权限绑定设备', 'payload': {}})

    # str(None) is the truthy string 'None', so a missing parameter has to be
    # mapped to '' *before* the conversion or the emptiness checks never fire.
    logicalCode = str(request.GET.get('logicalCode') or '').strip()
    imei = str(request.GET.get('imei') or '').strip()
    logger.debug('logicalCode = %s; devNo = %s', logicalCode, imei)

    if not logicalCode:
        return JsonResponse({'result': 0, 'description': u'设备编号为空', 'payload': {}})
    if not imei:
        return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}})

    # Extract the bare IMEI from the different scanner payload layouts.
    if 'DATA_MATRIX' in imei:
        imei = imei.split(',')[1].split(';')[0].strip()
    elif 'MAC:' in imei:
        imei = imei.split(',')[0].split(':')[1].strip()
    elif ';' in imei:
        imei = imei.split(';')[0].strip()

    if len(imei) > 15:
        return JsonResponse({'result': 0, 'description': u'错误的IMEI: {}'.format(imei), 'payload': {}})

    already_bound = {
        'result': 0,
        'description': u'已经绑定(L=%s, IMEI=%s)' % (logicalCode, imei),
        'payload': {},
    }

    # Reject when the logical code is taken, or when this IMEI already
    # carries a non-empty logical code.
    existing = Device.get_collection().find_one({
        '$or': [
            {'logicalCode': logicalCode},
            {'devNo': imei, 'logicalCode': {'$nin': [None, '']}},
        ]
    })
    if existing:
        return JsonResponse(already_bound)

    try:
        Device.bind(imei, logicalCode, curr_user)
    except DuplicateKeyError:
        # A duplicate key here is reported the same way as the check above.
        return JsonResponse(already_bound)
    except UserServerException as e:
        return JsonErrorResponse(description=e.message)

    return JsonResponse({'result': 1, 'description': u'绑定成功', 'payload': {}})
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'入库失败,请重试'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def unbindCode(request):
    """Unbind the device identified by the ``imei`` query parameter.

    Testers holding the ``forceUnbind`` privilege unbind without passing an
    operator; everybody else unbinds with themselves as operator. A CRITICAL
    operator-log entry with a snapshot of the device is written afterwards.

    Returns a JsonResponse with ``result`` 1 on success and 0 otherwise.
    """
    curr_user = request.user  # type: Tester
    if not curr_user.has_privilage('unbind'):
        return JsonResponse({'result': 0, 'description': u'您没有权限解解绑定设备', 'payload': {}})

    # str(None) would be the truthy string 'None'; normalise a missing
    # parameter to '' so the emptiness check below actually works.
    imei = str(request.GET.get('imei') or '').strip()
    logger.debug('devNo = %s', imei)
    if not imei:
        return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}})

    # Extract the bare IMEI from scanner payloads.
    if 'DATA_MATRIX' in imei:
        imei = imei.split(',')[1].split(';')[0].strip()
    elif 'MAC:' in imei:
        imei = imei.split(',')[0].split(':')[1].strip()

    try:
        dev = Device.get_dev(imei)  # type: DeviceDict
        if not dev:
            return JsonResponse({'result': 0, 'description': u'该设备不存在', 'payload': {}})

        if curr_user.has_privilage('forceUnbind'):
            Device.unbind(dev=dev)
        else:
            Device.unbind(dev=dev, operator=curr_user)

        # NOTE(review): indentation was lost in this copy of the file; the
        # audit log is assumed to be written for both branches - confirm.
        OperatorLog.log(
            user=request.user,
            level=OperatorLog.LogLevel.CRITICAL,
            operator_name=u'强制解除绑定',
            content={
                'logicalCode': dev.logicalCode,
                'devNo': dev.devNo,
                'iccid': dev.iccid,
                'imsi': dev.imsi,
                'simExpireDate': dev.get('simExpireDate', None),
                'expireDate': dev.get('expireDate', None),
                'location': dev.get('location'),
                'softVer': dev.softVer,
                'server': dev.server,
                'coreVer': dev.coreVer,
            })
        return JsonResponse({'result': 1, 'description': u'解绑成功', 'payload': {}})
    except UserServerException as e:
        return JsonResponse({'result': 0, 'description': e.message, 'payload': {}})
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'解注册失败,请重试'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def unregisterCode(request):
    """Force-unregister the device identified by the ``imei`` query parameter.

    Requires the ``unregister`` privilege. Any exception raised while looking
    up or unregistering the device is reported back through its ``message``.

    Returns a JsonResponse with ``result`` 1 on success and 0 otherwise.
    """
    curr_user = request.user  # type: Tester
    if not curr_user.has_privilage('unregister'):
        return JsonResponse({'result': 0, 'description': u'您没有权限解注册设备', 'payload': {}})

    # str(None) would be the truthy string 'None'; normalise a missing
    # parameter to '' so the emptiness check below actually works.
    imei = str(request.GET.get('imei') or '').strip()
    logger.debug('devNo = %s', imei)
    if not imei:
        return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}})

    try:
        device = Device.get_dev(imei)  # type: DeviceDict
        if not device:
            return JsonResponse({'result': 0, 'description': u'该设备不存在', 'payload': {}})
        if not device.is_registered:
            return JsonResponse({'result': 0, 'description': u'该设备没有注册,不需要解绑', 'payload': {}})

        Device.un_register(device, force=True, operator=request.user.human_id)
        return JsonResponse({'result': 1, 'description': u'解注册成功', 'payload': {}})
    except Exception as e:
        return JsonResponse({'result': 0, 'description': e.message, 'payload': {}})
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def accountInfo(request):
    # type: (WSGIRequest)->JsonResponse
    """Return the logged-in tester's ``to_dict()`` representation as the payload."""
    account = request.user.to_dict()
    body = {"result": 1, "description": None, 'payload': account}
    return JsonResponse(body)
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
def login(request):
    # type: (WSGIRequest)->JsonResponse
    """Log a tester in using ``username`` / ``password`` from the JSON body.

    Delegates to ``tester_login``; any exception (malformed JSON, missing
    keys, or one raised by the login itself) is logged and turned into a
    generic failure response.
    """
    try:
        credentials = json.loads(request.body)
        username = credentials['username']
        password = credentials['password']
        return tester_login(request, logger, username, password)
    except Exception as e:
        logger.exception('super manager login error = %s', e)
        failure = {'result': 0, 'description': u'登录异常', 'payload': {}}
        return JsonResponse(failure)
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def confirmTestResult(request):
    """Placeholder view: not implemented, the body returns None.

    NOTE(review): unless one of the decorators substitutes a response, a view
    returning None is rejected by Django - confirm whether this endpoint is
    still routed.
    """
    return None
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def getDeviceInfo(request):
    """Return identification and version details for the requested device.

    Bluetooth-channel devices and devices whose driver code is
    ``Const.DEVICE_TYPE_CODE_CAR_NENGPAI`` are answered from stored data only.
    For every other device a ``GET_DEVINFO`` command is sent and its reply is
    merged into the payload.
    """
    devNo = Device.get_dev_no_from_request(request)
    if not devNo:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})

    dev = Device.get_dev(devNo)  # type: DeviceDict
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在,请首先绑定'})
    if not dev.get('devNo'):
        return JsonResponse({'result': 0, 'description': u'设备无对应IMEI绑定,请首先绑定'})

    device = Device.objects(devNo=devNo).first()
    if device is None:
        return JsonResponse({'result': 0, 'description': u'设备不存在数据库中,请首先绑定'})

    payload = {
        'devNo': dev.devNo,
        'logicalCode': dev.logicalCode,
        'softVer': dev.get('softVer', ''),
        'hwVer': dev.get('hwVer', ''),
        'mf': dev.get('mf', ''),
        'registered': dev.is_registered,
    }
    # The success envelope holds a reference to ``payload``, so the update
    # further down is reflected in the response.
    success = {'_comment_': 1, 'result': 1, 'description': None, 'payload': payload}

    answered_locally = (
        dev.channelType == DeviceChannelType.Channel_BT
        or device.driverCode == Const.DEVICE_TYPE_CODE_CAR_NENGPAI
    )
    if answered_locally:
        return JsonResponse(success)

    device_info = MessageSender.send(
        device=dev,
        cmd=DeviceCmdCode.GET_DEVINFO,
        payload={'IMEI': dev['devNo']},
        timeout=15,
        retry=3)
    if device_info['rst'] != 0:
        return JsonResponse({'result': 0, 'description': u'获取模块信息超时,请检查模块是否上线,必要时重启模块'})

    unsupported = u'不支持该属性'
    if device.dateTimeBinded is not None:
        bind_time = device.dateTimeBinded.strftime('%Y-%m-%d %H:%M:%S')
    else:
        bind_time = u'未知'
    payload.update({
        'driverCode': device_info.get('driverCode', unsupported),
        'driverVersion': device_info.get('driverVersion', unsupported),
        'coreVer': device_info.get('core_ver', unsupported),
        'boardValid': device_info.get('board_valid', unsupported),
        'pw_cut_memory': device_info.get('pw_cut_memory', unsupported),
        'bindTime': bind_time,
        'testTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
    })
    return JsonResponse(success)
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def getTestResultList(request):
    """Return the test-result list; currently always an empty page.

    The previous body built a ``CheckDevice`` queryset that was never read;
    that unused local has been dropped. The response is unchanged.
    """
    return JsonResponse({
        'result': 1,
        'description': None,
        'payload': {'total': 0, 'items': []},
    })
def _run_points_test(dev, type_code, coins):
    """Run the adapter's coin test for ``dev`` and map the result to a JsonResponse."""
    try:
        adapter = ActionDeviceBuilder.create_action_device(dev, typeCode=type_code)
        result = adapter.test(coins=coins)
    except ServiceException as e:
        return JsonResponse({'result': 0, 'description': e.result.get('description'), 'payload': {}})

    rst = result['rst']
    if rst == 0:
        return JsonResponse({'result': 1, 'description': u'success'})
    if rst == -1:
        return JsonResponse({'result': 0, 'description': u'下发命令超时, 请检查设备是否快闪上线'})
    if rst == 1:
        return JsonResponse({'result': 0, 'description': u'串口命令超时,请检查设备和模块之间的串口连线'})
    if 'desc' in result:
        return JsonResponse({'result': 0, 'description': u'错误:({}),请联系平台客服'.format(result['desc'])})
    return JsonResponse({'result': 0, 'description': u'未知错误({}),请联系平台客服'.format(rst)})


@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def onPoints(request):
    """Remotely add coins to an unregistered device to verify the end-to-end flow.

    POST parameters:
        value: logical code of the device.
        coins: number of coins to add (default ``'1'``).
        type: interface type, ``'pulse'`` (default) or ``'uart'``.

    :param request:
    :return: JsonResponse with ``result`` 1 on success and 0 otherwise.
    """
    # str(None) would produce the string 'None'; treat a missing value as empty.
    logicalCode = str(request.POST.get('value') or '')
    if not logicalCode:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})

    devNo = Device.get_devNo_by_logicalCode(logicalCode)
    logger.info('l=%s,devNo=%s', logicalCode, devNo)
    if not devNo:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})

    score = int(request.POST.get('coins', '1'))
    intfType = request.POST.get('type', 'pulse')

    dev = Device.get_dev(devNo)  # type: DeviceDict
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})
    if dev.is_registered:
        return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'})

    # Devices already known to use this driver are tested directly, without
    # querying the module for its driver code first.
    if 'driverCode' in dev and dev['driverCode'] == Const.DEVICE_TYPE_CODE_CAR_NENGPAI:
        return _run_points_test(dev, Const.DEVICE_TYPE_CODE_CAR_NENGPAI, score)

    # Ask the module which driver it runs.
    devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, {'IMEI': dev['devNo']})
    rst = devInfo.get('rst', 0)
    if rst == -1:
        return JsonResponse({'result': 0, 'description': u'下发命令超时, 请检查设备是否快闪上线'})
    if rst == 1:
        return JsonResponse({'result': 0, 'description': u'串口命令超时,请检查设备和模块之间的串口连线'})
    if rst != 0:
        # The original message was missing its .format() call and leaked '{}'.
        return JsonResponse({'result': 0, 'description': u'未知错误({}),请联系平台客服'.format(rst)})

    type_code = None
    if 'driverCode' in devInfo and intfType == 'uart':
        type_code = devInfo['driverCode']
    elif intfType == 'pulse':
        type_code = Const.DEVICE_TYPE_CODE_PULSE
    elif 'soft_ver' in devInfo:
        soft_ver = devInfo['soft_ver'].upper()
        major = soft_ver.split('.')[0]
        # NOTE(review): this is a lexicographic string comparison, kept as in
        # the original ('V10' sorts before 'V7').
        if major >= 'V7':
            type_code = Const.DEVICE_TYPE_CODE_CHANGING_WEIFULE
            # The original tested 'V7.1' against the major component only,
            # which never contains a dot, so this branch was unreachable.
            if 'V7.1' in soft_ver:
                type_code = Const.DEVICE_TYPE_CODE_CHANGING_WEIFULE2

    # Every path that fails to resolve a driver is unsupported; previously
    # some of them raised an unbound-local error instead.
    if type_code is None:
        return JsonResponse({'result': 0, 'description': u'不支持该设备类型的设备测试'})

    return _run_points_test(dev, type_code, score)
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def onBtPoints(request):
    """Build the encoded Bluetooth coin command for an unregistered device.

    JSON body keys: ``logicalCode``, optional ``major`` / ``minor`` (default 1),
    optional ``code`` (key into ``Const.BT_DEVICE_TYPE_CODE_MAP``, default 1)
    and optional ``coins`` (default 1). The encoded command is returned in
    ``payload.command``.
    """
    body = json.loads(request.body)
    logicalCode = body.get('logicalCode')
    major = int(body.get('major', 0x1))
    minor = int(body.get('minor', 0x1))

    dev = Device.get_dev_by_logicalCode(logicalCode)  # type: DeviceDict
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})
    if dev.is_registered:
        return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'})

    dev['major'] = major
    dev['minor'] = minor

    device_type = Const.BT_DEVICE_TYPE_CODE_MAP[int(body.get('code', 1))]
    actionBox = ActionBtDeviceBuilder.create(device_type, dev)

    attach_paras = {'package': {'coins': int(body.get('coins', 1))}}
    command = actionBox.encode_cmd(
        cmdId=1,
        attachParas=attach_paras,
        seqNo=int(time.time()))

    return JsonResponse({'result': 1, 'description': '', 'payload': {'command': command}})
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def labelDevice(request):
    """
    Label a faulty device so that the failure information can be recorded.

    Sets ``label`` on the CheckDevice document whose ``imei`` is the device
    number resolved from ``logicalCode``; no document is created when none
    matches (``upsert=False``).
    :param request:
    :return:
    """
    body = json.loads(request.body)
    logicalCode = body.get('logicalCode', None)
    devNo = Device.get_devNo_by_logicalCode(logicalCode)
    label = body.get('label', '')

    if not devNo:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})

    collection_filter = {'imei': devNo}
    change = {'$set': {'label': label}}
    try:
        CheckDevice.get_collection().update_one(collection_filter, change, upsert=False)
    except Exception as e:
        logger.exception('[label_device]update checkdevice error = %s', e)
        return JsonResponse({'result': 0, 'description': 'update checkdevice error=%s' % e})

    return JsonResponse({'result': 1, 'description': u'success'})
@trace_call(logger=logger)
@permission_required(ROLE.tester)
def getDeviceLabel(request):
    """
    Return the stored check/label record of a device.

    The device is resolved from the ``value`` POST parameter (a logical
    code); the CheckDevice document is looked up by ``imei``.
    :param request:
    :return: JsonResponse whose payload carries the record fields.
    """
    logicalCode = request.POST.get('value', None)
    devNo = Device.get_devNo_by_logicalCode(logicalCode)
    if not devNo:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})

    try:
        # find_one executes the query right here, so database errors are
        # caught below. The previous lazy ``find`` cursor was only evaluated
        # outside the try, and ``len(cursor)`` raised TypeError.
        record = CheckDevice.get_collection().find_one({'imei': devNo})
    except Exception as e:
        logger.exception('query check device error = %s', e)
        return JsonResponse({'result': 0, 'description': 'query check device error=%s' % e})

    if not record:
        return JsonResponse({'result': 0, 'description': u'没有找到设备的标记信息'})

    fields = ('devNo', 'logicalCode', 'testTime', 'testResult',
              'testResultDesc', 'bindTime', 'label')
    # Fields absent from the document are reported as None instead of
    # aborting the whole request with a KeyError.
    result = dict((name, record.get(name)) for name in fields)
    return JsonResponse({'result': 1, 'description': u'success', 'payload': result})
@trace_call(logger=logger)
def wxconfig(request):
    """Return the value of ``get_wx_config`` for the page URL given in ``href``."""
    page_url = request.GET.get('href')
    if page_url:
        config = get_wx_config(None, page_url)
        return JsonOkResponse(payload={'wxconfig': config})
    return JsonResponse({'result': 0, 'description': u'参数错误', 'payload': {}})
@permission_required(ROLE.tester)
def setComDriver(request):
    """Tell a device to install the driver named by ``driverName``.

    POST parameters:
        driverName: base name of the driver file (``.driver`` is appended).
        logicalCode: logical code of the target device.

    Returns a JsonResponse with ``result`` 1 when the device acknowledged the
    command (``rst == 0``) and 0 otherwise.
    """
    driverName = request.POST.get("driverName")
    logicalCode = request.POST.get("logicalCode")
    # This view has no error_tolerate wrapper, so missing input must be
    # rejected explicitly instead of surfacing as a TypeError.
    if not driverName or not logicalCode:
        return JsonResponse({'result': 0, 'description': u'参数错误', 'payload': {}})

    dev = Device.get_dev_by_logicalCode(logicalCode)
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在', 'payload': {}})

    # NOTE(review): driverName is taken from the request and placed into the
    # download URL unvalidated - consider restricting it to known names.
    driver_url = 'http://www.washpayer.com/uploaded/' + driverName + '.driver'
    result = MessageSender.send(dev, DeviceCmdCode.SET_DEVINFO, {
        'IMEI': dev['devNo'],
        'driver_set': {'driver_url': driver_url},
    })
    if result['rst'] != 0:
        return JsonResponse({'result': 0, 'description': u'安装驱动失败', 'payload': {}})
    return JsonResponse({'result': 1, 'description': None, 'payload': {}})
@permission_required(ROLE.tester)
def getComDriver(request):
    """Query a device for its installed driver code and driver version.

    The device is identified by the ``logicalCode`` query parameter.
    Returns a JsonResponse whose payload has ``driverCode`` and
    ``driverVersion`` taken from the device's ``GET_DEVINFO`` reply.
    """
    logicalCode = request.GET.get("logicalCode")
    dev = Device.get_dev_by_logicalCode(logicalCode)
    # No error_tolerate wrapper on this view: guard the unknown-device case
    # instead of failing on ``dev['devNo']``.
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在', 'payload': {}})

    devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, {'IMEI': dev['devNo']})
    if devInfo['rst'] != 0:
        return JsonResponse({'result': 0, 'description': u'查询驱动失败', 'payload': {}})

    # The original returned the literal string 'driverVersion' rather than
    # the value reported by the device. Missing attributes use the same
    # placeholder as getDeviceInfo.
    unsupported = u'不支持该属性'
    return JsonResponse({'result': 1, 'description': None, 'payload': {
        'driverCode': devInfo.get('driverCode', unsupported),
        'driverVersion': devInfo.get('driverVersion', unsupported),
    }})
@error_tolerate(logger=logger, nil=JsonResponse({'result': 0, 'description': u'系统错误'}))
@permission_required(ROLE.tester)
def checkLogicalBindingByRange(request):
    # type: (WSGIRequest)->JsonResponse
    """
    Check whether every logical code in an inclusive range has been bound.

    The JSON body supplies ``start`` and ``end``. Both may carry the same
    one-letter prefix (G, R, N or B), which is stripped for the numeric
    range and re-applied to every generated code.
    :param request:
    :return: JsonResponse whose description summarises the check.
    """
    payload = json.loads(request.body)
    # TODO: input validation is meant to move into a Validation class later.
    start = payload.get('start')
    end = payload.get('end')
    strStart = str(start)
    strEnd = str(end)

    title = ''
    for prefix in ('G', 'R', 'N', 'B'):
        # Only a leading letter can be a prefix: the first character is the
        # one that gets stripped below.
        if strStart.startswith(prefix) and strEnd.startswith(prefix):
            title = prefix
            start = strStart[1:]
            end = strEnd[1:]
            break

    spec = [title + str(number) for number in xrange(int(start), int(end) + 1)]
    bound = set(
        doc['logicalCode']
        for doc in Device.get_collection().find({'logicalCode': {'$in': spec}})
    )
    # Keep the range order so the report is deterministic (a set difference
    # listed the codes in arbitrary order).
    unbound = [code for code in spec if code not in bound]

    description = u'测试结果: 一共检查了%d条数据。\n' % (len(spec),)
    if unbound:
        description += u'有未绑定逻辑码(%s)条,它们为 %s' % (len(unbound), ', '.join(unbound))
    else:
        description += u'全部已经绑定'
    return JsonResponse({'result': 1, 'description': description, 'payload': {}})
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'更新错误'))
@permission_required(ROLE.tester)
def resetPassword(request):
    # type: (WSGIRequest)->JsonResponse
    """Change the logged-in tester's password after verifying the old one.

    POST parameters: ``oldPassword`` and ``password``. Wrong old passwords
    are counted through ``LimitAttemptsManager``; the counter is cleared
    after a successful change.
    """
    tester = request.user
    oldPassword = request.POST.get('oldPassword', "")
    if not oldPassword:
        return JsonResponse({"result": 0, "description": u"请输入旧密码", 'payload': {}})

    # NOTE(review): the limiter is keyed on the submitted password text, not
    # on the account; if the second argument is the throttling identifier,
    # each different guess gets a fresh counter - confirm against
    # LimitAttemptsManager.
    limitManager = LimitAttemptsManager('resetPassword', oldPassword)
    if limitManager.is_exceeded_limit():
        return JsonResponse({'result': 0, 'description': u'超出输入错误次数限制,请明日再试', 'payload': {}})

    if not tester.check_password(oldPassword):
        limitManager.incr()
        return JsonResponse({
            'result': 0,
            'description': u'旧密码输入错误,您还可输入%s次' % limitManager.times_left(),
            'payload': {}
        })

    password = request.POST.get('password', "")
    if password == "":
        return JsonResponse({"result": 0, "description": u"请输入密码", 'payload': {}})

    try:
        tester.set_password(password)
        limitManager.clear()
    except Exception as e:
        logger.exception('update Dealer password some error=%s', e)
        # Previously this fell through to the wrong-old-password branch,
        # consuming an attempt and showing a misleading message.
        return JsonResponse({'result': 0, 'description': u'更新错误', 'payload': {}})

    return JsonResponse({"result": 1, "description": None, 'payload': {}})
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'获取特性错误'))
@permission_required(ROLE.tester)
def getFeatureList(request):
    """Return the current user's features as a ``{feature: True}`` mapping."""
    enabled = {feature: True for feature in request.user.features}
    return JsonOkResponse(payload=enabled)
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'测试错误'))
@permission_required(ROLE.tester)
def dlxTest(request):
    """Start each requested port of an unowned device as a hardware test.

    JSON body keys: ``ports`` (iterable of port numbers) and ``logicalCode``.
    The device must report driver code
    ``Const.DEVICE_TYPE_CODE_CHARGING_DIANCHUAN``; every port is then sent an
    ``OPERATE_DEV_SYNC`` command with function code ``'02'``.
    """
    payload = json.loads(request.body)
    ports = payload['ports']
    logicalCode = payload['logicalCode']

    dev = Device.get_dev_by_l(logicalCode)
    if not dev:
        return JsonResponse({'result': 0, 'description': u'设备不存在'})
    if dev.get('ownerId'):
        return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'})

    # First make sure the device runs the expected driver.
    try:
        devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, {'IMEI': dev['devNo']})
    except Exception as e:
        logger.exception('dlxTest get devinfo error = %s', e)
        # This is a failure: the original reported it with result 1.
        return JsonResponse({'result': 0, 'description': u'检测失败,网络通讯失败。'})

    rst = devInfo.get('rst', 0)
    if rst == -1:
        return JsonResponse({'result': 0, 'description': u'检测失败,设备网络通讯超时。'})
    if rst == 1:
        return JsonResponse({'result': 0, 'description': u'检测失败,模块之间的串口通讯超时, 设备的串口接线可能有问题。'})
    # .get() so a reply without driverCode is reported as a driver mismatch
    # rather than surfacing as a KeyError.
    if devInfo.get('driverCode') != Const.DEVICE_TYPE_CODE_CHARGING_DIANCHUAN:
        return JsonResponse({'result': 0, 'description': u'检测失败,设备驱动错误。'})

    for port in ports:
        try:
            hexPort = fill_2_hexByte(hex(int(port)), 2)
            hexTime = fill_2_hexByte(hex(1))
            # NOTE(review): the reply of this command is not inspected, as
            # in the original - confirm whether its rst should be checked.
            MessageSender.send(dev, DeviceCmdCode.OPERATE_DEV_SYNC,
                               {'IMEI': dev['devNo'], "funCode": '02', 'data': hexPort + '0000' + hexTime})
        except Exception as e:
            logger.exception('dlxTest start port %s error = %s', port, e)
            return JsonResponse({'result': 0, 'description': u'检测失败,启动多个端口的时候失败,可能是因为您的网络不好,请重试。'})

    return JsonOkResponse()
@error_tolerate(logger=logger, nil=JsonErrorResponse(u"测试出现异常,请稍后重试"))
@permission_required(ROLE.tester)
def serialTest(request):
    """
    Serial-port test, possibly to be reshaped like the device-parameter
    setting functions later.
    This function only runs the serial test and returns its result; whether
    the test passed is decided by the tester.
    :param request:
    :return:
    """
    body = json.loads(request.body)
    logicalCode = body.get("logicalCode", "")
    if not logicalCode:
        return JsonErrorResponse(u"请先扫码获取设备逻辑码")

    device = Device.objects.filter(logicalCode=logicalCode).first()
    if not device:
        return JsonErrorResponse(u"设备尚未绑定入库")

    driverCode = device.driverCode
    dev = Device.get_dev_by_logicalCode(logicalCode)

    # TODO: these two driver codes are kept on the legacy dlxTest path for
    # compatibility; fold them into serialTest later.
    legacy_codes = ["100210", "100205"]
    if driverCode in legacy_codes:
        return dlxTest(request)

    box = ActionDeviceBuilder.create_action_device(dev, typeCode=driverCode)
    try:
        result = box.serial_test(body)
    except NotImplementedError:
        return JsonErrorResponse(description=u"该设备尚不支持串口测试")
    except TestError as e:
        # Three kinds of error are expected here: the serial link is down,
        # the network timed out, or the device explicitly reported a serial
        # failure.
        return JsonErrorResponse(description=str(e))

    description = result.get("description", "")
    return JsonOkResponse(description)
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'记录结果错误'))
@permission_required(ROLE.tester)
def recordTestResult(request):
    """Record today's test result for a device and refresh the daily stats.

    JSON body keys: ``logicalCode``, ``result`` (``'success'`` / ``'failed'``)
    and ``desc``. Any record the same tester saved today for the same device
    is replaced, then the tester's TestStats document for today is upserted
    with the recomputed totals.
    """
    payload = json.loads(request.body)
    logicalCode = payload.get('logicalCode')
    result = payload.get('result')
    desc = payload.get('desc')
    username = request.user.username
    nickname = request.user.nickname

    nowTime = datetime.datetime.now()
    sTime = get_zero_time(nowTime)
    eTime = get_tomorrow_zero_time(nowTime)

    # Drop today's earlier record for this device first.
    TestRecord.objects.filter(testerName=username, logicalCode=logicalCode,
                              dateTimeAdded__gte=sTime, dateTimeAdded__lte=eTime).delete()

    # A successful test always gets the standard description.
    if result == 'success':
        desc = u'扫码测试通过'
    TestRecord(testerName=username, nickname=nickname, logicalCode=logicalCode,
               result=result, desc=desc, dateTimeAdded=nowTime).save()

    allCount, success, failed = 0, 0, 0
    for rcd in TestRecord.objects.filter(testerName=username,
                                         dateTimeAdded__gte=sTime, dateTimeAdded__lte=eTime):
        allCount += 1
        if rcd.result == 'failed':
            failed += 1
        elif rcd.result == 'success':
            success += 1

    # Guard the ratio in case the record just saved is not returned.
    okScale = int(success * 100.0 / allCount) if allCount else 0
    today = nowTime.strftime(Const.DATE_FMT)

    # update_one replaces the deprecated Collection.update; with a $set
    # document and upsert=True the effect is the same.
    TestStats.get_collection().update_one(
        {'date': today, 'testerName': username},
        {'$set': {
            'testerName': username,
            'nickname': nickname,
            'date': today,
            'all': allCount,
            'success': success,
            'failed': failed,
            'okScale': okScale
        }},
        upsert=True)
    return JsonOkResponse()
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'获取统计错误'))
@permission_required(ROLE.tester)
def getTestStatsByDay(request):
    """Return one page of the current tester's daily statistics.

    Query parameters: ``startDay``, ``endDay`` (inclusive ``date`` bounds),
    ``pageIndex`` and ``pageSize``.
    """
    params = request.GET
    startDay = params.get('startDay')
    endDay = params.get('endDay')
    pageIndex = int(params.get('pageIndex'))
    pageSize = int(params.get('pageSize'))

    rcds = TestStats.objects.filter(date__gte=startDay, date__lte=endDay,
                                    testerName=request.user.username)

    dataList = []
    for rcd in rcds.paginate(pageIndex=pageIndex, pageSize=pageSize):
        dataList.append({
            'count': rcd.all,
            'successCount': rcd.success,
            'failCount': rcd.failed,
            'date': rcd.date,
        })
    return JsonOkResponse(payload={'total': rcds.count(), 'dataList': dataList})
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'获取统计错误'))
@permission_required(ROLE.tester)
def queryTestRcds(request):
    """Return one page of the current tester's test records, newest first.

    Query parameters: ``pageIndex``, ``pageSize``, optional ``searchKey``
    and optional ``date``, which restricts the records to that day.
    """
    pageIndex = int(request.GET.get('pageIndex'))
    pageSize = int(request.GET.get('pageSize'))
    searchKey = str(request.GET.get('searchKey', ''))
    date = request.GET.get('date', None)

    criteria = {}
    if date is not None:
        day = to_datetime(date + ' 00:00:00')
        criteria['dateTimeAdded__gte'] = get_zero_time(day)
        criteria['dateTimeAdded__lte'] = get_tomorrow_zero_time(day)
    criteria['testerName'] = request.user.username

    rcds = TestRecord.objects(**criteria).search(searchKey).order_by('-dateTimeAdded')

    dataList = [
        {
            'logicalCode': rcd.logicalCode,
            'result': rcd.result,
            'desc': rcd.desc,
            'dateTimeAdded': rcd.dateTimeAdded.strftime(Const.DATETIME_FMT),
        }
        for rcd in rcds.paginate(pageIndex=pageIndex, pageSize=pageSize)
    ]
    return JsonOkResponse(payload={'total': rcds.count(), 'dataList': dataList})
@error_tolerate(logger=logger, nil=JsonErrorResponse(u'获取统计错误'))
@permission_required(ROLE.tester)
def queryDeviceTestDetail(request):
    """Return a device's history timeline, newest entry first.

    The timeline consists of the binding time, the first-online time (each
    only when present on the device document) and every test record for the
    ``logicalCode`` query parameter.
    """
    logicalCode = request.GET.get('logicalCode')
    # The original compared the bound method ``devs.count`` with 0, so its
    # not-found branch could never run; find_one makes the check explicit.
    devDict = Device.get_collection().find_one({'logicalCode': logicalCode})
    if not devDict:
        return JsonErrorResponse(description=u'查询失败,没有找到设备')

    # Build the timeline oldest-first and reverse once at the end; this
    # yields the same order as repeatedly inserting at index 0.
    timeline = []
    if devDict.get('dateTimeBinded'):
        timeline.append({'time': devDict['dateTimeBinded'].strftime(Const.DATETIME_FMT),
                         'desc': u'绑定二维码成功',
                         'name': u'维码器生产商'})
    if devDict.get('simActiveFirstTime'):
        timeline.append({'time': devDict['simActiveFirstTime'].strftime(Const.DATETIME_FMT),
                         'desc': u'设备首次上线成功',
                         'name': u'模块生产商'})

    for rcd in TestRecord.objects.filter(logicalCode=logicalCode).order_by('dateTimeAdded'):
        timeline.append({'time': rcd.dateTimeAdded.strftime(Const.DATETIME_FMT),
                         'desc': rcd.desc,
                         'name': rcd.nickname})

    timeline.reverse()
    return JsonOkResponse(payload={'dataList': timeline})
def getCookies(request):
    """Debug helper: render the session and cookies as HTML when DEBUG is on.

    With ``settings.DEBUG`` off an empty response is returned.
    """
    response = HttpResponse()
    if not settings.DEBUG:
        return response

    from urllib import unquote

    row = '<h2>{} -- {}</h2>'
    session = ''.join(
        row.format(name, request.session.get(name))
        for name in request.session.keys()
    )
    cookies = ''.join(
        row.format(name, unquote(request.COOKIES.get(name)))
        for name in request.COOKIES.keys()
    )
    response.content = '<h1>SESSION</h1>{}<hr><h1>COOKIES</h1>{}'.format(session, cookies)
    return response
def setCookies(request):
    """Set a 30-day cookie from the ``k`` (default ``'jump'``) and ``v`` query parameters.

    An empty ``v`` is reported as a failure. Both values are HTML-escaped
    before being echoed, because they come straight from the query string.
    """
    from django.utils.html import escape

    key = request.GET.get('k', 'jump')
    value = request.GET.get('v', '')
    shown_key, shown_value = escape(key), escape(value)

    if not value:
        return HttpResponse('<h1>设置失败 {}--{}</h1>'.format(shown_key, shown_value))

    resp = HttpResponse('<h1>设置成功 {}--{}</h1>'.format(shown_key, shown_value))
    # NOTE(review): indentation was lost in this copy of the file; the cookie
    # is assumed to be set only on the success path - confirm.
    resp.set_cookie(key=key, value=value, max_age=24 * 3600 * 30, domain=settings.COOKIE_DOMAIN)
    return resp
def clearCookies(request):
    """Clear the session and delete every cookie sent with the request.

    When ``settings.DEBUG`` is on, the response body also lists the session
    and cookie values that were present before clearing.
    """
    response = HttpResponse('清理成功')

    if settings.DEBUG:
        row = '<h2>{} -- {}</h2>'
        session = ''.join(
            row.format(name, request.session.get(name))
            for name in request.session.keys()
        )
        from urllib import unquote
        cookies = ''.join(
            row.format(name, unquote(request.COOKIES.get(name)))
            for name in request.COOKIES.keys()
        )
        response.content = '<h1>清理成功</h1><h1>SESSION</h1>{}<hr><h1>COOKIES</h1>{}'.format(session, cookies)

    # NOTE(review): indentation was lost in this copy of the file; clearing
    # is assumed to happen regardless of DEBUG - confirm.
    # Clear the session.
    request.session.clear()
    # Delete the cookies.
    for name in request.COOKIES.keys():
        response.delete_cookie(name, domain=settings.COOKIE_DOMAIN)
    return response
def bindParentNode(request):
    """Store ``parentLogicalCode`` as the ``gateImei`` of the child device.

    Query parameters: ``childLogicalCode`` and ``parentLogicalCode``.

    NOTE(review): this view modifies device data but carries no
    ``permission_required`` decorator - confirm access is restricted elsewhere.
    """
    logicalCode = request.GET.get('childLogicalCode')
    gateImei = request.GET.get('parentLogicalCode')

    devObj = Device.objects(logicalCode=logicalCode).first()
    if devObj is None:
        return JsonErrorResponse(description=u"没有找到该设备")
    if not gateImei:
        return JsonErrorResponse(description=u"网关的IMEI没有传入")

    # update_one replaces the deprecated Collection.update (same effect for
    # a single-document $set) and matches labelDevice.
    Device.get_collection().update_one({'logicalCode': logicalCode}, {'$set': {'gateImei': gateImei}})
    return JsonOkResponse()
def unbindParentNode(request):
    """Clear the ``gateImei`` of the device given by the ``logicalCode`` POST parameter.

    NOTE(review): this view modifies device data but carries no
    ``permission_required`` decorator - confirm access is restricted elsewhere.
    """
    # .get() so a missing parameter yields the not-found response instead of
    # an unhandled KeyError.
    logicalCode = request.POST.get('logicalCode')
    if not logicalCode:
        return JsonErrorResponse(description=u"没有找到该设备")

    devObj = Device.objects(logicalCode=logicalCode).first()
    if not devObj:
        return JsonErrorResponse(description=u"没有找到该设备")

    # update_one replaces the deprecated Collection.update (same effect for
    # a single-document $set).
    Device.get_collection().update_one({'logicalCode': logicalCode}, {'$set': {'gateImei': ''}})
    return JsonOkResponse()
|