networking.py 18 KB


  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import traceback
  4. import uuid
  5. import logging
  6. import threading
  7. from contextlib import closing
  8. from typing import Callable, Any, Dict, TYPE_CHECKING
  9. import simplejson as json
  10. from django.conf import settings
  11. from voluptuous import Schema, Required, ALLOW_EXTRA
  12. from apilib import utils_string, utils_datetime
  13. from apps.web.core.exceptions import MessageSenderException, MqttSubscribeError, MqttPublishError, MqttConnectError
  14. from apps.web.core.mqtt_client import MqttSendClientSync, MqttSendClientAsync
  15. from apps.web.constant import Const, DeviceCmdCode, MQTT_TIMEOUT, ErrorCode
  16. import django.dispatch
  17. import socket
  18. if TYPE_CHECKING:
  19. from apps.web.device.models import DeviceDict
  20. logger = logging.getLogger(__name__)
  21. post_operate_device = django.dispatch.Signal(providing_args = ['device', 'payload', 'operationResult'])
  22. def make_sid():
  23. return '{}{}'.format(str(utils_datetime.generate_timestamp_ex()),
  24. utils_string.get_random_str(num = 1))
  25. class MessageSender(object):
  26. @staticmethod
  27. def send_via_tcpip(host, port, message):
  28. # type:(str, int, dict)->dict
  29. payload = json.dumps(message)
  30. result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}
  31. try:
  32. with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
  33. s.settimeout(settings.MESSAGE_SENDER_TIMEOUT)
  34. s.connect((str(host), int(port)))
  35. s.sendall((payload + '\n').encode('utf-8'))
  36. result = json.loads(s.recv(2048))
  37. logger.info(
  38. '[old device(devNo=%s)] request device manager success. result = %s' %
  39. (message['IMEI'], json.dumps(result)))
  40. except Exception, e:
  41. logger.exception('[old device(devNo=%s)] request device manager exception. exception = %s' %
  42. (message['IMEI'], e))
  43. return result
  44. # 注意:目前服务器之间的消息发送,只支持一组{},不支持内嵌{}式的json。后续想支持更复杂的格式,需要在java设备管理器那边优化
  45. @staticmethod
  46. def send_car_tcpip(host, port, message,timeout=settings.MESSAGE_SENDER_TIMEOUT):
  47. # type:(str, int, dict)->dict
  48. payload = json.dumps(message)
  49. result = {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_SUCCESS}
  50. try:
  51. with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as s:
  52. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  53. s.settimeout(timeout)
  54. s.connect((str(host), int(port)))
  55. s.sendall((payload + '\n').encode('utf-8'))
  56. info = s.recv(2048)
  57. if info:
  58. result = json.loads(info)
  59. else:
  60. logger.error('receive error msg,maybe the format of the msg error')
  61. return result
  62. logger.info(
  63. '[old device(devNo=%s)] request device manager success. result = %s' %
  64. (message['IMEI'], json.dumps(result)))
  65. except Exception, e:
  66. logger.exception('[old device(devNo=%s)] request device manager exception. exception = %s' %
  67. (str(message['IMEI']), str(e)))
  68. return {'cmd': message['cmd'], 'IMEI': str(message['IMEI']), 'rst': ErrorCode.DEVICE_CONN_FAIL}
  69. return result
  70. @staticmethod
  71. def send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
  72. server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0):
  73. # type:(DeviceDict, int, dict, str, str, int, int)->dict
  74. host, port, protol = device.network_address
  75. logger.info(
  76. '[MessageSender:send] start to request device. device = {}; server = {}; cmd = {}; payload = {};'.format(
  77. repr(device), '{}:{}/{}'.format(host, port, protol), cmd, payload))
  78. if protol == 'tcpip':
  79. payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
  80. result = MessageSender.send_via_tcpip(host = host, port = port, message = payload)
  81. logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
  82. repr(device),
  83. '{}:{}/{}'.format(host, port, protol),
  84. payload,
  85. str(result)))
  86. return result
  87. elif protol == 'car-tcp-ip':
  88. payload.update({'cmd': cmd, 'IMEI': device.devNo,'port':device.otherConf.get('devPort',8767)})
  89. result = MessageSender.send_car_tcpip(host = host, port = port, message = payload)
  90. logger.info('[MessageSender:send_car_tcpip]device({}) via({}) message={} result={}'.format(
  91. repr(device),
  92. '{}:{}/{}'.format(host, port, protol),
  93. payload,
  94. str(result)))
  95. return result
  96. else:
  97. payload.update({'cmd': int(cmd), 'IMEI': device.devNo})
  98. if device.support_sid_topic:
  99. with_sid_topic = True
  100. else:
  101. with_sid_topic = False
  102. result = SyncMessageSender(
  103. host = host,
  104. port = port,
  105. message = payload,
  106. device_topic_prefix = device_topic_prefix,
  107. server_topic_prefix = server_topic_prefix,
  108. with_sid_topic = with_sid_topic).send(timeout = timeout,
  109. retry = retry)
  110. logger.info('[MessageSender:send]device({}) via({}) message={} result={}'.format(
  111. repr(device),
  112. '{}:{}/{}'.format(host, port, protol),
  113. payload,
  114. str(result)))
  115. post_operate_device.send(sender = MessageSender,
  116. device = device,
  117. payload = payload,
  118. operationResult = result)
  119. return result
  120. @staticmethod
  121. def send_no_wait(device, cmd, payload,
  122. device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
  123. server_topic_prefix = Const.SERVER_TOPIC_PREFIX,
  124. timeout = MQTT_TIMEOUT.SHORT):
  125. # type:(DeviceDict, int, Dict, str, str, int)->Dict
  126. cmd = int(cmd)
  127. host, port, proto = device.network_address
  128. logger.info(
  129. '[MessageSender:send_no_wait] start to request device. device = {}; server = {}; cmd = {}; payload = {}; device_topic_prefix = {}; server_topic_prefix = {}'.format(
  130. repr(device), '{}:{}/{}'.format(host, port, proto), cmd, payload, device_topic_prefix,
  131. server_topic_prefix))
  132. payload.update({
  133. 'cmd': cmd,
  134. 'IMEI': device.devNo
  135. })
  136. SyncMessageSender(host = host,
  137. port = port,
  138. message = payload,
  139. device_topic_prefix = device_topic_prefix,
  140. server_topic_prefix = server_topic_prefix,
  141. with_sid_topic = device.support_sid_topic
  142. ).send_no_wait(timeout = timeout)
  143. @staticmethod
  144. def async_send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
  145. server_topic_prefix = Const.SERVER_TOPIC_PREFIX, timeout = MQTT_TIMEOUT.NORMAL, retry = 0,
  146. callback = None, **kwargs):
  147. # type:(DeviceDict, int, dict, str, str, int, int, callable, dict)->None
  148. class CallbackWrapper(object):
  149. def __init__(self, device, payload, callback):
  150. # type:(DeviceDict, dict, Callable)->None
  151. self.device = device
  152. self.callback = callback
  153. self.payload = payload
  154. def __call__(self, result, **kwargs):
  155. # type:(dict, dict)->None
  156. post_operate_device.send(sender = MessageSender,
  157. device = self.device,
  158. payload = self.payload,
  159. operationResult = result)
  160. if self.callback and isinstance(self.callback, Callable):
  161. self.callback(result, **kwargs)
  162. host, port, proto = device.network_address
  163. logger.debug(
  164. '[MessageSender:async_send] start to request device. device = {}; server = {}; cmd = {}; payload = {}; kwargs = {}'.format(
  165. repr(device), '{}:{}/{}'.format(host, port, proto), cmd, str(payload), str(kwargs)))
  166. payload.update({'IMEI': device.devNo, 'cmd': cmd})
  167. SyncMessageSender(
  168. host = host, port = port, message = payload,
  169. device_topic_prefix = device_topic_prefix,
  170. server_topic_prefix = server_topic_prefix,
  171. with_sid_topic = device.support_sid_topic
  172. ).async_send(timeout = timeout, retry = retry,
  173. callback = CallbackWrapper(device = device, payload = payload, callback = callback), **kwargs)
  174. @staticmethod
  175. def send_for_chuangwei(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
  176. return MessageSender.send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/skyworth',
  177. server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/skyworth', timeout = timeout)
  178. @staticmethod
  179. def send_for_moxiaozhiv2(device, cmd, payload, timeout = MQTT_TIMEOUT.NORMAL):
  180. return MessageSender.send(device, cmd, payload, device_topic_prefix = Const.DEVICE_TOPIC_PREFIX + '/mxzv2',
  181. server_topic_prefix = Const.SERVER_TOPIC_PREFIX + '/mxzv2', timeout = timeout)
  182. @staticmethod
  183. def net_pay(device, coins, timeout = MQTT_TIMEOUT.NORMAL, retry = 0, split = False, port = None):
  184. assert isinstance(device.devNo, basestring) and ((not port) or (type(port) == int and port > 0))
  185. if type(coins) != int or coins <= 0:
  186. return {
  187. 'rst': ErrorCode.DEVICE_START_PACKAGE_ERROR
  188. }
  189. if not split:
  190. return MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
  191. 'app_pay': coins,
  192. 'IMEI': device.devNo
  193. } if not port else
  194. {
  195. 'app_pay': coins,
  196. 'IMEI': device.devNo,
  197. 'port': port
  198. }, timeout = timeout, retry = retry)
  199. # 规避设备投币数大于等于10的bug, 所以按照每次9个进行投币,最多不能超过9个
  200. result = None
  201. for i in range(coins // 9):
  202. result = MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
  203. 'app_pay': 9,
  204. 'IMEI': device.devNo
  205. } if not port else
  206. {
  207. 'app_pay': 9,
  208. 'IMEI': device.devNo,
  209. 'port': port
  210. }, timeout = timeout, retry = retry)
  211. # 如果网络支付失败,直接返回,避免重复下发导致超时时间太大
  212. if result['rst'] != 0:
  213. return result
  214. # 剩余投币数如果不为0,则一次下发
  215. remainder = coins % 9
  216. if remainder > 0:
  217. result = MessageSender.send(device = device, cmd = DeviceCmdCode.PAY_MONEY, payload = {
  218. 'app_pay': remainder,
  219. 'IMEI': device.devNo
  220. } if not port else
  221. {
  222. 'app_pay': remainder,
  223. 'IMEI': device.devNo,
  224. 'port': port
  225. }, timeout = timeout, retry = retry)
  226. return result
  227. class SyncMessageSender(object):
  228. validMsgSchema = Schema(
  229. {
  230. Required('IMEI'): basestring,
  231. Required('cmd'): int
  232. }, extra = ALLOW_EXTRA)
  233. def __init__(self, host, port, message,
  234. device_topic_prefix = Const.DEVICE_TOPIC_PREFIX,
  235. server_topic_prefix = Const.SERVER_TOPIC_PREFIX,
  236. with_sid_topic = False):
  237. # type: (str, int, dict, str, str, bool)->None
  238. self.host = str(host)
  239. self.port = int(port)
  240. self.sid = make_sid()
  241. self.with_sid_topic = with_sid_topic
  242. if self.with_sid_topic:
  243. self.sent = SyncMessageSender.validMsgSchema(message)
  244. self.smartbox_topic = '{}/{}/{}_{}'.format(device_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']),
  245. self.sid)
  246. self.server_topic = '{}/{}/{}_{}'.format(server_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']),
  247. self.sid)
  248. else:
  249. message.update(
  250. {
  251. 'sid': self.sid
  252. }
  253. )
  254. self.sent = SyncMessageSender.validMsgSchema(message)
  255. self.smartbox_topic = '{}/{}/{}'.format(device_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']))
  256. self.server_topic = '{}/{}/{}'.format(server_topic_prefix, self.sent['IMEI'], str(self.sent['cmd']))
  257. def __repr__(self):
  258. return 'SyncMessageSender<host = {}; port = {}; sid = {}; message = {}; server topic = {}; smartbox topic = {}>'.format(
  259. self.host, str(self.port), self.sid, str(self.sent), self.server_topic, self.smartbox_topic)
  260. def send(self, timeout, retry, qos = 0):
  261. # type: (int, int, int)->dict
  262. mqttc = None
  263. try:
  264. logger.info(
  265. '[SyncMessageSender:send][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}; server_topic = {}'.format(
  266. self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic, self.server_topic))
  267. mqttc = MqttSendClientSync(client_id = 'webapp_' + str(uuid.uuid4()),
  268. sid = self.sid,
  269. server_topic = self.server_topic,
  270. smart_box_topic = self.smartbox_topic,
  271. sent = self.sent,
  272. qos = qos)
  273. mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
  274. mqttc.connect(self.host, self.port)
  275. result = mqttc.send(timeout, retry)
  276. logger.debug(
  277. '[SyncMessageSender:send][{}] send message to device finished. result = {}'.format(self.sid,
  278. str(result)))
  279. return result
  280. except MqttSubscribeError as e:
  281. raise e
  282. except MqttPublishError as e:
  283. raise e
  284. except MqttConnectError as e:
  285. raise e
  286. except Exception as e:
  287. logger.error(
  288. '[SyncMessageSender:send][{}] send message to device finished. exception = {}'.format(
  289. self.sid,
  290. traceback.format_exc()))
  291. raise MessageSenderException(exception = traceback.format_exc())
  292. finally:
  293. if mqttc:
  294. try:
  295. mqttc.disconnect()
  296. mqttc.close()
  297. except Exception as e:
  298. logger.exception(e)
  299. def send_no_wait(self, timeout):
  300. # type: (int)->dict
  301. mqttc = None
  302. try:
  303. mqttc = MqttSendClientAsync(client_id = 'webapp_' + str(uuid.uuid4()),
  304. sid = self.sid,
  305. smart_box_topic = self.smartbox_topic,
  306. sent = self.sent)
  307. logger.info(
  308. '[SyncMessageSender:send_no_wait][{}] send message to device; host = {}; port = {}; message = {}; smartbox_topic = {}'.format(
  309. self.sid, self.host, str(self.port), str(self.sent), self.smartbox_topic))
  310. mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
  311. mqttc.connect(self.host, self.port)
  312. mqttc.send(timeout)
  313. return True
  314. except MqttSubscribeError as e:
  315. logger.error('[SyncMessageSender:send_no_wait][{}] mqtt subscribe failure.'.format(self.sid))
  316. return False
  317. except MqttPublishError as e:
  318. logger.error('[SyncMessageSender:send_no_wait][{}] mqtt publish failure.'.format(self.sid))
  319. return False
  320. except Exception as e:
  321. logger.error(
  322. '[SyncMessageSender:send][{}] send message to device finished. exception = {}'.format(
  323. self.sid,
  324. traceback.format_exc()))
  325. return False
  326. finally:
  327. if mqttc:
  328. try:
  329. mqttc.disconnect()
  330. mqttc.close()
  331. except Exception as e:
  332. logger.exception(e)
  333. def async_send(self, timeout, retry, callback = None, **kwargs):
  334. # type: (int, int, Callable, dict)->None
  335. class AsyncThread(threading.Thread):
  336. def __init__(self, sender, timeout, retry, callback, **kwargs):
  337. # type: (SyncMessageSender, int, int, Callable, Dict)->None
  338. super(AsyncThread, self).__init__()
  339. self.sender = sender
  340. self.timeout = timeout
  341. self.retry = retry
  342. self.callback = callback
  343. self.kwargs = kwargs
  344. def run(self):
  345. try:
  346. result = self.sender.send(timeout, retry)
  347. except Exception as e:
  348. logger.exception(e)
  349. result = {
  350. 'cmd': self.sender.sent['cmd'],
  351. 'IMEI': self.sender.sent['IMEI'],
  352. 'rst': ErrorCode.EXCEPTION,
  353. 'exception': e
  354. }
  355. if self.callback and isinstance(self.callback, Callable):
  356. self.callback(result, **kwargs)
  357. t = AsyncThread(sender = self, timeout = timeout, retry = retry, callback = callback, **kwargs)
  358. t.start()