|
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import base64
- import os
- import time
- import uuid
- import requests
- from django.conf import settings
- os.environ.setdefault("DJANGO_SETTINGS_MODULE", "configs.production")
- from base import init_env, md5
- init_env(interactive = False)
- from apps.web.agent.models import Agent
- from apps.web.dealer.models import Dealer
- # TEST_DEALER_ID = '5b9ae99ad89a177846459999'
- TEST_DEALER_ID = '5d2c23090030483301d8914d'
class ApiSender(object):
    """Small HTTP client used to exercise the dealer ``/api/v1`` endpoints.

    On construction the dealer and its agent are loaded from the database and
    the values needed to authenticate a request are cached in
    ``self.test_case``.  Every request carries a Basic ``authorization``
    header built from the dealer username and ``md5(password)``, plus the
    agent's ``agentSign`` both as a ``sign`` header and (when a body is sent)
    as a ``sign`` field of the JSON body.
    """

    # PAY_HOST = 'http://211.159.224.10'
    PAY_HOST = 'http://www.washpayer.com'

    # Timeout (seconds) applied when the caller does not pass ``timeout``.
    DEFAULT_TIMEOUT = 15

    def __init__(self, dealer_id, password):
        """Look up the dealer/agent pair and remember the request credentials.

        :param dealer_id: id of the ``Dealer`` document to act as.
        :param password: plain-text dealer password; it is passed through
            ``md5`` when the ``authorization`` header is built.
        """
        dealer = Dealer.objects(id=dealer_id).get()
        agent = Agent.objects(id=str(dealer.agentId)).get()

        self.test_case = {
            'dealerId': dealer_id,
            'username': str(dealer.username),
            'password': password,
            'agentId': dealer.agentId,
            'domain': agent.domain,
            'agentSign': agent.agentSign,
            'mySign': agent.mySign
        }

    def get(self, endpoint, data=None, **kwargs):
        """Issue a GET to ``endpoint``; see :meth:`_request` for the result."""
        return self._request(method='get', endpoint=endpoint, data=data, **kwargs)

    def post(self, endpoint, data=None, **kwargs):
        """Issue a POST to ``endpoint``; see :meth:`_request` for the result."""
        return self._request(method='post', endpoint=endpoint, data=data, **kwargs)

    @staticmethod
    def _json():
        """Return ``simplejson`` when it is installed, else the stdlib ``json``."""
        try:
            import simplejson as json
        except ImportError:
            import json
        return json

    def _decode_result(self, res):
        """Parse the response body as JSON.

        :param res: response object exposing a bytes ``content`` attribute.
        :returns: the decoded JSON value, or ``res`` itself when the body is
            not valid JSON.
        """
        json = self._json()
        try:
            return json.loads(res.content.decode('utf-8', 'ignore'), strict=False)
        except (TypeError, ValueError):
            return res

    def _build_headers(self, extra=None):
        """Return the default request headers, updated with ``extra`` if given."""
        credentials = '{}:{}'.format(
            self.test_case['username'], md5(self.test_case['password']))
        # b64encode needs bytes on Python 3; on Python 2 ``str`` already is bytes.
        if not isinstance(credentials, bytes):
            credentials = credentials.encode('utf-8')
        token = base64.b64encode(credentials)
        # Keep the header value a native string so it formats cleanly.
        if not isinstance(token, str):
            token = token.decode('ascii')

        headers = {
            'Content-Type': 'application/json',
            'authorization': 'Basic {}'.format(token),
            'sign': self.test_case['agentSign'],
            'User-Agent': 'MI 6X Build'
        }
        if extra:
            # Caller-supplied headers take precedence over the defaults.
            headers.update(extra)
        return headers

    def _encode_body(self, data):
        """Serialise ``data`` plus the agent ``sign`` to UTF-8 encoded JSON.

        Works on a shallow copy so the caller's dict is left untouched.
        """
        payload = dict(data)
        payload['sign'] = self.test_case['agentSign']
        return self._json().dumps(payload, ensure_ascii=False).encode('utf-8')

    def _request(self, method, endpoint, data=None, headers=None, **kwargs):
        """Send one request to ``PAY_HOST + endpoint`` and decode the answer.

        :param method: HTTP method name handed to ``requests``.
        :param endpoint: path (and optional query string) appended to ``PAY_HOST``.
        :param data: optional dict sent as the JSON body; no body is sent when
            it is empty or ``None``.
        :param headers: optional extra headers merged over the defaults.
        :param kwargs: forwarded to ``Session.request``; ``timeout`` defaults
            to ``DEFAULT_TIMEOUT``.
        :returns: the decoded JSON body, the raw response when the body is not
            JSON, or ``{'errcode': 'EXCEPTION', 'errmsg': ...}`` when the
            request fails or the server answers with an HTTP error status.
        """
        url = '{base}{endpoint}'.format(base=self.PAY_HOST, endpoint=endpoint)
        request_headers = self._build_headers(headers)

        if data:
            kwargs['data'] = self._encode_body(data)
        kwargs.setdefault('timeout', self.DEFAULT_TIMEOUT)

        try:
            with requests.sessions.Session() as session:
                res = session.request(
                    url=url,
                    method=method,
                    headers=request_headers,
                    **kwargs)
                # Connection errors and timeouts are RequestExceptions too, so
                # they are reported the same way as HTTP error statuses.
                res.raise_for_status()
                return self._decode_result(res)
        except requests.RequestException as reqe:
            return {
                'errcode': 'EXCEPTION',
                'errmsg': str(reqe)
            }
def test_online():
    """POST the online-status query for 'test_zhiyangji_4g' and print the reply."""
    payload = {'deviceCode': 'test_zhiyangji_4g'}
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/status/online', data=payload)
    print(response)
def test_start_device_old():
    """POST a start request in the older payload layout and print the reply.

    In this layout the order details (``extOrderNo``, ``channel``, signs, ...)
    are nested under ``attachParas`` and ``package`` is the string ``'1'``.
    """
    attach_paras = {
        'createTime': '2019-11-27 16:47:12',
        'coins': '1',
        'extOrderNo': str(uuid.uuid4()),  # fresh external order number per run
        'devNo': '860344049185789',
        'domain': '47.108.67.205:8080',
        'channel': '123456',
        'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
        'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
        'logicalCode': 'test_zhiyangji_4g'
    }
    payload = {
        'package': '1',
        'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
        'attachParas': attach_paras,
        'deviceCode': 'test_zhiyangji_4g'
    }

    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/start', data=payload)
    print(response)
def test_start_device_old2():
    """POST a start request in the older layout with a dict ``package``.

    Same nested ``attachParas`` layout as ``test_start_device_old``, but the
    package is given inline as a ``unit``/``time`` dict instead of ``'1'``.
    """
    package = {
        'unit': u'分钟',
        'time': 5
    }
    attach_paras = {
        'createTime': '2019-11-27 16:47:12',
        'coins': '1',
        'extOrderNo': str(uuid.uuid4()),  # fresh external order number per run
        'devNo': '860344049185789',
        'domain': '47.108.67.205:8080',
        'channel': '123456',
        'agentSign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
        'mySign': 'nUiUwUGj8IoiDopmMIQxcMdiUj3wI6Nz',
        'logicalCode': 'test_zhiyangji_4g'
    }
    payload = {
        'package': package,
        'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
        'attachParas': attach_paras,
        'deviceCode': 'test_zhiyangji_4g'
    }

    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/start', data=payload)
    print(response)
def test_start_device_new():
    """POST a start request in the flat payload layout and print the reply.

    Order fields sit at the top level, ``attachParas`` is empty, and the
    target device is 'test_zhiyangji'.  Authenticates with password 'lcds2017'.
    """
    payload = {
        'package': '1',
        # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
        'createTime': '2019-11-27 16:47:12',
        'extOrderNo': str(uuid.uuid4()),  # fresh external order number per run
        'channel': '123456',
        'attachParas': {},
        'deviceCode': 'test_zhiyangji'
    }

    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='lcds2017')
    response = sender.post(endpoint='/api/v1/device/start', data=payload)
    print(response)
def test_start_device_new2():
    """POST a flat-layout start request with a dict ``package`` and print the reply.

    The package is given inline as ``coins``/``time`` and the target device is
    'test_zhiyangji_4g'.
    """
    package = {
        'coins': 3,
        'time': 5
    }
    payload = {
        'package': package,
        # 'notify_url': 'http://develop.5tao5ai.com/api/test/notify/order',
        'createTime': '2019-11-27 16:47:12',
        'extOrderNo': str(uuid.uuid4()),  # fresh external order number per run
        'channel': '123456',
        'attachParas': {},
        'deviceCode': 'test_zhiyangji_4g'
    }

    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/start', data=payload)
    print(response)
def test_get_packages():
    """POST the packages query for 'test_sanjiang_4g' and print the reply."""
    payload = {'deviceCode': 'test_sanjiang_4g'}
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/packages', data=payload)
    print(response)
def test_get_device_list_post():
    """POST the dealer device-list query (page 3, 10 per page) and print the reply."""
    paging = {
        'pageSize': 10,
        'pageIndex': 3
    }
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/list/by/dealer', data=paging)
    print(response)
def test_get_device_list_get():
    """GET the dealer device list (page 2, 10 per page) and print the reply.

    The paging parameters travel in the query string.  A local ``data`` dict
    that was built but never passed to the request (and whose ``pageIndex``
    of 1 disagreed with the URL) has been removed; the request that is sent
    is unchanged.
    """
    res = ApiSender(dealer_id=TEST_DEALER_ID, password='123456').get(
        endpoint='/api/v1/device/list/by/dealer?pageSize=10&pageIndex=2')
    print(res)
def test_get_charger():
    """POST the charger query for 'test_zhiyangji' and print the reply."""
    payload = {'deviceCode': 'test_zhiyangji'}
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/charger', data=payload)
    print(response)
def test_get_device_status():
    """POST the status query for 'test_sanjiang_4g' and print the reply."""
    payload = {'deviceCode': 'test_sanjiang_4g'}
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/status', data=payload)
    print(response)
def test_stop_device():
    """POST a stop request for 'test_zhiyangji' and print the reply.

    Authenticates with password 'lcds2017'.
    """
    payload = {'deviceCode': 'test_zhiyangji'}
    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='lcds2017')
    response = sender.post(endpoint='/api/v1/device/stop', data=payload)
    print(response)
def test_sijiang_start():
    """POST a start request for 'test_sanjiang_4g' (charge index '8') and print the reply.

    NOTE(review): the payload carries a capitalised 'Sign' key in addition to
    the lowercase 'sign' that ApiSender adds to every body -- confirm which
    one the server reads.
    """
    attach_paras = {'chargeIndex': '8'}
    payload = {
        'package': '2',
        'extOrderNo': str(uuid.uuid4()),  # fresh external order number per run
        'createTime': '2019-11-29 11:26:52',
        'attachParas': attach_paras,
        'channel': 'charge',
        'Sign': 'a3ohkE846AbtDMuBf7zeeo9VJAUO94It',
        'deviceCode': 'test_sanjiang_4g'
    }

    sender = ApiSender(dealer_id=TEST_DEALER_ID, password='123456')
    response = sender.post(endpoint='/api/v1/device/start', data=payload)
    print(response)
# Manual test selector: uncomment the call(s) to run.
# NOTE: the active call executes as soon as this module is run or imported and
# sends a real HTTP request to ApiSender.PAY_HOST.
# Stale entries naming functions that are not defined in this file
# (test_start_device_1, test_start_device_2) were dropped, and the garbled
# "test_get_package/s" entry was corrected to test_get_packages.
# test_online()
test_start_device_new()
# test_start_device_new2()
# test_start_device_old()
# test_start_device_old2()
# test_get_packages()
# test_get_device_list_get()
# test_get_device_list_post()
# test_get_charger()
# test_get_device_status()
# test_stop_device()
# test_sijiang_start()
|