123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import logging
- import datetime
- import struct
- from ctypes import create_string_buffer
- from apilib.utils import xor_encrypt_buffer
- from apps.web.core.exceptions import ServiceException
- from .utils import pollDeviceInfo
- logger = logging.getLogger(__name__)
class BtSmartBox(object):
    """Base adapter for a Bluetooth smart-box device.

    The base class offers no-op command hooks for subclasses to override and
    the frame obfuscation they share: a pass through the project helper
    ``xor_encrypt_buffer`` (keyed by the device's ``devNo``) combined with a
    reversal of the frame's trailing bytes.
    """

    # The swap logic is written for 20-byte frames; bytes [offset, 20) are
    # reversed.
    _FRAME_SIZE = 20

    def __init__(self, device):
        """Remember *device*; it is read like a mapping (``device['devNo']``)."""
        self._device = device
        super(BtSmartBox, self).__init__()

    def support_count_down(self):
        """Return ``False``: the base adapter reports no count-down support."""
        return False

    def encode_cmd(self, cmdId, **kwargs):
        """Hook for building a command frame; does nothing and returns ``None``."""
        pass

    def decode_cmd(self, cmdId, command):
        """Hook for parsing a device reply; does nothing and returns ``None``."""
        pass

    def poll_notify(self, payload, **kwargs):
        """Hook for device notifications; ignores *payload* and returns ``(1, '')``."""
        return 1, ''

    @staticmethod
    def _reverse_tail(buffer, offset):
        """Reverse, in place, bytes ``[offset, 20)`` of the ctypes *buffer*.

        Bytes before *offset* are left untouched.  An *offset* at or beyond
        the frame size is a no-op.
        """
        # Snapshot once: every ``buffer.raw`` access copies the whole buffer.
        tail = bytearray(buffer.raw)[offset:BtSmartBox._FRAME_SIZE]
        if not tail:
            return
        tail.reverse()
        struct.pack_into('<%dB' % len(tail), buffer, offset, *tail)

    def encrypt(self, buffer, offset=4):
        """Obfuscate *buffer* starting at *offset* and return the same buffer.

        ``xor_encrypt_buffer`` is invoked for its effect on *buffer* (its
        return value is ignored here), then the tail is reversed.

        :param buffer: ctypes string buffer holding the frame.
        :param offset: index of the first byte to transform (default 4).
        :return: *buffer* itself.
        """
        xor_encrypt_buffer(buffer, offset, self._device['devNo'])
        self._reverse_tail(buffer, offset)
        return buffer

    def decrypt(self, buffer, offset=4):
        """Undo :meth:`encrypt`: reverse the tail first, then apply the helper.

        :param buffer: ctypes string buffer holding the obfuscated frame.
        :param offset: index of the first transformed byte (default 4).
        :return: whatever ``xor_encrypt_buffer`` returns (presumably the
            buffer -- confirm against that helper).
        """
        self._reverse_tail(buffer, offset)
        return xor_encrypt_buffer(buffer, offset, self._device['devNo'])
class BtChargingSocket(BtSmartBox):
    """Adapter for the Bluetooth charging socket.

    Only command 1 is encoded, and its frame is sent without going through
    ``encrypt``.
    """

    def __init__(self, device):
        super(BtChargingSocket, self).__init__(device)

    def poll_notify(self, payload, **kwargs):
        """Ignore the notification *payload* and return ``(1, '')``."""
        return 1, ''

    def encode_cmd(self, cmdId, **kwargs):
        """Build the hex-encoded 20-byte frame for command 1.

        :param cmdId: command number; anything other than 1 yields ``None``.
        :param kwargs: must contain ``attachParas['package']``; for command 1
            the package's ``'time'`` entry is packed as an unsigned 16-bit
            value.
        :return: 40-character lowercase hex string, or ``None``.
        """
        # The package is looked up before the command check, so a missing key
        # raises KeyError for every cmdId.
        chosen = kwargs['attachParas']['package']
        if cmdId != 1:
            return None

        duration = int(chosen['time'])
        frame = create_string_buffer(20)
        # Layout: constant command byte 1, then the little-endian duration.
        struct.pack_into('<BH', frame, 0, 1, duration)
        return ''.join('{:02x}'.format(octet) for octet in bytearray(frame.raw))
class BtPulseDevice(BtSmartBox):
    """Adapter for pulse devices.

    Supported command: ``cmdId = 1``.  Its frame layout (field: size in bytes)
    is seqNo: 4, cmdId: 1, pulse: 2, pulseWidth: 2, pulseInterval: 2,
    battery: 1, year: 2, month: 1, day: 1, optionally followed by a one-byte
    charge index.  Devices at version 1.0 / 1.1 receive a shorter, unencrypted
    frame without seqNo and battery.
    """

    def __init__(self, device):
        super(BtPulseDevice, self).__init__(device)

    def encode_cmd(self, cmdId, **kwargs):
        """Build the hex-encoded 20-byte frame for command 1.

        :param cmdId: command number; only 1 is supported.
        :param kwargs: ``attachParas`` (with ``package['coins']`` and an
            optional ``chargeIndex``) and, for encrypting firmware, ``seqNo``.
        :return: 40-character lowercase hex string.
        :raises Exception: when *cmdId* is not 1.
        """
        if cmdId != 1:
            raise Exception(u'不支持的命令')

        today = datetime.datetime.now()
        attach_paras = kwargs['attachParas']
        pulse = int(attach_paras['package']['coins'])
        device = self._device
        frame = create_string_buffer(20)

        # Encryption is supported starting from the second firmware version.
        if device['major'] > 1 or device['minor'] > 1:
            layout = '<IBHHHBHBB'
            fields = [
                int(kwargs['seqNo']), cmdId, pulse,
                int(device['pulseWidth1']),
                int(device['pulseInterval1']),
                int(device['battery']),
                today.year, today.month, today.day,
            ]
            # A usable charge index is appended as one trailing char; the
            # literal 'undefined' is treated the same as absent.
            if ('chargeIndex' in attach_paras and attach_paras['chargeIndex']
                    and attach_paras['chargeIndex'] != 'undefined'):
                layout += 'c'
                fields.append(attach_paras['chargeIndex'])
            struct.pack_into(layout, frame, 0, *fields)

            # From version 2.2 on, the cmdId byte is also left in clear
            # (transformation starts at offset 5 instead of 4).
            if device['major'] > 2 or (device['major'] == 2 and device['minor'] >= 2):
                frame = self.encrypt(frame, 5)
            else:
                frame = self.encrypt(frame)
        else:
            struct.pack_into('<BHHHHBB', frame, 0,
                             cmdId, pulse,
                             int(device['pulseWidth1']),
                             int(device['pulseInterval1']),
                             today.year, today.month, today.day)

        return ''.join('{:02x}'.format(octet) for octet in bytearray(frame.raw))

    def poll_notify(self, payload, **kwargs):
        """Decode a hex notification and forward it to ``pollDeviceInfo``.

        The first eight decoded bytes are read little-endian as year (2),
        month (1), day (1) and coins (4).  Any failure -- odd-length payload,
        non-hex characters, too-short payload, errors in ``pollDeviceInfo`` --
        is logged and otherwise ignored.

        :param payload: hex string received from the device.
        :param kwargs: must contain ``major`` and ``minor``.
        :return: always ``(1, '')``.
        """
        try:
            if len(payload) % 2 != 0:
                raise ServiceException({'result': 0, 'description': u'参数不正确'})

            # Decode every two-character pair, including the last one (the
            # previous hand-rolled loop never stored the final byte, which
            # zeroed the top byte of ``coins`` on an 8-byte payload).
            octets = [int(payload[pos:pos + 2], 16)
                      for pos in range(0, len(payload), 2)]
            buf = create_string_buffer(len(octets))
            for position, octet in enumerate(octets):
                struct.pack_into('<B', buf, position, octet)

            year, month, day, coins = struct.unpack_from('<HBBL', buf, 0)
            pollDeviceInfo(self._device, kwargs['major'], kwargs['minor'], year, month, day, coins)
        except Exception as e:
            # Best effort: a bad notification must not break the caller.
            logger.exception(e)
        return 1, ''
class BtSoftSwitch(BtSmartBox):
    """Adapter for the soft-switch device.

    Command 1 sends a start frame carrying today's date and a duration in
    seconds; command 2 sends a confirmation frame on firmware that
    supports it.
    """

    def __init__(self, device):
        super(BtSoftSwitch, self).__init__(device)

    def encode_cmd(self, cmdId, **kwargs):
        """Build the encrypted frame for command 1 or 2.

        :param cmdId: 1 (start) or 2 (confirm).
        :param kwargs: ``seqNo`` and, for command 1, ``attachParas['package']``
            with ``time`` and an optional ``unit``.
        :return: hex string for command 1; the encrypted buffer for command 2.
        :raises ServiceException: for any other command, or for command 2 on
            firmware 1.0 / 1.1.
        """
        if cmdId == 1:
            today = datetime.datetime.now()
            package = kwargs['attachParas']['package']
            duration = long(package['time'])

            # Convert to seconds; an unrecognised unit leaves the value as is.
            seconds_per_unit = {
                u'分钟': 60,
                u'小时': 3600,
                u'天': 24 * 3600,
            }
            factor = seconds_per_unit.get(package.get('unit', u'分钟'))
            if factor is not None:
                duration = int(duration * factor)

            frame = create_string_buffer(20)
            struct.pack_into('<IBHBBIB', frame, 0,
                             int(kwargs['seqNo']), cmdId,
                             today.year, today.month, today.day,
                             duration, 1)
            frame = self.encrypt(frame)
            return ''.join('{:02x}'.format(octet) for octet in bytearray(frame.raw))

        if cmdId == 2:
            # Confirmation is supported starting from the second version.
            if self._device['major'] > 1 or self._device['minor'] > 1:
                frame = create_string_buffer(20)
                struct.pack_into('<IB', frame, 0, int(kwargs['seqNo']), cmdId)
                # NOTE(review): this returns the buffer object, not a hex
                # string like command 1 -- confirm callers expect that.
                return self.encrypt(frame)

        raise ServiceException({'result': 0, 'description': u'参数不正确'})
class BtTimeSwitchIR(BtSmartBox):
    """Adapter for the timed switch with mode / strength / heat options.

    Command 1 sends today's date, a duration in seconds and the three option
    bytes; the frame is encrypted from offset 5.
    """

    class HOT(object):
        # Values for the heat option byte.
        NO_HEAT = 0x0
        HEAT = 0x08
        NOT_ASSIGNED = 0x0

    class STRENGTH(object):
        # Values for the strength option byte.
        STRENGTH_1 = 0x0
        STRENGTH_2 = 0x10
        STRENGTH_3 = 0x20
        STRENGTH_4 = 0x30
        NOT_ASSIGNED = 0x0

    class MODE(object):
        # Values for the mode option byte.
        MODE_1 = 0x80
        MODE_2 = 0x40
        MODE_3 = 0xC0
        NOT_ASSIGNED = 0x0

    def __init__(self, device):
        super(BtTimeSwitchIR, self).__init__(device)

    def support_count_down(self):
        """Return ``True``: this adapter reports count-down support."""
        return True

    def encode_cmd(self, cmdId, **kwargs):
        """Build the hex-encoded, encrypted 20-byte frame for command 1.

        :param cmdId: command number; only 1 is supported.
        :param kwargs: ``seqNo`` and ``attachParas`` holding ``package``
            (``time``, optional ``unit``) plus optional ``mode``,
            ``strength`` and ``isHot`` overrides.
        :return: 40-character lowercase hex string.
        :raises ServiceException: when *cmdId* is not 1.
        """
        if cmdId != 1:
            raise ServiceException({'result': 0, 'description': u'参数不正确'})

        today = datetime.datetime.now()
        options = kwargs['attachParas']
        package = options['package']
        duration = long(package['time'])

        # Convert to seconds; an unrecognised unit leaves the value as is.
        seconds_per_unit = {
            u'分钟': 60,
            u'小时': 3600,
            u'天': 24 * 3600,
        }
        factor = seconds_per_unit.get(package.get('unit', u'分钟'))
        if factor is not None:
            duration = int(duration * factor)

        # Caller-supplied options win over the defaults.
        mode = options['mode'] if 'mode' in options else BtTimeSwitchIR.MODE.MODE_1
        strength = (options['strength'] if 'strength' in options
                    else BtTimeSwitchIR.STRENGTH.STRENGTH_1)
        is_hot = options['isHot'] if 'isHot' in options else BtTimeSwitchIR.HOT.NO_HEAT

        frame = create_string_buffer(20)
        struct.pack_into('<IBHBBIBBBB', frame, 0,
                         int(kwargs['seqNo']), cmdId,
                         today.year, today.month, today.day,
                         duration, mode, strength, is_hot, 0)
        frame = self.encrypt(frame, 5)
        return ''.join('{:02x}'.format(octet) for octet in bytearray(frame.raw))
class BtClothesTree(BtSmartBox):
    """Adapter for the clothes-tree device.

    Command 1 sends today's date, a remaining time in seconds, a mode byte and
    a zero ``is_cut_memory`` byte; the frame is encrypted from offset 5.
    """

    class MODE(object):
        # Values for the mode byte.
        MODE_COLD = 0x01
        MODE_HOT = 0x02

    def __init__(self, device):
        super(BtClothesTree, self).__init__(device)

    def support_count_down(self):
        """Return ``False``: this adapter reports no count-down support."""
        return False

    def encode_cmd(self, cmdId, **kwargs):
        """Build the hex-encoded, encrypted 20-byte frame for command 1.

        :param cmdId: command number; only 1 is supported.
        :param kwargs: ``seqNo`` and ``attachParas`` holding ``package``
            (``time``, optional ``unit``) plus an optional ``mode`` override.
        :return: 40-character lowercase hex string.
        :raises ServiceException: when *cmdId* is not 1.
        """
        if cmdId != 1:
            raise ServiceException({'result': 0, 'description': u'参数不正确'})

        today = datetime.datetime.now()
        options = kwargs['attachParas']
        package = options['package']
        remaining = long(package['time'])

        # Convert to seconds; an unrecognised unit leaves the value as is.
        seconds_per_unit = {
            u'分钟': 60,
            u'小时': 3600,
            u'天': 24 * 3600,
        }
        factor = seconds_per_unit.get(package.get('unit', u'分钟'))
        if factor is not None:
            remaining = int(remaining * factor)

        # Hot mode unless the caller supplies one.
        mode = options['mode'] if 'mode' in options else BtClothesTree.MODE.MODE_HOT
        is_cut_memory = 0

        frame = create_string_buffer(20)
        struct.pack_into('<IBHBBIBB', frame, 0,
                         int(kwargs['seqNo']), cmdId,
                         today.year, today.month, today.day,
                         remaining, mode, is_cut_memory)
        frame = self.encrypt(frame, 5)
        return ''.join('{:02x}'.format(octet) for octet in bytearray(frame.raw))
|