test_api.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. # -*- coding: utf-8 -*-
  2. # !/usr/bin/env python
  3. import base64
  4. import os
  5. import time
  6. import uuid
  7. import requests
  8. from django.conf import settings
  9. os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.production")
  10. from base import init_env, md5
  11. init_env(interactive = False)
  12. from apps.web.agent.models import Agent
  13. from apps.web.dealer.models import Dealer
  14. # TEST_DEALER_ID = '5b9ae99ad89a177846459999'
  15. TEST_DEALER_ID = '5d2c23090030483301d8914d'
  16. class ApiSender(object):
  17. # PAY_HOST = 'http://211.159.224.10'
  18. PAY_HOST = 'http://www.washpayer.com'
  19. def __init__(self, dealer_id, password):
  20. dealer = Dealer.objects(id = dealer_id).get()
  21. agent = Agent.objects(id = str(dealer.agentId)).get()
  22. self.test_case = {
  23. 'dealerId': dealer_id,
  24. 'username': str(dealer.username),
  25. 'password': password,
  26. 'agentId': dealer.agentId,
  27. 'domain': agent.domain,
  28. 'agentSign': agent.agentSign,
  29. 'mySign': agent.mySign
  30. }
  31. def get(self, endpoint, data = None, **kwargs):
  32. return self._request(
  33. method = 'get',
  34. endpoint = endpoint,
  35. data = data,
  36. **kwargs)
  37. def post(self, endpoint, data = None, **kwargs):
  38. return self._request(
  39. method = 'post',
  40. endpoint = endpoint,
  41. data = data,
  42. **kwargs
  43. )
  44. def _decode_result(self, res):
  45. try:
  46. import simplejson as json
  47. result = json.loads(res.content.decode('utf-8', 'ignore'), strict = False)
  48. except (TypeError, ValueError):
  49. return res
  50. return result
  51. def _request(self, method, endpoint, data = None, headers = None, **kwargs):
  52. url = '{base}{endpoint}'.format(
  53. base = self.PAY_HOST,
  54. endpoint = endpoint
  55. )
  56. headers = {
  57. 'Content-Type': 'application/json',
  58. 'authorization': 'Basic {}'.format(
  59. base64.b64encode('{}:{}'.format(self.test_case['username'], md5(self.test_case['password'])))),
  60. 'sign': self.test_case['agentSign'],
  61. 'User-Agent':'MI 6X Build'
  62. }
  63. if data:
  64. data.update({'sign': self.test_case['agentSign']})
  65. import simplejson as json
  66. body = json.dumps(data, ensure_ascii = False)
  67. body = body.encode('utf-8')
  68. kwargs['data'] = body
  69. kwargs['timeout'] = kwargs.get('timeout', 15)
  70. with requests.sessions.Session() as session:
  71. res = session.request(
  72. url = url,
  73. method = method,
  74. headers = headers,
  75. **kwargs)
  76. try:
  77. res.raise_for_status()
  78. except requests.RequestException as reqe:
  79. return {
  80. 'errcode': 'EXCEPTION',
  81. 'errmsg': str(reqe)
  82. }
  83. return self._decode_result(res)
  84. def test_online():
  85. data = {
  86. 'deviceCode': 'test_zhiyangji_4g'
  87. }
  88. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  89. endpoint = '/api/v1/device/status/online', data = data
  90. )
  91. print(res)
  92. def test_start_device_old():
  93. data = {
  94. 'package': '1',
  95. 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
  96. 'attachParas':
  97. {
  98. 'createTime': '2019-11-27 16:47:12',
  99. 'coins': '1',
  100. 'extOrderNo': str(uuid.uuid4()),
  101. 'devNo': '860344049185789',
  102. 'domain': '47.108.67.205:8080',
  103. 'channel': '123456',
  104. 'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
  105. 'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
  106. 'logicalCode': 'test_zhiyangji_4g'
  107. },
  108. 'deviceCode': 'test_zhiyangji_4g'}
  109. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  110. endpoint = '/api/v1/device/start', data = data
  111. )
  112. print(res)
  113. def test_start_device_old2():
  114. data = {
  115. 'package': {
  116. 'unit': u'分钟',
  117. 'time': 5
  118. },
  119. 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
  120. 'attachParas':
  121. {
  122. 'createTime': '2019-11-27 16:47:12',
  123. 'coins': '1',
  124. 'extOrderNo': str(uuid.uuid4()),
  125. 'devNo': '860344049185789',
  126. 'domain': '47.108.67.205:8080',
  127. 'channel': '123456',
  128. 'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
  129. 'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
  130. 'logicalCode': 'test_zhiyangji_4g'
  131. },
  132. 'deviceCode': 'test_zhiyangji_4g'}
  133. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  134. endpoint = '/api/v1/device/start', data = data
  135. )
  136. print(res)
  137. def test_start_device_new():
  138. data = {
  139. 'package': '1',
  140. # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
  141. 'createTime': '2019-11-27 16:47:12',
  142. 'extOrderNo': str(uuid.uuid4()),
  143. 'channel': '123456',
  144. 'attachParas': {},
  145. 'deviceCode': 'test_zhiyangji'}
  146. res = ApiSender(dealer_id = TEST_DEALER_ID, password = 'lcds2017').post(
  147. endpoint = '/api/v1/device/start', data = data
  148. )
  149. print(res)
  150. def test_start_device_new2():
  151. data = {
  152. 'package': {
  153. 'coins': 3,
  154. 'time': 5
  155. },
  156. # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
  157. 'createTime': '2019-11-27 16:47:12',
  158. 'extOrderNo': str(uuid.uuid4()),
  159. 'channel': '123456',
  160. 'attachParas': {},
  161. 'deviceCode': 'test_zhiyangji_4g'}
  162. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  163. endpoint = '/api/v1/device/start', data = data
  164. )
  165. print(res)
  166. def test_get_packages():
  167. data = {
  168. 'deviceCode': 'test_sanjiang_4g'
  169. }
  170. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  171. endpoint = '/api/v1/device/packages', data = data
  172. )
  173. print(res)
  174. def test_get_device_list_post():
  175. data = {
  176. 'pageSize': 10,
  177. 'pageIndex': 3
  178. }
  179. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  180. endpoint = '/api/v1/device/list/by/dealer', data = data
  181. )
  182. print(res)
  183. def test_get_device_list_get():
  184. data = {
  185. 'pageSize': 10,
  186. 'pageIndex': 1
  187. }
  188. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').get(
  189. endpoint = '/api/v1/device/list/by/dealer?pageSize=10&pageIndex=2')
  190. print(res)
  191. def test_get_charger():
  192. data = {
  193. 'deviceCode': 'test_zhiyangji'
  194. }
  195. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  196. endpoint = '/api/v1/device/charger', data = data
  197. )
  198. print(res)
  199. def test_get_device_status():
  200. data = {
  201. 'deviceCode': 'test_sanjiang_4g'
  202. }
  203. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  204. endpoint = '/api/v1/device/status', data = data
  205. )
  206. print(res)
  207. def test_stop_device():
  208. data = {
  209. 'deviceCode': 'test_zhiyangji'
  210. }
  211. res = ApiSender(dealer_id = TEST_DEALER_ID, password = 'lcds2017').post(
  212. endpoint = '/api/v1/device/stop', data = data
  213. )
  214. print(res)
  215. def test_sijiang_start():
  216. data = {
  217. 'package': '2',
  218. 'extOrderNo': str(uuid.uuid4()),
  219. 'createTime': '2019-11-29 11:26:52',
  220. 'attachParas': {'chargeIndex': '8'},
  221. 'channel': 'charge',
  222. 'Sign': 'a3ohkE846AbtDMuBf7zeeo9VJAUO94It',
  223. 'deviceCode': 'test_sanjiang_4g'
  224. }
  225. res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post(
  226. endpoint = '/api/v1/device/start', data = data
  227. )
  228. print(res)
  229. # test_online()
  230. # test_start_device_1()
  231. test_start_device_new()
  232. # test_start_device_new2()
  233. # test_start_device_old()
  234. # test_start_device_old2()
  235. # test_get_package/s()
  236. # test_start_device_2()
  237. # test_get_device_list_get()
  238. # test_get_device_list_post()
  239. # test_get_charger()
  240. # test_get_device_status()
  241. # test_stop_device()
  242. # test_sijiang_start()