123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import traceback
- import uuid
- import logging
- import threading
- from contextlib import closing
- from typing import Callable, Any, Dict, TYPE_CHECKING
- import simplejson as json
- from django.conf import settings
- from voluptuous import Schema, Required, ALLOW_EXTRA
- from apilib import utils_string, utils_datetime
- from apps.web.core.exceptions import MessageSenderException, MqttSubscribeError, MqttPublishError, MqttConnectError
- from apps.web.core.mqtt_client import MqttSendClientSync, MqttSendClientAsync
- from apps.web.constant import Const, DeviceCmdCode, MQTT_TIMEOUT, ErrorCode
- import django.dispatch
- import socket
- if TYPE_CHECKING:
- from apps.web.device.models import DeviceDict
- logger = logging.getLogger(__name__)
- post_operate_device = django.dispatch.Signal(providing_args = ['device', 'payload', 'operationResult'])
- def make_sid():
- return '{}{}'.format(str(utils_datetime.generate_timestamp_ex()),
- utils_string.get_random_str(num = 1))
- class MessageSender(object):
- @staticmethod
- def send_via_tcpip(host, port, message):
- # type:(str, int, dict)->dict
- payload = json.dumps(message)
- result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}
- try:
- with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
- s.settimeout(settings.MESSAGE_SENDER_TIMEOUT)
- s.connect((str(host), int(port)))
- s.sendall((payload + '\n').encode('utf-8'))
- result = json.loads(s.recv(2048))
- logger.info(
- '[old device(devNo=%s)] request device manager success. result = %s' %
- (message['IMEI'], json.dumps(result)))
- except Exception, e:
- logger.exception('[old device(devNo=%s)] request device manager exception. exception = %s' %
- (message['IMEI'], e))
- return result
-
- # 注意:目前服务器之间的消息发送,只支持一组{},不支持内嵌{}式的json。后续想支持更复杂的格式,需要在java设备管理器那边优化
- @staticmethod
- def send_car_tcpip(host, port, message,timeout=settings.MESSAGE_SENDER_TIMEOUT):
- # type:(str, int, dict)->dict
- payload = json.dumps(message)
- result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_SUCCESS}
- try:
- with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- s.settimeout(timeout)
- s.connect((str(host), int(port)))
- s.sendall((payload + '\n').encode('utf-8'))
- info = s.recv(2048)
- if info:
- result = json.loads(info)
- else:
- logger.error('receive error msg,maybe the format of the msg error')
- return result
- logger.info(
- '[old device(devNo=%s)] request device manager success. result = %s' %
- (message['IMEI'], json.dumps(result)))
-
- except Exception, e:
- logger.exception('[old device(devNo=%s)] request device manager exception. exception = %s' %
- (str(message['IMEI']), str(e)))
- return {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}
-
- return result
-
- @staticmethod
- def send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0):
- # type:(DeviceDict, int, dict, str, str, int, int)->dict
- host, port, protol = device.network_address
- logger.info(
- '[MessageSender:send] start to request device. device = {}; server = {}; cmd = {}; payload = {};'.format(
- repr(device), '{}:{}/{}'.format(host, port, protol), cmd, payload))
- if protol == 'tcpip':
- payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
-
- result = MessageSender.send_via_tcpip(host = host, port = port, message = payload)
- logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
- repr(device),
- '{}:{}/{}'.format(host, port, protol),
- payload,
- str(result)))
- return result
- elif protol == 'car-tcp-ip':
- payload.update({'cmd': cmd, 'IMEI': device.devNo,'port':device.otherConf.get('devPort',8767)})
- result = MessageSender.send_car_tcpip(host = host, port = port, message = payload)
- logger.info('[MessageSender:send_car_tcpip]device({}) via({}) message={} result={}'.format(
- repr(device),
- '{}:{}/{}'.format(host, port, protol),
- payload,
- str(result)))
- return result
- else:
- payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
-
- if device.support_sid_topic:
- with_sid_topic = True
- else:
- with_sid_topic = False
- result = SyncMessageSender(
- host = host,
- port = port,
- message = payload,
- device_topic_prefix = device_topic_prefix,
- server_topic_prefix = server_topic_prefix,
- with_sid_topic = with_sid_topic).send(timeout = timeout,
- retry = retry)
- logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
- repr(device),
- '{}:{}/{}'.format(host, port, protol),
- payload,
- str(result)))
- post_operate_device.send(sender = MessageSender,
- device = device,
- payload = payload,
- operationResult = result)
- return result
- @staticmethod
- def send_no_wait(device, cmd, payload,
- device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX,
- timeout = MQTT_TIMEOUT.SHORT):
- # type:(DeviceDict, int, Dict, str, str, int)->Dict
- cmd = int(cmd)
- host, port, proto = device.network_address
- logger.info(
- '[MessageSender:send_no_wait] start to request device. device = {}; server = {}; cmd = {}; payload = {}; device_topic_prefix = {}; server_topic_prefix = {}'.format(
- repr(device), '{}:{}/{}'.format(host, port, proto), cmd, payload, device_topic_prefix,
- server_topic_prefix))
- payload.update({
- 'cmd': cmd,
- 'IMEI': device.devNo
- })
- SyncMessageSender(host = host,
- port = port,
- message = payload,
- device_topic_prefix = device_topic_prefix,
- server_topic_prefix = server_topic_prefix,
- with_sid_topic = device.support_sid_topic
- ).send_no_wait(timeout = timeout)
- @staticmethod
- def async_send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0,
- callback = None, **kwargs):
- # type:(DeviceDict, int, dict, str, str, int, int, callable, dict)->None
- class CallbackWrapper(object):
- def __init__(self, device, payload, callback):
- # type:(DeviceDict, dict, Callable)->None
- self.device = device
- self.callback = callback
- self.payload = payload
- def __call__(self, result, **kwargs):
- # type:(dict, dict)->None
- post_operate_device.send(sender = MessageSender,
- device = self.device,
- payload = self.payload,
- operationResult = result)
- if self.callback and isinstance(self.callback, Callable):
- self.callback(result, **kwargs)
- host, port, proto = device.network_address
- logger.debug(
- '[MessageSender:async_send] start to request device. device = {}; server = {}; cmd = {}; payload = {}; kwargs = {}'.format(
- repr(device), '{}:{}/{}'.format(host, port, proto), cmd, str(payload), str(kwargs)))
- payload.update({'IMEI': device.devNo, 'cmd': cmd})
- SyncMessageSender(
- host = host, port = port, message = payload,
- device_topic_prefix = device_topic_prefix,
- server_topic_prefix = server_topic_prefix,
- with_sid_topic = device.support_sid_topic
- ).async_send(timeout = timeout, retry = retry,
- callback = CallbackWrapper(device = device, payload = payload, callback = callback), **kwargs)
- @staticmethod
- def send_for_chuangwei(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
- return MessageSender.send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/skyworth',
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/skyworth', timeout = timeout)
- @staticmethod
- def send_for_moxiaozhiv2(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
- return MessageSender.send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/mxzv2',
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/mxzv2', timeout = timeout)
- @staticmethod
- def net_pay(device, coins, timeout = MQTT_TIMEOUT.NORMAL, retry = 0, split = False, port = None):
- assert isinstance(device.devNo, basestring) and ((not port) or (type(port) == int and port > 0))
- if type(coins) != int or coins <= 0:
- return {
- 'rst': ErrorCode.DEVICE_START_PACKAGE_ERROR
- }
- if not split:
- return MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
- 'app_pay': coins,
- 'IMEI': device.devNo
- } if not port else
- {
- 'app_pay': coins,
- 'IMEI': device.devNo,
- 'port': port
- }, timeout = timeout, retry = retry)
- # 规避设备投币数大于等于10的bug, 所以按照每次9个进行投币,最多不能超过9个
- result = None
- for i in range(coins // 9):
- result = MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
- 'app_pay': 9,
- 'IMEI': device.devNo
- } if not port else
- {
- 'app_pay': 9,
- 'IMEI': device.devNo,
- 'port': port
- }, timeout = timeout, retry = retry)
- # 如果网络支付失败,直接返回,避免重复下发导致超时时间太大
- if result['rst'] != 0:
- return result
- # 剩余投币数如果不为0,则一次下发
- remainder = coins % 9
- if remainder > 0:
- result = MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
- 'app_pay': remainder,
- 'IMEI': device.devNo
- } if not port else
- {
- 'app_pay': remainder,
- 'IMEI': device.devNo,
- 'port': port
- }, timeout = timeout, retry = retry)
- return result
- class SyncMessageSender(object):
- validMsgSchema = Schema(
- {
- Required('IMEI'): basestring,
- Required('cmd'): int
- }, extra = ALLOW_EXTRA)
- def __init__(self, host, port, message,
- device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
- server_topic_prefix = Const.SERVER_TOPIC_PREFIX,
- with_sid_topic = False):
- # type: (str, int, dict, str, str, bool)->None
- self.host = str(host)
- self.port = int(port)
- self.sid = make_sid()
- self.with_sid_topic = with_sid_topic
- if self.with_sid_topic:
- self.sent = SyncMessageSender.validMsgSchema(message)
- self.smartbox_topic = '{}/{}/{}_{}'.format(device_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']),
- self.sid)
- self.server_topic = '{}/{}/{}_{}'.format(server_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']),
- self.sid)
- else:
- message.update(
- {
- 'sid': self.sid
- }
- )
- self.sent = SyncMessageSender.validMsgSchema(message)
- self.smartbox_topic = '{}/{}/{}'.format(device_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']))
- self.server_topic = '{}/{}/{}'.format(server_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']))
- def __repr__(self):
- return 'SyncMessageSender<host = {}; port = {}; sid = {}; message = {}; server topic = {}; smartbox topic = {}>'.format(
- self.host, str(self.port), self.sid, str(self.sent), self.server_topic, self.smartbox_topic)
- def send(self, timeout, retry, qos = 0):
- # type: (int, int, int)->dict
- mqttc = None
- try:
- logger.info(
- '[SyncMessageSender:send][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}; server_topic = {}'.format(
- self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic, self.server_topic))
- mqttc = MqttSendClientSync(client_id = 'webapp_' + str(uuid.uuid4()),
- sid = self.sid,
- server_topic = self.server_topic,
- smart_box_topic = self.smartbox_topic,
- sent = self.sent,
- qos = qos)
- mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
- mqttc.connect(self.host, self.port)
- result = mqttc.send(timeout, retry)
- logger.debug(
- '[SyncMessageSender:send][{}] send message to device finished. result = {}'.format(self.sid,
- str(result)))
- return result
- except MqttSubscribeError as e:
- raise e
- except MqttPublishError as e:
- raise e
- except MqttConnectError as e:
- raise e
- except Exception as e:
- logger.error(
- '[SyncMessageSender:send][{}] send message to device finished. exception = {}'.format(
- self.sid,
- traceback.format_exc()))
- raise MessageSenderException(exception = traceback.format_exc())
- finally:
- if mqttc:
- try:
- mqttc.disconnect()
- mqttc.close()
- except Exception as e:
- logger.exception(e)
- def send_no_wait(self, timeout):
- # type: (int)->dict
- mqttc = None
- try:
- mqttc = MqttSendClientAsync(client_id = 'webapp_' + str(uuid.uuid4()),
- sid = self.sid,
- smart_box_topic = self.smartbox_topic,
- sent = self.sent)
- logger.info(
- '[SyncMessageSender:send_no_wait][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}'.format(
- self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic))
- mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
- mqttc.connect(self.host, self.port)
- mqttc.send(timeout)
- return True
- except MqttSubscribeError as e:
- logger.error('[SyncMessageSender:send_no_wait][{}] mqtt subscribe failure.'.format(self.sid))
- return False
- except MqttPublishError as e:
- logger.error('[SyncMessageSender:send_no_wait][{}] mqtt publish failure.'.format(self.sid))
- return False
- except Exception as e:
- logger.error(
- '[SyncMessageSender:send][{}] send message to device finished. exception = {}'.format(
- self.sid,
- traceback.format_exc()))
- return False
- finally:
- if mqttc:
- try:
- mqttc.disconnect()
- mqttc.close()
- except Exception as e:
- logger.exception(e)
- def async_send(self, timeout, retry, callback = None, **kwargs):
- # type: (int, int, Callable, dict)->None
- class AsyncThread(threading.Thread):
- def __init__(self, sender, timeout, retry, callback, **kwargs):
- # type: (SyncMessageSender, int, int, Callable, Dict)->None
- super(AsyncThread, self).__init__()
- self.sender = sender
- self.timeout = timeout
- self.retry = retry
- self.callback = callback
- self.kwargs = kwargs
- def run(self):
- try:
- result = self.sender.send(timeout, retry)
- except Exception as e:
- logger.exception(e)
- result = {
- 'cmd': self.sender.sent['cmd'],
- 'IMEI': self.sender.sent['IMEI'],
- 'rst': ErrorCode.EXCEPTION,
- 'exception': e
- }
- if self.callback and isinstance(self.callback, Callable):
- self.callback(result, **kwargs)
- t = AsyncThread(sender = self, timeout = timeout, retry = retry, callback = callback, **kwargs)
- t.start()
|