123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- """
- 预设逻辑码对应关系
- """
- import os
- import xlrd
- import pprint
- from base import regex_split, init_env
- import click
- init_env(interactive=True)
- from apps.web.device.models import Device
class DuplicateError(Exception):
    """Signals that a collection contains the same entry more than once.

    Raised by ``check_duplicate``.
    """
class InvalidInput(Exception):
    """Signals that the arguments supplied on the command line are unusable.

    Raised by the ``inject`` command when no mapping was provided.
    """
def check_duplicate(l):
    """
    Verify that no entry of ``l`` occurs more than once.

    Entries are compared with ``in`` against a plain list, so unhashable
    entries (for example the row lists built by ``excel``) are supported.

    :param l: iterable of entries to inspect
    :return: None when every entry is unique
    :raises DuplicateError: as soon as a repeated entry is encountered
    """
    visited = []
    for entry in l:
        if entry not in visited:
            visited.append(entry)
            continue
        raise DuplicateError()
def check(*checkers):
    """
    Call every supplied checker, in the order given, with no arguments.

    Return values are ignored; any exception a checker raises propagates
    and stops the remaining checkers from running.

    :param checkers: zero-argument callables
    :return: None
    """
    pending = iter(checkers)
    for run_check in pending:
        run_check()
@click.group()
def cli():
    # Root command group: the sub-commands below attach to it through
    # ``@cli.command()``.  Documented with comments rather than a docstring
    # because click displays a docstring as the group's help text.
    greeting = u'欢迎来到添加逻辑码脚本!'
    click.echo(greeting)
@cli.command()
# ``.get`` keeps the module importable when the environment variable is not
# set; click then prompts for the path without offering a default.
@click.option('-f', '--filepath', prompt=u'请输入要转换的表格文件路径',
              default=os.environ.get('DEFAULT_LOGICALCODE_EXCEL_PATH'))
@click.option('-s', '--start_row', prompt=u'请输入开始插入的行数')
@click.option('-m', '--mode', prompt=u'模式:=(read|write|delete)', default='read')
def excel(filepath, start_row, mode):
    """
    通过表格批量导入映射关系
    :param filepath:
    :param start_row:
    :param mode:
    :return:
    """
    # NOTE: the docstring above is kept verbatim because click prints a
    # command's docstring as its ``--help`` text.
    #
    # Bulk-process logicalCode/devNo mappings read from "Sheet1" of an Excel
    # workbook.
    #
    # filepath:  path of the workbook opened with xlrd.
    # start_row: rows are kept only when their first cell is numeric and its
    #            integer value is >= this number.
    # mode:      one of read, write, delete, modify, fix (see branches below).
    #
    # Raises xlrd.XLRDError when the sheet is missing, ValueError when a cell
    # cannot be converted to an integer, and DuplicateError when two kept
    # rows are identical.

    def _codes(row):
        # Normalise the first two cells to integer-looking strings.
        return str(int(row[0])), str(int(row[1]))

    start_row = int(start_row)
    book = xlrd.open_workbook(filepath)
    try:
        sheet = book.sheet_by_name("Sheet1")
    except xlrd.XLRDError:
        click.echo("no sheet in %s named Sheet1" % filepath)
        raise

    delimiters = "<", ";", ","
    rows = []
    # Row 0 is skipped (range starts at 1).
    for i in range(1, sheet.nrows):
        row_data = sheet.row_values(i)
        # Rows with fewer than two cells cannot hold a mapping.
        if len(row_data) < 2:
            continue
        # Some blank rows and description rows exist; filter them out.
        if isinstance(row_data[0], float) and int(row_data[0]) >= start_row and row_data[1]:
            # NOTE(review): int() runs before the split, so a cell that
            # actually contains one of the delimiters would raise here --
            # confirm the intended order against regex_split's contract.
            row_data[1] = regex_split(str(int(row_data[1])), *delimiters)[0]
            rows.append(row_data)

    # Validate first so that non-conforming data is never written.
    check_duplicate(rows)

    if mode == 'read':
        click.echo('read mode start')
        pprint.pprint(list(enumerate(rows)))
        click.echo('read mode end')
    elif mode == 'write':
        click.echo('write mode start')
        click.echo('start pushing mapping to database and cache.')
        for row in rows:
            logicalCode, devNo = _codes(row)
            click.echo('adding (logicalCode=%s, devNo=%s) to db' % (logicalCode, devNo))
            Device.get_collection().insert_one({'devNo': devNo, 'logicalCode': logicalCode})
            click.echo('setting (logicalCode=%s, devNo=%s) to cache' % (logicalCode, devNo))
            Device.update_l_cache(devNo, logicalCode)
            # Presumably re-populates the device cache -- confirm in Device.
            Device.get_dev(devNo)
        click.echo('write mode end')
    elif mode == 'delete':
        click.echo('delete mode start')
        click.echo('we will only delete cache now')
        for row in rows:
            logicalCode, devNo = _codes(row)
            click.echo('deleting (logicalCode=%s, devNo=%s)' % (logicalCode, devNo))
            Device.invalid_l_cache(logicalCode)
            Device.invalid_device_cache(devNo)
        click.echo('delete mode end')
    elif mode == 'modify':
        click.echo('modify mode start')
        click.echo('import!!!only expert use it')
        for row in rows:
            logicalCode, devNo = _codes(row)
            Device.invalid_l_cache(logicalCode)
            Device.invalid_device_cache(devNo)
            Device.get_collection().update_one(
                {'devNo': devNo}, {'$set': {'logicalCode': logicalCode}}, upsert=False)
            Device.update_l_cache(devNo, logicalCode)
            Device.get_dev(devNo)
        # Previously reported 'delete mode end' (copy-paste slip).
        click.echo('modify mode end')
    elif mode == 'fix':
        click.echo('fix mode start')
        for row in rows:
            logicalCode, devNo = _codes(row)
            click.echo('fixing (logicalCode=%s, devNo=%s)' % (logicalCode, devNo))
            Device.invalid_device_cache(devNo)
        click.echo('fix mode end')
    else:
        # An unrecognised mode used to be ignored silently.
        click.echo('unknown mode: %s' % mode)

    click.echo('Finished! total: %s rows' % len(rows))
@cli.command()
@click.option('--bulk/--no-bulk', help=u'是否给所有设备都添加设备号为逻辑编码', default=False)
@click.option('--relation', help=u'映射关系', default='')
@click.option('--relations', help=u'多个映射关系', default='')
def inject(bulk, relation, relations):
    """
    手工添加映射关系
    :param bulk:
    :param relation: 示例: devNo,logicalCode
    :param relations: 示例: devNo,logicalCode,devNo,logicalCode
    :return:
    """
    # NOTE: the docstring above is kept verbatim because click prints a
    # command's docstring as its ``--help`` text.
    #
    # Manually add devNo -> logicalCode mappings.
    #
    # bulk:      placeholder flag; the bulk branch does nothing yet.
    # relation:  a single "devNo,logicalCode" pair.
    # relations: a flat "devNo,logicalCode,devNo,logicalCode,..." list.
    #
    # Raises InvalidInput when neither relation nor relations is given or
    # when relations holds an odd number of items, and ValueError when
    # relation is not exactly one comma-separated pair.

    def _single_operation(devNo, logicalCode):
        # NOTE(review): insert_relation_mapping is neither defined nor
        # imported in the visible part of this file -- confirm where it
        # comes from.
        insert_relation_mapping(logicalCode=logicalCode, devNo=devNo)
        return Device.objects(devNo=devNo, logicalCode=logicalCode).save()

    # The bulk branch is not available yet, so only the manual path acts.
    if not bulk:
        if not any([relation, relations]):
            raise InvalidInput(u"设备号不可为空!")

        if relations:
            # Split ``relations`` itself (the old code split ``relation``).
            items = relations.strip(' ').split(',')
            if len(items) % 2:
                click.echo(u'请注意输入格式应为 设备号,逻辑码 ')
                raise InvalidInput(u'relations must contain complete devNo,logicalCode pairs')
            # Pair even-indexed devNos with the following logicalCode; the
            # old nested comprehension produced a cartesian product instead.
            for devNo, logicalCode in zip(items[::2], items[1::2]):
                _single_operation(devNo, logicalCode)

        if relation:
            try:
                # Unpacking fails with ValueError unless there are exactly
                # two comma-separated parts.
                devNo, logicalCode = relation.strip(' ').split(',')
            except ValueError:
                click.echo(u'请注意输入格式应为 设备号,逻辑码 ')
                raise
            click.echo(u'你正在添加映射关系 devNo=%s,logicalCode=%s' % (devNo, logicalCode))
            _single_operation(devNo, logicalCode)

    click.echo('success!')
@cli.command()
def update():
    """
    需要做一定的校验
    :return:
    """
    # Placeholder command with no behaviour yet.  The docstring is kept
    # verbatim because click prints it as the command's help text; it says
    # that some validation needs to be done.
@cli.command()
# Without a declared parameter click would call ``remove()`` with no
# arguments and fail with TypeError.  The explicit third name keeps the
# camelCase Python parameter instead of a lower-cased name derived from the
# flag.
@click.option('-c', '--logical_code', 'logicalCode', required=True,
              help=u'logical code whose mapping should be removed')
def remove(logicalCode):
    """
    :param logicalCode:
    :return:
    """
    # Placeholder command: accepts the logical code but performs no action
    # yet.  The docstring is kept verbatim because click prints it as the
    # command's help text.
    pass
if __name__ == '__main__':
    # Running the script only prints the notice below, which says the
    # script is deprecated and asks the operator to check whether any
    # changed parts still need handling.
    deprecation_notice = u'该脚本已经弃用,请检查是否有变更的部分需要处理'
    click.echo(deprecation_notice)
    # The command-group entry point is intentionally left disabled:
    # cli()
-
|