# -*- coding: utf-8 -*- # !/usr/bin/env python import base64 import os import time import uuid import requests from django.conf import settings os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.production") from base import init_env, md5 init_env(interactive = False) from apps.web.agent.models import Agent from apps.web.dealer.models import Dealer # TEST_DEALER_ID = '5b9ae99ad89a177846459999' TEST_DEALER_ID = '5d2c23090030483301d8914d' class ApiSender(object): # PAY_HOST = 'http://211.159.224.10' PAY_HOST = 'http://www.washpayer.com' def __init__(self, dealer_id, password): dealer = Dealer.objects(id = dealer_id).get() agent = Agent.objects(id = str(dealer.agentId)).get() self.test_case = { 'dealerId': dealer_id, 'username': str(dealer.username), 'password': password, 'agentId': dealer.agentId, 'domain': agent.domain, 'agentSign': agent.agentSign, 'mySign': agent.mySign } def get(self, endpoint, data = None, **kwargs): return self._request( method = 'get', endpoint = endpoint, data = data, **kwargs) def post(self, endpoint, data = None, **kwargs): return self._request( method = 'post', endpoint = endpoint, data = data, **kwargs ) def _decode_result(self, res): try: import simplejson as json result = json.loads(res.content.decode('utf-8', 'ignore'), strict = False) except (TypeError, ValueError): return res return result def _request(self, method, endpoint, data = None, headers = None, **kwargs): url = '{base}{endpoint}'.format( base = self.PAY_HOST, endpoint = endpoint ) headers = { 'Content-Type': 'application/json', 'authorization': 'Basic {}'.format( base64.b64encode('{}:{}'.format(self.test_case['username'], md5(self.test_case['password'])))), 'sign': self.test_case['agentSign'], 'User-Agent':'MI 6X Build' } if data: data.update({'sign': self.test_case['agentSign']}) import simplejson as json body = json.dumps(data, ensure_ascii = False) body = body.encode('utf-8') kwargs['data'] = body kwargs['timeout'] = kwargs.get('timeout', 15) with requests.sessions.Session() as session: res = session.request( url = url, method = method, headers = headers, **kwargs) try: res.raise_for_status() except requests.RequestException as reqe: return { 'errcode': 'EXCEPTION', 'errmsg': str(reqe) } return self._decode_result(res) def test_online(): data = { 'deviceCode': 'test_zhiyangji_4g' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/status/online', data = data ) print(res) def test_start_device_old(): data = { 'package': '1', 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order', 'attachParas': { 'createTime': '2019-11-27 16:47:12', 'coins': '1', 'extOrderNo': str(uuid.uuid4()), 'devNo': '860344049185789', 'domain': '47.108.67.205:8080', 'channel': '123456', 'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz', 'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz', 'logicalCode': 'test_zhiyangji_4g' }, 'deviceCode': 'test_zhiyangji_4g'} res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/start', data = data ) print(res) def test_start_device_old2(): data = { 'package': { 'unit': u'分钟', 'time': 5 }, 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order', 'attachParas': { 'createTime': '2019-11-27 16:47:12', 'coins': '1', 'extOrderNo': str(uuid.uuid4()), 'devNo': '860344049185789', 'domain': '47.108.67.205:8080', 'channel': '123456', 'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz', 'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz', 'logicalCode': 'test_zhiyangji_4g' }, 'deviceCode': 'test_zhiyangji_4g'} res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/start', data = data ) print(res) def test_start_device_new(): data = { 'package': '1', # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order', 'createTime': '2019-11-27 16:47:12', 'extOrderNo': str(uuid.uuid4()), 'channel': '123456', 'attachParas': {}, 'deviceCode': 'test_zhiyangji'} res = ApiSender(dealer_id = TEST_DEALER_ID, password = 'lcds2017').post( endpoint = '/api/v1/device/start', data = data ) print(res) def test_start_device_new2(): data = { 'package': { 'coins': 3, 'time': 5 }, # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order', 'createTime': '2019-11-27 16:47:12', 'extOrderNo': str(uuid.uuid4()), 'channel': '123456', 'attachParas': {}, 'deviceCode': 'test_zhiyangji_4g'} res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/start', data = data ) print(res) def test_get_packages(): data = { 'deviceCode': 'test_sanjiang_4g' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/packages', data = data ) print(res) def test_get_device_list_post(): data = { 'pageSize': 10, 'pageIndex': 3 } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/list/by/dealer', data = data ) print(res) def test_get_device_list_get(): data = { 'pageSize': 10, 'pageIndex': 1 } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').get( endpoint = '/api/v1/device/list/by/dealer?pageSize=10&pageIndex=2') print(res) def test_get_charger(): data = { 'deviceCode': 'test_zhiyangji' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/charger', data = data ) print(res) def test_get_device_status(): data = { 'deviceCode': 'test_sanjiang_4g' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/status', data = data ) print(res) def test_stop_device(): data = { 'deviceCode': 'test_zhiyangji' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = 'lcds2017').post( endpoint = '/api/v1/device/stop', data = data ) print(res) def test_sijiang_start(): data = { 'package': '2', 'extOrderNo': str(uuid.uuid4()), 'createTime': '2019-11-29 11:26:52', 'attachParas': {'chargeIndex': '8'}, 'channel': 'charge', 'Sign': 'a3ohkE846AbtDMuBf7zeeo9VJAUO94It', 'deviceCode': 'test_sanjiang_4g' } res = ApiSender(dealer_id = TEST_DEALER_ID, password = '123456').post( endpoint = '/api/v1/device/start', data = data ) print(res) # test_online() # test_start_device_1() test_start_device_new() # test_start_device_new2() # test_start_device_old() # test_start_device_old2() # test_get_package/s() # test_start_device_2() # test_get_device_list_get() # test_get_device_list_post() # test_get_charger() # test_get_device_status() # test_stop_device() # test_sijiang_start()