# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""Send commands to devices over a raw TCP socket or over MQTT.

``MessageSender`` picks the transport from ``device.network_address`` and
exposes blocking, fire-and-forget and threaded entry points.
``SyncMessageSender`` wraps a single MQTT exchange (one message, one sid).
"""

import traceback
import uuid
import logging
import threading
import socket
from contextlib import closing
from typing import Callable, Any, Dict, TYPE_CHECKING

import simplejson as json
import django.dispatch
from django.conf import settings
from voluptuous import Schema, Required, ALLOW_EXTRA

from apilib import utils_string, utils_datetime
from apps.web.core.exceptions import MessageSenderException, MqttSubscribeError, MqttPublishError, MqttConnectError
from apps.web.core.mqtt_client import MqttSendClientSync, MqttSendClientAsync
from apps.web.constant import Const, DeviceCmdCode, MQTT_TIMEOUT, ErrorCode

if TYPE_CHECKING:
    from apps.web.device.models import DeviceDict

logger = logging.getLogger(__name__)

# Sent after an MQTT command completes (see MessageSender.send / async_send).
post_operate_device = django.dispatch.Signal(providing_args = ['device', 'payload', 'operationResult'])


def make_sid():
    """Return a new session id.

    The id is ``utils_datetime.generate_timestamp_ex()`` rendered as a string,
    followed by ``utils_string.get_random_str(num = 1)``.
    """
    return '{}{}'.format(str(utils_datetime.generate_timestamp_ex()), utils_string.get_random_str(num = 1))


class MessageSender(object):
    """Static entry points for sending a command to a device."""

    @staticmethod
    def send_via_tcpip(host, port, message):
        # type:(str, int, dict)->dict
        """Send ``message`` as one JSON line over TCP and return the decoded reply.

        :param host: server host; converted with ``str``.
        :param port: server port; converted with ``int``.
        :param message: dict carrying at least ``cmd`` and ``IMEI``.
        :return: the decoded JSON reply, or a dict with
            ``rst = ErrorCode.DEVICE_CONN_FAIL`` if anything raised
            (the exception is logged, never propagated).
        """
        payload = json.dumps(message)
        # Pessimistic default: only replaced once a reply has been decoded.
        result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}

        try:
            with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                s.settimeout(settings.MESSAGE_SENDER_TIMEOUT)
                s.connect((str(host), int(port)))
                s.sendall((payload + '\n').encode('utf-8'))
                result = json.loads(s.recv(2048))

            logger.info(
                '[old device(devNo=%s)] request device manager success. result = %s' % (
                    message['IMEI'], json.dumps(result)))
        except Exception as e:
            logger.exception(
                '[old device(devNo=%s)] request device manager exception. exception = %s' % (message['IMEI'], e))

        return result

    # Note: messages between servers currently support only a single flat {}
    # object, not JSON with nested {}. Supporting richer formats requires
    # changes on the Java device-manager side.
    @staticmethod
    def send_car_tcpip(host, port, message, timeout = settings.MESSAGE_SENDER_TIMEOUT):
        # type:(str, int, dict, int)->dict
        """Send ``message`` as one JSON line over TCP (``car-tcp-ip`` transport).

        :param host: server host; converted with ``str``.
        :param port: server port; converted with ``int``.
        :param message: dict carrying at least ``cmd`` and ``IMEI``.
        :param timeout: socket timeout passed to ``settimeout``.
        :return: the decoded JSON reply; a dict with
            ``rst = ErrorCode.DEVICE_SUCCESS`` when the peer sent no data; or
            a dict with ``rst = ErrorCode.DEVICE_CONN_FAIL`` if anything raised.
        """
        payload = json.dumps(message)
        result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_SUCCESS}

        try:
            # Use the socket owned by closing() directly. The previous code
            # rebound ``s`` to a second socket that was never closed.
            with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
                s.settimeout(timeout)
                s.connect((str(host), int(port)))
                s.sendall((payload + '\n').encode('utf-8'))
                info = s.recv(2048)

            if not info:
                # NOTE(review): an empty reply is logged as an error but still
                # returned with rst = DEVICE_SUCCESS -- confirm this is intended.
                logger.error('receive error msg,maybe the format of the msg error')
                return result

            result = json.loads(info)
            logger.info(
                '[old device(devNo=%s)] request device manager success. result = %s' % (
                    message['IMEI'], json.dumps(result)))
        except Exception as e:
            logger.exception(
                '[old device(devNo=%s)] request device manager exception. exception = %s' % (
                    str(message['IMEI']), str(e)))
            return {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}

        return result

    @staticmethod
    def send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
             server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0):
        # type:(DeviceDict, int, dict, str, str, int, int)->dict
        """Send ``cmd`` to ``device`` and wait for the result.

        The transport is the third element of ``device.network_address``:
        ``'tcpip'`` and ``'car-tcp-ip'`` use a raw socket, anything else goes
        through MQTT. ``payload`` is updated in place with ``cmd`` and ``IMEI``
        (and ``port`` for ``car-tcp-ip``). ``post_operate_device`` is sent only
        on the MQTT path.

        :return: the result dict produced by the chosen transport.
        :raises: whatever ``SyncMessageSender.send`` raises on the MQTT path.
        """
        host, port, protol = device.network_address
        server = '{}:{}/{}'.format(host, port, protol)

        logger.info(
            '[MessageSender:send] start to request device. device = {}; server = {}; cmd = {}; payload = {};'.format(
                repr(device), server, cmd, payload))

        if protol == 'tcpip':
            payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
            result = MessageSender.send_via_tcpip(host = host, port = port, message = payload)
            logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
                repr(device), server, payload, str(result)))
            return result

        if protol == 'car-tcp-ip':
            # cmd is passed through as given here (no int cast), as before.
            payload.update({'cmd': cmd, 'IMEI': device.devNo, 'port': device.otherConf.get('devPort', 8767)})
            result = MessageSender.send_car_tcpip(host = host, port = port, message = payload)
            logger.info('[MessageSender:send_car_tcpip]device({}) via({}) message={} result={}'.format(
                repr(device), server, payload, str(result)))
            return result

        payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
        result = SyncMessageSender(
            host = host, port = port, message = payload,
            device_topic_prefix = device_topic_prefix,
            server_topic_prefix = server_topic_prefix,
            with_sid_topic = bool(device.support_sid_topic)).send(timeout = timeout, retry = retry)

        logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
            repr(device), server, payload, str(result)))

        post_operate_device.send(sender = MessageSender, device = device, payload = payload,
                                 operationResult = result)

        return result

    @staticmethod
    def send_no_wait(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
                     server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.SHORT):
        # type:(DeviceDict, int, Dict, str, str, int)->None
        """Publish ``cmd`` to ``device`` over MQTT without waiting for a reply.

        ``payload`` is updated in place with ``cmd`` and ``IMEI``. The outcome
        of the publish is logged by ``SyncMessageSender.send_no_wait`` and is
        not returned to the caller.
        """
        cmd = int(cmd)

        host, port, proto = device.network_address

        logger.info(
            '[MessageSender:send_no_wait] start to request device. device = {}; server = {}; cmd = {}; payload = {}; device_topic_prefix = {}; server_topic_prefix = {}'.format(
                repr(device), '{}:{}/{}'.format(host, port, proto), cmd, payload,
                device_topic_prefix, server_topic_prefix))

        payload.update({
            'cmd': cmd,
            'IMEI': device.devNo
        })

        SyncMessageSender(host = host, port = port, message = payload,
                          device_topic_prefix = device_topic_prefix,
                          server_topic_prefix = server_topic_prefix,
                          with_sid_topic = device.support_sid_topic
                          ).send_no_wait(timeout = timeout)

    @staticmethod
    def async_send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
                   server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0,
                   callback = None, **kwargs):
        # type:(DeviceDict, int, dict, str, str, int, int, callable, dict)->None
        """Send ``cmd`` to ``device`` over MQTT on a background thread.

        When the exchange finishes, ``post_operate_device`` is sent and then
        ``callback(result, **kwargs)`` is invoked if ``callback`` is callable.
        ``payload`` is updated in place with ``IMEI`` and ``cmd``.
        """

        class CallbackWrapper(object):
            """Emit ``post_operate_device`` before delegating to the user callback."""

            def __init__(self, device, payload, callback):
                # type:(DeviceDict, dict, Callable)->None
                self.device = device
                self.callback = callback
                self.payload = payload

            def __call__(self, result, **kwargs):
                # type:(dict, dict)->None
                post_operate_device.send(sender = MessageSender, device = self.device, payload = self.payload,
                                         operationResult = result)

                if self.callback and callable(self.callback):
                    self.callback(result, **kwargs)

        host, port, proto = device.network_address

        logger.debug(
            '[MessageSender:async_send] start to request device. device = {}; server = {}; cmd = {}; payload = {}; kwargs = {}'.format(
                repr(device), '{}:{}/{}'.format(host, port, proto), cmd, str(payload), str(kwargs)))

        # Cast like send()/send_no_wait() do: SyncMessageSender's schema
        # requires 'cmd' to be an int.
        payload.update({'IMEI': device.devNo, 'cmd': int(cmd)})

        SyncMessageSender(
            host = host, port = port, message = payload,
            device_topic_prefix = device_topic_prefix,
            server_topic_prefix = server_topic_prefix,
            with_sid_topic = device.support_sid_topic
        ).async_send(timeout = timeout, retry = retry,
                     callback = CallbackWrapper(device = device, payload = payload, callback = callback),
                     **kwargs)

    @staticmethod
    def send_for_chuangwei(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
        """Call ``send`` with ``'/skyworth'`` appended to both default topic prefixes."""
        return MessageSender.send(device, cmd, payload,
                                  device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/skyworth',
                                  server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/skyworth',
                                  timeout = timeout)

    @staticmethod
    def send_for_moxiaozhiv2(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
        """Call ``send`` with ``'/mxzv2'`` appended to both default topic prefixes."""
        return MessageSender.send(device, cmd, payload,
                                  device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/mxzv2',
                                  server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/mxzv2',
                                  timeout = timeout)

    @staticmethod
    def net_pay(device, coins, timeout = MQTT_TIMEOUT.NORMAL, retry = 0, split = False, port = None):
        """Send ``DeviceCmdCode.PAY_MONEY`` for ``coins`` coins.

        :param coins: must be an ``int`` greater than 0, otherwise a dict with
            ``rst = ErrorCode.DEVICE_START_PACKAGE_ERROR`` is returned and
            nothing is sent.
        :param split: when true, pay in batches of 9 plus one final remainder;
            stop at the first batch whose ``rst`` is not 0.
        :param port: optional positive int added to the payload as ``port``.
        :return: the result of the last command that was sent.
        """
        assert isinstance(device.devNo, basestring) and ((not port) or (type(port) == int and port > 0))

        if type(coins) != int or coins <= 0:
            return {
                'rst': ErrorCode.DEVICE_START_PACKAGE_ERROR
            }

        def pay(amount):
            # One PAY_MONEY command; 'port' is included only when it was given.
            payload = {'app_pay': amount, 'IMEI': device.devNo}
            if port:
                payload['port'] = port
            return MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = payload,
                                      timeout = timeout, retry = retry)

        if not split:
            return pay(coins)

        # Work around a device bug with coin counts >= 10: pay in batches of
        # at most 9 coins.
        result = None
        for _ in range(coins // 9):
            result = pay(9)
            # If a payment fails, return immediately so that repeated sends do
            # not add up to a very long overall timeout.
            if result['rst'] != 0:
                return result

        # Send whatever is left in one final command.
        remainder = coins % 9
        if remainder > 0:
            result = pay(remainder)

        return result


class SyncMessageSender(object):
    """One MQTT exchange with a device: a validated message, its sid and topics."""

    # A message must carry a string 'IMEI' and an int 'cmd'; other keys are allowed.
    validMsgSchema = Schema(
        {
            Required('IMEI'): basestring,
            Required('cmd'): int
        }, extra = ALLOW_EXTRA)

    def __init__(self, host, port, message,
                 device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
                 server_topic_prefix = Const.SERVER_TOPIC_PREFIX,
                 with_sid_topic = False):
        # type: (str, int, dict, str, str, bool)->None
        """Validate ``message`` and derive the topics for this exchange.

        With ``with_sid_topic`` the sid is appended to both topic names as
        ``<cmd>_<sid>``; otherwise the sid is written into ``message`` under
        ``'sid'`` (mutating the caller's dict) and the topics end at ``<cmd>``.

        :raises: whatever ``validMsgSchema`` raises for an invalid message.
        """
        self.host = str(host)
        self.port = int(port)
        self.sid = make_sid()
        self.with_sid_topic = with_sid_topic

        if not self.with_sid_topic:
            message.update(
                {
                    'sid': self.sid
                }
            )

        self.sent = SyncMessageSender.validMsgSchema(message)

        leaf = '{}/{}'.format(self.sent['IMEI'], str(self.sent['cmd']))
        if self.with_sid_topic:
            leaf = '{}_{}'.format(leaf, self.sid)

        self.smartbox_topic = '{}/{}'.format(device_topic_prefix, leaf)
        self.server_topic = '{}/{}'.format(server_topic_prefix, leaf)

    def __repr__(self):
        # The previous format string had no placeholders, so every argument
        # was silently dropped and only the class name was shown.
        return 'SyncMessageSender(host={}, port={}, sid={}, sent={}, server_topic={}, smartbox_topic={})'.format(
            self.host, str(self.port), self.sid, str(self.sent), self.server_topic, self.smartbox_topic)

    @staticmethod
    def _release_client(mqttc):
        """Best-effort disconnect and close of ``mqttc``; failures are only logged."""
        if not mqttc:
            return

        try:
            mqttc.disconnect()
        except Exception as e:
            logger.exception(e)

        # Attempt close() even if disconnect() failed.
        try:
            mqttc.close()
        except Exception as e:
            logger.exception(e)

    def send(self, timeout, retry, qos = 0):
        # type: (int, int, int)->dict
        """Connect, send the message and return what ``mqttc.send`` returns.

        :raises MqttSubscribeError, MqttPublishError, MqttConnectError:
            propagated unchanged.
        :raises MessageSenderException: wraps any other exception; carries the
            formatted traceback.
        """
        mqttc = None
        try:
            logger.info(
                '[SyncMessageSender:send][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}; server_topic = {}'.format(
                    self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic,
                    self.server_topic))

            mqttc = MqttSendClientSync(client_id = 'webapp_' + str(uuid.uuid4()),
                                       sid = self.sid,
                                       server_topic = self.server_topic,
                                       smart_box_topic = self.smartbox_topic,
                                       sent = self.sent,
                                       qos = qos)
            mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
            mqttc.connect(self.host, self.port)
            result = mqttc.send(timeout, retry)

            logger.debug(
                '[SyncMessageSender:send][{}] send message to device finished. result = {}'.format(
                    self.sid, str(result)))

            return result
        except (MqttSubscribeError, MqttPublishError, MqttConnectError):
            # Bare raise keeps the original traceback.
            raise
        except Exception:
            detail = traceback.format_exc()
            logger.error(
                '[SyncMessageSender:send][{}] send message to device finished. exception = {}'.format(
                    self.sid, detail))
            raise MessageSenderException(exception = detail)
        finally:
            SyncMessageSender._release_client(mqttc)

    def send_no_wait(self, timeout):
        # type: (int)->bool
        """Connect and publish the message without waiting for a reply.

        :return: ``True`` if ``mqttc.send`` completed, ``False`` if anything
            raised (the failure is logged, never propagated).
        """
        mqttc = None
        try:
            mqttc = MqttSendClientAsync(client_id = 'webapp_' + str(uuid.uuid4()),
                                        sid = self.sid,
                                        smart_box_topic = self.smartbox_topic,
                                        sent = self.sent)

            logger.info(
                '[SyncMessageSender:send_no_wait][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}'.format(
                    self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic))

            mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
            mqttc.connect(self.host, self.port)
            mqttc.send(timeout)
            return True
        except MqttSubscribeError:
            logger.error('[SyncMessageSender:send_no_wait][{}] mqtt subscribe failure.'.format(self.sid))
            return False
        except MqttPublishError:
            logger.error('[SyncMessageSender:send_no_wait][{}] mqtt publish failure.'.format(self.sid))
            return False
        except Exception:
            # Tag corrected: this message used to be labelled as coming from send().
            logger.error(
                '[SyncMessageSender:send_no_wait][{}] send message to device finished. exception = {}'.format(
                    self.sid, traceback.format_exc()))
            return False
        finally:
            SyncMessageSender._release_client(mqttc)

    def async_send(self, timeout, retry, callback = None, **kwargs):
        # type: (int, int, Callable, dict)->None
        """Run ``send(timeout, retry)`` on a new thread and report via ``callback``.

        ``callback(result, **kwargs)`` is invoked from that thread if
        ``callback`` is callable. If ``send`` raises, ``result`` is a dict
        with ``rst = ErrorCode.EXCEPTION`` and the exception under
        ``'exception'``.
        """

        class AsyncThread(threading.Thread):
            """Thread that performs one ``send`` and then invokes the callback."""

            def __init__(self, sender, timeout, retry, callback, **kwargs):
                # type: (SyncMessageSender, int, int, Callable, Dict)->None
                super(AsyncThread, self).__init__()

                self.sender = sender
                self.timeout = timeout
                self.retry = retry
                self.callback = callback
                self.kwargs = kwargs

            def run(self):
                # Use the values stored on the thread rather than the
                # enclosing function's locals.
                try:
                    result = self.sender.send(self.timeout, self.retry)
                except Exception as e:
                    logger.exception(e)
                    result = {
                        'cmd': self.sender.sent['cmd'],
                        'IMEI': self.sender.sent['IMEI'],
                        'rst': ErrorCode.EXCEPTION,
                        'exception': e
                    }

                if self.callback and callable(self.callback):
                    self.callback(result, **self.kwargs)

        t = AsyncThread(sender = self, timeout = timeout, retry = retry, callback = callback, **kwargs)
        t.start()