test_listener.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import os
  4. import sys
  5. import threading
  6. import time
  7. # from base import init_env
  8. #
  9. # os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.production")
  10. #
  11. # init_env(interactive = False)
  12. #
  13. # from apps.web.device.models import Group
  14. from gevent import socket
  15. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.testing')
  16. PROJECT_ROOT = os.path.join(os.path.abspath(os.path.split(os.path.realpath(__file__))[0] + "/.."), '..')
  17. sys.path.insert(0, PROJECT_ROOT)
  18. from script.base import init_env, get_logger
  19. init_env(interactive = False)
  20. logger = get_logger(__name__)
  21. from apps.web.common.event import do_device_event
  22. from apps.web.device.models import Device
  23. import simplejson as json
  24. # 启动2个线程,一个线程用于维持心跳,一个线程专门用于监听并处理事件。每分钟一次心跳,如果心跳失败,就重新建立连接。
  25. #
  26. server = '127.0.0.1'
  27. port = 8000
  28. def send_beat():
  29. global gclient
  30. while True:
  31. try:
  32. print('i have send heartbeat')
  33. strJson = json.dumps({"cmd":"I00","IMEI":"","data":"","sqNo":""})
  34. gclient.sendall(strJson)
  35. time.sleep(5)
  36. except Exception,e:
  37. logger.info('send heartbeat failed,e=%s' % e)
  38. try:
  39. gclient.close()
  40. except Exception,e:
  41. pass
  42. try:
  43. gclient = socket.socket()
  44. gclient.connect((server,port))
  45. except Exception,e:
  46. pass
  47. time.sleep(1)
  48. continue
  49. try:
  50. global gclient
  51. gclient = socket.socket() # 定义协议类型,相当于生命socket类型,同时生成socket连接对象
  52. gclient.connect((server,port))
  53. # 维持心跳的线程
  54. heartThread = threading.Thread(target = send_beat)
  55. heartThread.setDaemon(False)
  56. heartThread.start()
  57. while True:
  58. try:
  59. data = gclient.recv(1024)#这里是字节1k
  60. if not data:
  61. time.sleep(0.5)
  62. continue
  63. print("receive msg:",data.decode())
  64. payload = json.loads(data.decode())
  65. cmd = payload.get('cmd','')
  66. if not cmd :
  67. continue
  68. if cmd == 'I00':# I00是心跳,不需要处理
  69. logger.info('receive heartbeat from server!')
  70. continue
  71. # 启动一个事件处理线程处理事件,沿用以前旧的框架,这样业务和以前的开发流程
  72. dev = None
  73. devNo = payload.get('IMEI','')
  74. if devNo:
  75. dev = Device.get_dev(devNo = devNo)
  76. workerthread = threading.Thread(target = do_device_event,args=(cmd,dev,payload))
  77. workerthread.setDaemon(False)
  78. workerthread.start()
  79. except Exception,e:
  80. try:
  81. gclient.close()
  82. except Exception,e:
  83. pass
  84. try:
  85. gclient = socket.socket()
  86. gclient.connect((server,port))
  87. except Exception,e:
  88. pass
  89. time.sleep(1)
  90. continue
  91. gclient.close()
  92. except Exception as ex:
  93. print(ex)
  94. print 'I am OVER'