# -*- coding: utf-8 -*- #!/usr/bin/env python """ 预设逻辑码对应关系 """ import os import xlrd import pprint from base import regex_split, init_env import click init_env(interactive=True) from apps.web.device.models import Device class DuplicateError(Exception): pass class InvalidInput(Exception): pass def check_duplicate(l): """ 检查是否有重复项 :param l: :return: """ seen = [] seen_add = seen.append for e in l: if e in seen: raise DuplicateError() seen_add(e) def check(*checkers): for checker in checkers: checker() @click.group() def cli(): click.echo(u'欢迎来到添加逻辑码脚本!') @cli.command() @click.option('-f','--filepath', prompt=u'请输入要转换的表格文件路径', default=os.environ['DEFAULT_LOGICALCODE_EXCEL_PATH']) @click.option('-s','--start_row', prompt=u'请输入开始插入的行数') @click.option('-m','--mode', prompt=u'模式:=(read|write|delete)', default='read') def excel(filepath, start_row, mode): """ 通过表格批量导入映射关系 :param filepath: :param start_row: :param mode: :return: """ start_row = int(start_row) book = xlrd.open_workbook(filepath) try: sheet = book.sheet_by_name("Sheet1") except: click.echo("no sheet in %s named Sheet1" % filepath) raise nrows = sheet.nrows rows = [] for i in range(1, nrows): row_data = sheet.row_values(i) #: 有部分空行和描述行存在,需要过滤掉 if isinstance(row_data[0], float) and int(row_data[0]) >= start_row and row_data[1]: delimiters = "<", ";", "," row_data[1] = regex_split(str(int(row_data[1])), *delimiters)[0] rows.append(row_data) #: 以免有不合规数据插入,需要首先做校验 check_duplicate(rows) if mode == 'read': click.echo('read mode start') pprint.pprint(list(enumerate(rows))) click.echo('read mode end') elif mode == 'write': #: 目前只操作 click.echo('write mode start') click.echo('start pushing mapping to database and cache.') for row in rows: logicalCode = str(int(row[0])) devno = str(int(row[1])) click.echo('adding (logicalCode=%s, devNo=%s) to db' % (logicalCode, devno)) Device.get_collection().insert_one({'devNo':devno, 'logicalCode':logicalCode}) click.echo('setting (logicalCode=%s, devNo=%s) to cache' % (logicalCode, devno)) Device.update_l_cache(devno, logicalCode) Device.get_dev(devno) click.echo('write mode end') elif mode == 'delete': click.echo('delete mode start') click.echo('we will only delete cache now') for row in rows: logicalCode = str(int(row[0])) devNo = str(int(row[1])) click.echo('deleting (logicalCode=%s, devNo=%s)' % (logicalCode, devNo)) Device.invalid_l_cache(logicalCode) Device.invalid_device_cache(devNo) click.echo('delete mode end') elif mode == 'modify': click.echo('modify mode start') click.echo('import!!!only expert use it') for row in rows: logicalCode = str(int(row[0])) devNo = str(int(row[1])) Device.invalid_l_cache(logicalCode) Device.invalid_device_cache(devNo) Device.get_collection().update_one({'devNo': devNo}, {'$set': {'logicalCode': logicalCode}}, upsert = False) Device.update_l_cache(devNo, logicalCode) Device.get_dev(devNo) click.echo('delete mode end') elif mode == 'fix': click.echo('fix mode start') for row in rows: logicalCode = str(int(row[0])) devNo = str(int(row[1])) click.echo('fixing (logicalCode=%s, devNo=%s)' % (logicalCode, devNo)) Device.invalid_device_cache(devNo) click.echo('fix mode end') click.echo('Finished! total: %s rows' % len(rows)) @cli.command() @click.option('--bulk/--no-bulk', help=u'是否给所有设备都添加设备号为逻辑编码', default=False) @click.option('--relation', help=u'映射关系', default='') @click.option('--relations', help=u'多个映射关系', default='') def inject(bulk, relation, relations): """ 手工添加映射关系 :param bulk: :param relation: 示例: devNo,logicalCode :param relations: 示例: devNo,logicalCode,devNo,logicalCode :return: """ def _single_operation(devNo, logicalCode): insert_relation_mapping(logicalCode=logicalCode, devNo=devNo) return Device.objects(devNo=devNo, logicalCode=logicalCode).save() if bulk: #: 目前不可用 pass else: if not any([relation, relations]): raise InvalidInput(u"设备号不可为空!") if relations: relations = relation.split(',') relation_tuples = [ (devNo, logicalCode) for devNo in relations[::2] for logicalCode in relation[1::2] ] for r in relation_tuples: _single_operation(*r) if relation: try: devNo, logicalCode = relation.strip(' ').split(',') click.echo(u'你正在添加映射关系 devNo=%s,logicalCode=%s' % (devNo, logicalCode)) _single_operation(devNo, logicalCode) except AttributeError: click.echo(u'请注意输入格式应为 设备号,逻辑码 ') raise click.echo('success!') @cli.command() def update(): """ 需要做一定的校验 :return: """ pass @cli.command() def remove(logicalCode): """ :param logicalCode: :return: """ pass if __name__ == '__main__': click.echo(u'该脚本已经弃用,请检查是否有变更的部分需要处理') #cli()