common_test.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import os, sys
  4. import uuid
  5. import time
  6. import simplejson as json
  7. from django.conf import settings
  8. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  9. sys.path.insert(0, PROJECT_ROOT)
  10. os.environ.update({"DJANGO_SETTINGS_MODULE": "configs.testing"})
  11. import django
  12. django.setup()
  13. from apps.web.core.networking import MessageSender
  14. from apps.web.core.mqtt_client import MqttClient
  15. from apps.web.device.models import Device
  16. IMEI = '867435053812616'
  17. server = '211.159.224.10'
  18. port = 1884
  19. event_received = False
  20. mqttc = MqttClient(client_id = 'webapp_' + str(uuid.uuid1()))
  21. def on_connect(client, userdata, flags, rc):
  22. # 订阅设备的120响应事件,用于检查
  23. device_topic = 'server/{}/120'.format(IMEI)
  24. client.subscribe(device_topic, qos = 0)
  25. device_topic = 'server/{}/110'.format(IMEI)
  26. client.subscribe(device_topic, qos = 0)
  27. device_topic = 'server/{}/100'.format(IMEI)
  28. client.subscribe(device_topic, qos = 0)
  29. device_topic = 'server/{}/301'.format(IMEI)
  30. client.subscribe(device_topic, qos = 0)
  31. device_topic = 'server/{}/307'.format(IMEI)
  32. client.subscribe(device_topic, qos = 0)
  33. def on_message(mqttc, obj, msg):
  34. msgDict = json.loads(bytes.decode(msg.payload))
  35. print('received event msg = {}'.format(msgDict))
  36. event_received = True
  37. mqttc.loop_stop()
  38. mqttc.disconnect()
  39. mqttc.close()
  40. try:
  41. mqttc.on_message = on_message
  42. mqttc.on_connect = on_connect
  43. mqttc.username_pw_set(settings.MQTT_USER, settings.MQTT_PSWD)
  44. mqttc.connect(server, port, 60)
  45. mqttc.loop_start()
  46. finally:
  47. pass
  48. CHECK_ROUNDS = 6
  49. PWM_WID = 80
  50. PWM_INTER = 600
  51. PWM_IDLE = 0
  52. UART_TIMEOUT = 20
  53. CYCLE_SET = 120
  54. DEBUG = True
  55. ENABLE_PG = True
  56. cmd_list = [
  57. {"cmd": 201},
  58. {"cmd": 201, "fields": ['soft_ver']},
  59. {"cmd": 201, "fields": ['signal']},
  60. # {"cmd":202, "addr_set":{"ip1":"211.159.224.10","port1":1883}},
  61. {"cmd": 202, "debug": DEBUG},
  62. {"cmd": 202, "cycle_set": CYCLE_SET},
  63. {"cmd": 202, "check_rounds": CHECK_ROUNDS},
  64. # {"cmd": 202, "keepalive_time": 320},
  65. {"cmd": 202, "coin_clear": True},
  66. {"cmd": 202, "clear_bls": True},
  67. # {"cmd": 202, "factory_set": True},
  68. # {"cmd": 202, "debounce": 25},
  69. # {"cmd": 202, "pg_inter": 62},
  70. {"cmd": 202, "enable_pg": ENABLE_PG},
  71. {"cmd": 202, "uart_timeout": UART_TIMEOUT},
  72. {"cmd": 202,
  73. "ntp_servers": ["ntp7.aliyun.com", "ntp6.aliyun.com", "ntp5.aliyun.com", "ntp4.aliyun.com", "ntp3.aliyun.com",
  74. "ntp2.aliyun.com", "ntp1.aliyun.com", "ntp.ntsc.ac.cn"]},
  75. {"cmd": 202, "pulse_set": {"pwm_wid": PWM_WID, "pwm_inter": PWM_INTER, "pwm_idle": PWM_IDLE}},
  76. {"cmd": 202, "max_coin": 255},
  77. # {"cmd": 202, "ota_set": {"fw_url": "http://www.washpayer.com/uploaded/1SMART_BOX_3.2.4_Luat_V0020_8955_SSL.bin"}},
  78. # {"cmd": 202, "driver_set": {"driver_url": "http://121.43.232.118/uploaded/version/drivers/task/100210_uart_charge_dianchuan_v1.5/default.driver"}},
  79. {"cmd": 203, "app_pay": 8},
  80. {"cmd": 203, "app_pay": 12},
  81. ]
  82. for cmd in cmd_list:
  83. payload = {'IMEI': IMEI}
  84. payload.update(cmd)
  85. dev = Device.get_dev(IMEI)
  86. result = MessageSender.send(dev, payload['cmd'], payload)
  87. if result['rst'] != 0:
  88. print 'error. cmd = {}'.format(str(cmd))
  89. exit(1)
  90. result = MessageSender.send(dev, 201, {'cmd': 201})
  91. if result['check_rounds'] != CHECK_ROUNDS:
  92. print("check_rounds update failure.")
  93. if result['pwm_wid'] != PWM_WID:
  94. print("pwm_wid update failure.")
  95. if result['cycle'] != CYCLE_SET:
  96. print("cycle_set update failure.")
  97. if result['uart_timeout'] != UART_TIMEOUT:
  98. print("uart_timeout update failure.")
  99. if result['pwm_inter'] != PWM_INTER:
  100. print("pwm_inter update failure.")
  101. if result['debug'] != DEBUG:
  102. print("debug update failure.")
  103. while True:
  104. if event_received == False:
  105. print('wait event to received.')
  106. time.sleep(5)
  107. continue
  108. else:
  109. exit(1)
  110. print 'finished'