# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import logging import time import simplejson as json from django.conf import settings from django.http import HttpResponse from pymongo.errors import DuplicateKeyError from typing import TYPE_CHECKING from apilib.utils_datetime import get_zero_time, get_tomorrow_zero_time, \ to_datetime from apilib.utils_json import JsonResponse from apps.web.common.models import OperatorLog from apps.web.constant import Const, DeviceCmdCode from apps.web.core import ROLE from apps.web.core.helpers import ActionDeviceBuilder from apps.web.core.adapter.base import fill_2_hexByte from apps.web.core.exceptions import ServiceException, TestError from apps.web.exceptions import UserServerException from apps.web.core.networking import MessageSender from apps.web.core.utils import JsonErrorResponse, JsonOkResponse from apps.web.device.define import DeviceChannelType from apps.web.device.models import Device, CheckDevice from apps.web.device.models import DeviceDict from apps.web.helpers import get_wx_config from apps.web.services.bluetooth.service import ActionBtDeviceBuilder from apps.web.test.models import TestRecord, TestStats from apps.web.utils import trace_call, error_tolerate, tester_login, permission_required, \ LimitAttemptsManager if TYPE_CHECKING: from django.core.handlers.wsgi import WSGIRequest from apps.web.test.models import Tester logger = logging.getLogger(__name__) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'入库失败,请重试'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def bindCode(request): curr_user = request.user if not curr_user.has_privilage('bind'): return JsonResponse({'result': 0, 'description': u'您无权限绑定设备', 'payload': {}}) logicalCode = str(request.GET.get('logicalCode')) imei = str(request.GET.get('imei')) logger.debug('logicalCode = %s; devNo = %s' % (logicalCode, imei)) if not logicalCode: return JsonResponse({'result': 0, 'description': u'设备编号为空', 'payload': {}}) if not imei: return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}}) logicalCode = logicalCode.strip() if 'DATA_MATRIX' in imei: imei = imei.split(',')[1].split(';')[0].strip() elif 'MAC:' in imei: imei = imei.split(',')[0].split(':')[1].strip() elif ';' in imei: imei = imei.split(';')[0].strip() if len(imei) > 15: return JsonResponse({'result': 0, 'description': u'错误的IMEI: {}'.format(imei), 'payload': {}}) dev = Device.get_collection().find_one( {'$or': [{'logicalCode': logicalCode}, {'devNo': imei, 'logicalCode': {'$nin': [None, '']}}]}) if dev: return JsonResponse({'result': 0, 'description': u'已经绑定(L=%s, IMEI=%s)' % (logicalCode, imei), 'payload': {}}) try: Device.bind(imei, logicalCode, curr_user) except DuplicateKeyError: return JsonResponse({ 'result': 0, 'description': u'已经绑定(L=%s, IMEI=%s)' % (logicalCode, imei), 'payload': {}}) except UserServerException as e: return JsonErrorResponse(description = e.message) return JsonResponse({'result': 1, 'description': u'绑定成功', 'payload': {}}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'入库失败,请重试'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def unbindCode(request): curr_user = request.user # type: Tester if not curr_user.has_privilage('unbind'): return JsonResponse({'result': 0, 'description': u'您没有权限解解绑定设备', 'payload': {}}) imei = str(request.GET.get('imei')) logger.debug('devNo = %s' % imei) if not imei: return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}}) if 'DATA_MATRIX' in imei: imei = imei.split(',')[1].split(';')[0].strip() elif 'MAC:' in imei: imei = imei.split(',')[0].split(':')[1].strip() try: dev = Device.get_dev(imei) # type: DeviceDict if not dev: return JsonResponse({'result': 0, 'description': u'该设备不存在', 'payload': {}}) if curr_user.has_privilage('forceUnbind'): Device.unbind(dev = dev) else: Device.unbind(dev = dev, operator = curr_user) OperatorLog.log(user = request.user, level = OperatorLog.LogLevel.CRITICAL, operator_name = u'强制解除绑定', content = { 'logicalCode': dev.logicalCode, 'devNo': dev.devNo, 'iccid': dev.iccid, 'imsi': dev.imsi, 'simExpireDate': dev.get('simExpireDate', None), 'expireDate': dev.get('expireDate', None), 'location': dev.get('location'), 'softVer': dev.softVer, 'server': dev.server, 'coreVer': dev.coreVer }) return JsonResponse({'result': 1, 'description': u'解绑成功', 'payload': {}}) except UserServerException as e: return JsonResponse({'result': 0, 'description': e.message, 'payload': {}}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'解注册失败,请重试'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def unregisterCode(request): curr_user = request.user # type: Tester if not curr_user.has_privilage('unregister'): return JsonResponse({'result': 0, 'description': u'您没有权限解注册设备', 'payload': {}}) imei = str(request.GET.get('imei')) logger.debug('devNo = %s' % imei) if not imei: return JsonResponse({'result': 0, 'description': u'设备IMEI为空', 'payload': {}}) try: device = Device.get_dev(imei) # type: DeviceDict if not device: return JsonResponse({'result': 0, 'description': u'该设备不存在', 'payload': {}}) if not device.is_registered: return JsonResponse({'result': 0, 'description': u'该设备没有注册,不需要解绑', 'payload': {}}) Device.un_register(device, force = True, operator = request.user.human_id) return JsonResponse({'result': 1, 'description': u'解注册成功', 'payload': {}}) except Exception as e: return JsonResponse({'result': 0, 'description': e.message, 'payload': {}}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def accountInfo(request): # type: (WSGIRequest)->JsonResponse return JsonResponse({"result": 1, "description": None, 'payload': request.user.to_dict()}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) def login(request): # type: (WSGIRequest)->JsonResponse try: payload = json.loads(request.body) username, password = payload['username'], payload['password'] return tester_login(request, logger, username, password) except Exception as e: logger.exception('super manager login error = %s' % e) return JsonResponse({'result': 0, 'description': u'登录异常', 'payload': {}}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def confirmTestResult(request): pass @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def getDeviceInfo(request): devNo = Device.get_dev_no_from_request(request) if devNo: dev = Device.get_dev(devNo) # type: DeviceDict if not dev: return JsonResponse({'result': 0, 'description': u'设备不存在,请首先绑定'}) if not dev.get('devNo'): return JsonResponse({'result': 0, 'description': u'设备无对应IMEI绑定,请首先绑定'}) device = Device.objects(devNo = devNo).first() if device is None: return JsonResponse({'result': 0, 'description': u'设备不存在数据库中,请首先绑定'}) payload = { 'devNo': dev.devNo, 'logicalCode': dev.logicalCode, 'softVer': dev.get('softVer', ''), 'hwVer': dev.get('hwVer', ''), 'mf': dev.get('mf', ''), 'registered': dev.is_registered } if dev.channelType == DeviceChannelType.Channel_BT: return JsonResponse( { '_comment_': 1, 'result': 1, 'description': None, 'payload': payload }) elif device.driverCode == Const.DEVICE_TYPE_CODE_CAR_NENGPAI: return JsonResponse( { '_comment_': 1, 'result': 1, 'description': None, 'payload': payload }) else: device_info = MessageSender.send(device = dev, cmd = DeviceCmdCode.GET_DEVINFO, payload = { 'IMEI': dev['devNo'] }, timeout = 15, retry = 3) if device_info['rst'] == 0: payload.update({ 'driverCode': device_info.get('driverCode', u'不支持该属性'), 'driverVersion': device_info.get('driverVersion', u'不支持该属性'), 'coreVer': device_info.get('core_ver', u'不支持该属性'), 'boardValid': device_info.get('board_valid', u'不支持该属性'), 'pw_cut_memory': device_info.get('pw_cut_memory', u'不支持该属性'), 'bindTime': device.dateTimeBinded.strftime( '%Y-%m-%d %H:%M:%S') if device.dateTimeBinded is not None else u'未知', 'testTime': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') }) return JsonResponse( { '_comment_': 1, 'result': 1, 'description': None, 'payload': payload }) else: return JsonResponse({'result': 0, 'description': u'获取模块信息超时,请检查模块是否上线,必要时重启模块'}) return JsonResponse({'result': 0, 'description': u'设备不存在'}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def getTestResultList(request): results = CheckDevice.objects() return JsonResponse( { 'result': 1, 'description': None, 'payload': {'total': 0, 'items': []} }) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def onPoints(request): """ 远程上分, 代理商通过上分测试机器系统流程正确。 :param request: :return: """ logicalCode = str(request.POST.get('value', None)) devNo = Device.get_devNo_by_logicalCode(logicalCode) logger.info('l=%s,devNo=%s' % (logicalCode, devNo)) if not devNo: return JsonResponse({'result': 0, 'description': u'设备不存在'}) score = int(request.POST.get('coins', '1')) intfType = request.POST.get('type', 'pulse') # 通知设备,付款 dev = Device.get_dev(devNo) # type:DeviceDict if dev.is_registered: return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'}) # 到设备上获取驱动编码信息 if 'driverCode' in dev and dev['driverCode'] == Const.DEVICE_TYPE_CODE_CAR_NENGPAI: try: device_adapter = ActionDeviceBuilder.create_action_device(dev, typeCode = Const.DEVICE_TYPE_CODE_CAR_NENGPAI) result = device_adapter.test(coins = score) if result['rst'] == -1: return JsonResponse({'result': 0, 'description': u'下发命令超时, 请检查设备是否快闪上线'}) elif result['rst'] == 1: return JsonResponse({'result': 0, 'description': u'串口命令超时,请检查设备和模块之间的串口连线'}) elif result['rst'] != 0: if not result.has_key('desc'): return JsonResponse({'result': 0, 'description': u'未知错误({}),请联系平台客服'.format(result['rst'])}) else: return JsonResponse({'result': 0, 'description': u'错误:({}),请联系平台客服'.format(result['desc'])}) except ServiceException, e: return JsonResponse({'result': 0, 'description': e.result.get('description'), 'payload': {}}) return JsonResponse({'result': 1, 'description': u'success'}) devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, {'IMEI': dev['devNo']}) if 'rst' in devInfo and devInfo['rst'] != 0: if devInfo['rst'] == -1: return JsonResponse({'result': 0, 'description': u'下发命令超时, 请检查设备是否快闪上线'}) elif devInfo['rst'] == 1: return JsonResponse({'result': 0, 'description': u'串口命令超时,请检查设备和模块之间的串口连线'}) else: return JsonResponse({'result': 0, 'description': u'未知错误({}),请联系平台客服'}) if 'driverCode' in devInfo and intfType == 'uart': type_code = devInfo['driverCode'] elif intfType == 'pulse': type_code = Const.DEVICE_TYPE_CODE_PULSE elif 'soft_ver' in devInfo: version = devInfo['soft_ver'].split('.')[0].upper() if version >= 'V7': type_code = Const.DEVICE_TYPE_CODE_CHANGING_WEIFULE if 'V7.1' in version: type_code = Const.DEVICE_TYPE_CODE_CHANGING_WEIFULE2 else: return JsonResponse({'result': 0, 'description': u'不支持该设备类型的设备测试'}) try: device_adapter = ActionDeviceBuilder.create_action_device(dev, typeCode = type_code) result = device_adapter.test(coins = score) if result['rst'] == -1: return JsonResponse({'result': 0, 'description': u'下发命令超时, 请检查设备是否快闪上线'}) elif result['rst'] == 1: return JsonResponse({'result': 0, 'description': u'串口命令超时,请检查设备和模块之间的串口连线'}) elif result['rst'] != 0: if not result.has_key('desc'): return JsonResponse({'result': 0, 'description': u'未知错误({}),请联系平台客服'.format(result['rst'])}) else: return JsonResponse({'result': 0, 'description': u'错误:({}),请联系平台客服'.format(result['desc'])}) except ServiceException, e: return JsonResponse({'result': 0, 'description': e.result.get('description'), 'payload': {}}) return JsonResponse({'result': 1, 'description': u'success'}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @trace_call(logger = logger) @permission_required(ROLE.tester) def onBtPoints(request): payload = json.loads(request.body) logicalCode = payload.get('logicalCode') major = int(payload.get('major', 0x1)) minor = int(payload.get('minor', 0x1)) dev = Device.get_dev_by_logicalCode(logicalCode) # type: DeviceDict if not dev: return JsonResponse({'result': 0, 'description': u'设备不存在'}) if dev.is_registered: return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'}) dev['major'] = major dev['minor'] = minor actionBox = ActionBtDeviceBuilder.create(Const.BT_DEVICE_TYPE_CODE_MAP[int(payload.get('code', 1))], dev) return JsonResponse({'result': 1, 'description': '', 'payload': { 'command': actionBox.encode_cmd( cmdId = 1, **{ 'attachParas': { 'package': { 'coins': int(payload.get('coins', 1)) } }, 'seqNo': int(time.time())})}}) @trace_call(logger = logger) @permission_required(ROLE.tester) def labelDevice(request): """ 标记坏的设备,便于我们记录错误信息 :param request: :return: """ payload = json.loads(request.body) logicalCode = payload.get('logicalCode', None) devNo = Device.get_devNo_by_logicalCode(logicalCode) label = payload.get('label', '') if not devNo: return JsonResponse({'result': 0, 'description': u'设备不存在'}) try: CheckDevice.get_collection().update_one({'imei': devNo}, {'$set': {'label': label}}, upsert = False) except Exception, e: logger.exception('[label_device]update checkdevice error = %s' % e) return JsonResponse({'result': 0, 'description': 'update checkdevice error=%s' % e}) return JsonResponse({'result': 1, 'description': u'success'}) @trace_call(logger = logger) @permission_required(ROLE.tester) def getDeviceLabel(request): """ 标记坏的设备,便于我们记录错误信息 :param request: :return: """ logicalCode = request.POST.get('value', None) devNo = Device.get_devNo_by_logicalCode(logicalCode) if not devNo: return JsonResponse({'result': 0, 'description': u'设备不存在'}) try: results = CheckDevice.get_collection().find({'imei': devNo}) except Exception, e: logger.info('update check device error = %s' % e) return JsonResponse({'result': 0, 'description': 'update check device error=%s' % e}) if len(results) == 0: return JsonResponse({'result': 0, 'description': u'没有找到设备的标记信息'}) result = {} result['devNo'] = results[0]['devNo'] result['logicalCode'] = results[0]['logicalCode'] result['testTime'] = results[0]['testTime'] result['testResult'] = results[0]['testResult'] result['testResultDesc'] = results[0]['testResultDesc'] result['bindTime'] = results[0]['bindTime'] result['devNo'] = results[0]['devNo'] result['label'] = results[0]['label'] return JsonResponse({'result': 1, 'description': u'success', 'payload': result}) @trace_call(logger = logger) def wxconfig(request): url = request.GET.get('href') if not url: return JsonResponse({'result': 0, 'description': u'参数错误', 'payload': {}}) value = get_wx_config(None, url) return JsonOkResponse(payload = {'wxconfig': value}) @permission_required(ROLE.tester) def setComDriver(request): driverName = request.POST.get("driverName") logicalCode = request.POST.get("logicalCode") dev = Device.get_dev_by_logicalCode(logicalCode) result = MessageSender.send(dev, DeviceCmdCode.SET_DEVINFO, { 'IMEI': dev['devNo'], 'driver_set': { 'driver_url': 'http://www.washpayer.com/uploaded/' + driverName + '.driver' } }) if result['rst'] != 0: return JsonResponse({'result': 0, 'description': u'安装驱动失败', 'payload': {}}) return JsonResponse({'result': 1, 'description': None, 'payload': {}}) @permission_required(ROLE.tester) def getComDriver(request): logicalCode = request.GET.get("logicalCode") dev = Device.get_dev_by_logicalCode(logicalCode) devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, { 'IMEI': dev['devNo'] }) if devInfo['rst'] != 0: return JsonResponse({'result': 0, 'description': u'查询驱动失败', 'payload': {}}) return JsonResponse({'result': 1, 'description': None, 'payload': { 'driverCode': devInfo['driverCode'], 'driverVersion': 'driverVersion'}}) @error_tolerate(logger = logger, nil = JsonResponse({'result': 0, 'description': u'系统错误'})) @permission_required(ROLE.tester) def checkLogicalBindingByRange(request): # type: (WSGIRequest)->JsonResponse """ 检测逻辑码在某一个范围是否都绑定了 :param request: :return: """ payload = json.loads(request.body) # todo 这个不用了,后续写在Validation类里面 # try: # logicalCodeRange(payload) # except MultipleInvalid: # return JsonErrorResponse(description=u'编号只能输入数字') start = payload.get('start') end = payload.get('end') strStart = str(start) strEnd = str(end) if 'G' in strStart and 'G' in strEnd: title = 'G' start = strStart[1:] end = strEnd[1:] elif 'R' in strStart and 'R' in strEnd: title = 'R' start = strStart[1:] end = strEnd[1:] elif 'N' in strStart and 'N' in strEnd: title = 'N' start = strStart[1:] end = strEnd[1:] elif 'B' in strStart and 'B' in strEnd: title = 'B' start = strStart[1:] end = strEnd[1:] else: title = '' spec = [title + str(_) for _ in xrange(int(start), int(end) + 1)] match = [_['logicalCode'] for _ in Device.get_collection().find({'logicalCode': {'$in': spec}})] diff = set(spec) - set(match) unbind_count = len(diff) description = u'测试结果: 一共检查了%d条数据。\n' % (len(spec),) description += u'全部已经绑定' if not bool(unbind_count) else u'有未绑定逻辑码(%s)条,它们为 %s' % (unbind_count, ', '.join(diff)) return JsonResponse({'result': 1, 'description': description, 'payload': {}}) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'更新错误')) @permission_required(ROLE.tester) def resetPassword(request): # type: (WSGIRequest)->JsonResponse tester = request.user oldPassword = request.POST.get('oldPassword', "") if not oldPassword: return JsonResponse({"result": 0, "description": u"请输入旧密码", 'payload': {}}) limitManager = LimitAttemptsManager('resetPassword', oldPassword) if limitManager.is_exceeded_limit(): return JsonResponse({'result': 0, 'description': u'超出输入错误次数限制,请明日再试', 'payload': {}}) if tester.check_password(oldPassword): password = request.POST.get('password', "") if password == "": return JsonResponse({"result": 0, "description": u"请输入密码", 'payload': {}}) try: tester.set_password(password) limitManager.clear() return JsonResponse({"result": 1, "description": None, 'payload': {}}) except Exception, e: logger.exception('update Dealer password some error=%s' % e) limitManager.incr() return JsonResponse({ 'result': 0, 'description': u'旧密码输入错误,您还可输入%s次' % limitManager.times_left(), 'payload': {} }) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'获取特性错误')) @permission_required(ROLE.tester) def getFeatureList(request): features = request.user.features resultDict = {} for f in features: resultDict[f] = True return JsonOkResponse(payload = resultDict) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'测试错误')) @permission_required(ROLE.tester) def dlxTest(request): payload = json.loads(request.body) ports = payload['ports'] logicalCode = payload['logicalCode'] # 首先检查驱动是否正确 dev = Device.get_dev_by_l(logicalCode) if dev.get('ownerId'): return JsonResponse({'result': 0, 'description': u'该设备已注册,不支持测试,如需测试,请首先解除绑定!'}) try: devInfo = MessageSender.send(dev, DeviceCmdCode.GET_DEVINFO, {'IMEI': dev['devNo']}) except Exception, e: return JsonResponse({'result': 1, 'description': u'检测失败,网络通讯失败。'}) if devInfo.has_key('rst') and devInfo['rst'] != 0: if devInfo['rst'] == -1: return JsonResponse({'result': 0, 'description': u'检测失败,设备网络通讯超时。'}) elif devInfo['rst'] == 1: return JsonResponse({'result': 0, 'description': u'检测失败,模块之间的串口通讯超时, 设备的串口接线可能有问题。'}) if devInfo['driverCode'] != Const.DEVICE_TYPE_CODE_CHARGING_DIANCHUAN: return JsonResponse({'result': 0, 'description': u'检测失败,设备驱动错误。'}) for port in ports: try: hexPort = fill_2_hexByte(hex(int(port)), 2) hexTime = fill_2_hexByte(hex(1)) devInfo = MessageSender.send(dev, DeviceCmdCode.OPERATE_DEV_SYNC, {'IMEI': dev['devNo'], "funCode": '02', 'data': hexPort + '0000' + hexTime}) except Exception, e: return JsonResponse({'result': 0, 'description': u'检测失败,启动多个端口的时候失败,可能是因为您的网络不好,请重试。'}) return JsonOkResponse() @error_tolerate(logger = logger, nil = JsonErrorResponse(u"测试出现异常,请稍后重试")) @permission_required(ROLE.tester) def serialTest(request): """ 串口测试 这个地方或做成类似 设备参数设置函数一样的 此函数只负责 测试串口 结果 测试是否通过由测试人员决定 :param request: :return: """ payload = json.loads(request.body) logicalCode = payload.get("logicalCode", "") if not logicalCode: return JsonErrorResponse(u"请先扫码获取设备逻辑码") device = Device.objects.filter(logicalCode = logicalCode).first() if not device: return JsonErrorResponse(u"设备尚未绑定入库") driverCode = device.driverCode dev = Device.get_dev_by_logicalCode(logicalCode) # TODO 德力西的先兼容 后续搞到serialTest上去 if driverCode in ["100210", "100205"]: return dlxTest(request) box = ActionDeviceBuilder.create_action_device(dev, typeCode = driverCode) try: result = box.serial_test(payload) except NotImplementedError: return JsonErrorResponse(description = u"该设备尚不支持串口测试") except TestError as e: # 这个地方主要会有三种错误,一是串口不通 二是网络超时 三是设备明确表示串口失败 return JsonErrorResponse(description = str(e)) return JsonOkResponse(result.get("description", "")) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'记录结果错误')) @permission_required(ROLE.tester) def recordTestResult(request): payload = json.loads(request.body) logicalCode = payload.get('logicalCode') result = payload.get('result') desc = payload.get('desc') username = request.user.username nickname = request.user.nickname nowTime = datetime.datetime.now() sTime = get_zero_time(nowTime) eTime = get_tomorrow_zero_time(nowTime) # 先把今天的这台测试记录删除掉 TestRecord.objects.filter(testerName = username, logicalCode = logicalCode, dateTimeAdded__gte = sTime, dateTimeAdded__lte = eTime).delete() # 增加进来 if result == 'success': desc = u'扫码测试通过' TestRecord(testerName = username, nickname = nickname, logicalCode = logicalCode, result = result, desc = desc, dateTimeAdded = nowTime).save() rcds = TestRecord.objects.filter(testerName = username, dateTimeAdded__gte = sTime, dateTimeAdded__lte = eTime) allCount, success, failed = 0, 0, 0 for rcd in rcds: if rcd.result == 'failed': failed += 1 elif rcd.result == 'success': success += 1 else: pass allCount += 1 okScale = int(success * 100.0 / allCount) today = nowTime.strftime(Const.DATE_FMT) TestStats.get_collection().update({'date': today, 'testerName': username}, {'$set': { 'testerName': username, 'nickname': nickname, 'date': today, 'all': allCount, 'success': success, 'failed': failed, 'okScale': okScale }}, upsert = True) return JsonOkResponse() @error_tolerate(logger = logger, nil = JsonErrorResponse(u'获取统计错误')) @permission_required(ROLE.tester) def getTestStatsByDay(request): # payload = json.loads(request.body) startDay = request.GET.get('startDay') endDay = request.GET.get('endDay') pageIndex = int(request.GET.get('pageIndex')) pageSize = int(request.GET.get('pageSize')) rcds = TestStats.objects.filter(date__gte = startDay, date__lte = endDay, testerName = request.user.username) showRcds = rcds.paginate(pageIndex = pageIndex, pageSize = pageSize) dataList = [{'count': rcd.all, 'successCount': rcd.success, 'failCount': rcd.failed, 'date': rcd.date} for rcd in showRcds] return JsonOkResponse(payload = {'total': rcds.count(), 'dataList': dataList}) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'获取统计错误')) @permission_required(ROLE.tester) def queryTestRcds(request): pageIndex = int(request.GET.get('pageIndex')) pageSize = int(request.GET.get('pageSize')) searchKey = str(request.GET.get('searchKey', '')) date = request.GET.get('date', None) if date is not None: today = to_datetime(date + ' 00:00:00') sTime = get_zero_time(today) eTime = get_tomorrow_zero_time(today) rcds = TestRecord.objects(testerName = request.user.username, dateTimeAdded__gte = sTime, dateTimeAdded__lte = eTime).search(searchKey).order_by('-dateTimeAdded') else: rcds = TestRecord.objects(testerName = request.user.username).search(searchKey).order_by('-dateTimeAdded') dataList = [] for rcd in rcds.paginate(pageIndex = pageIndex, pageSize = pageSize): dataList.append({ 'logicalCode': rcd.logicalCode, 'result': rcd.result, 'desc': rcd.desc, 'dateTimeAdded': rcd.dateTimeAdded.strftime(Const.DATETIME_FMT) }) return JsonOkResponse(payload = {'total': rcds.count(), 'dataList': dataList}) @error_tolerate(logger = logger, nil = JsonErrorResponse(u'获取统计错误')) @permission_required(ROLE.tester) def queryDeviceTestDetail(request): logicalCode = request.GET.get('logicalCode') devs = Device.get_collection().find({'logicalCode': logicalCode}) if devs.count < 0: return JsonErrorResponse(description = u'查询失败,没有找到设备') devDict = devs[0] dataList = [] if devDict.has_key('dateTimeBinded') and devDict['dateTimeBinded']: dataList.insert(0, {'time': devDict['dateTimeBinded'].strftime(Const.DATETIME_FMT), 'desc': u'绑定二维码成功', 'name': u'维码器生产商'}) if devDict.has_key('simActiveFirstTime') and devDict['simActiveFirstTime']: dataList.insert(0, {'time': devDict['simActiveFirstTime'].strftime(Const.DATETIME_FMT), 'desc': u'设备首次上线成功', 'name': u'模块生产商'}) rcds = TestRecord.objects.filter(logicalCode = logicalCode).order_by('dateTimeAdded') for rcd in rcds: dataList.insert(0, {'time': rcd.dateTimeAdded.strftime(Const.DATETIME_FMT), 'desc': rcd.desc, 'name': rcd.nickname}) return JsonOkResponse(payload = {'dataList': dataList}) def getCookies(request): response = HttpResponse() if settings.DEBUG: from urllib import unquote session = '' for key in request.session.keys(): session += '