123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import requests
- import simplejson as json
- import datetime
- from hashlib import md5
- from library.qiben.exceptions import QibenException
- API_URL = 'http://www.tibiot.cn/api/'
- class SimManager(object):
- def __init__(self, user = 'weifutong', password = 'yapaRxO3j0oz!evQ'):
- self.user = user
- self.password = password
- @property
- def time_key(self):
- return datetime.datetime.now().strftime("%Y%m%d%H%M%S")
- def encrypt_password(self, time_key):
- return md5('{}{}'.format(md5(self.password).hexdigest(), time_key)).hexdigest()
- def _get(self, endpoint, **kwargs):
- return self._request(
- method = 'get',
- endpoint = endpoint,
- **kwargs)
- def _post(self, endpoint, **kwargs):
- return self._request(
- method = 'post',
- endpoint = endpoint,
- **kwargs
- )
- def _decode_result(self, res):
- try:
- result = json.loads(res.content.decode('utf-8', 'ignore'), strict = False)
- except (TypeError, ValueError):
- return res
- return result
- def _handle_result(self, res, method = None, url = None,
- result_processor = None, **kwargs):
- if not isinstance(res, dict):
- result = self._decode_result(res)
- else:
- result = res
- return result if not result_processor else result_processor(result)
- def _request(self, method, endpoint, **kwargs):
- url = '{base}{endpoint}'.format(
- base = API_URL,
- endpoint = endpoint
- )
- headers = {'Content-Type': 'application/json;charset=utf-8'}
- if isinstance(kwargs.get('data', ''), dict):
- body = json.dumps(kwargs['data'], ensure_ascii = False)
- body = body.encode('utf-8')
- kwargs['data'] = body
- kwargs['timeout'] = kwargs.get('timeout', 5)
- result_processor = kwargs.pop('result_processor', None)
- with requests.sessions.Session() as session:
- res = session.request(
- url = url,
- method = method,
- headers = headers,
- **kwargs
- )
- res.raise_for_status()
- return self._handle_result(
- res, method, url, result_processor, **kwargs
- )
- def get_card_info(self, iccid):
- tkey = self.time_key
- end_point = 'v1/card/getCardInfo?username={}&password={}&tkey={}&iccid={}'.format(
- self.user, self.encrypt_password(tkey), tkey, iccid)
- result = self._get(endpoint = end_point)
- if result['status'] != 0:
- raise QibenException(result['status'], result['message'])
- return result['data']
- def get_card_list(self, begin_iccid, end_iccid, card_type = 1):
- tkey = self.time_key
- data = {
- 'userName': self.user,
- 'passWord': self.encrypt_password(tkey),
- 'tKey': tkey,
- 'cardType': card_type,
- 'pageNum': 1,
- 'pageSize': 100,
- 'iccid': '{}-{}'.format(begin_iccid, end_iccid)
- }
- result = self._post(endpoint = 'v1/card/batchQueryCardInfo', data = data)
- if 'status' not in result or result['status'] != 0:
- raise QibenException(result['status'], result['message'])
- return result['data']
- def get_pool_data(self, pool_id):
- tkey = self.time_key
- end_point = 'v1/getFlowPoolData?username={}&password={}&tkey={}&pool_id={}'.format(
- self.user, self.encrypt_password(tkey), tkey, pool_id)
- result = self._get(endpoint = end_point)
- if result['status'] != 0:
- raise QibenException(result['status'], result['message'])
- return result['data']
- def send_sms(self, iccid, content):
- tkey = self.time_key
- end_point = 'v1/card/sendSms?username={}&password={}&tkey={}&iccid={}&content={}'.format(
- self.user, self.encrypt_password(tkey), tkey, iccid, content)
- result = self._get(endpoint = end_point)
- if result['status'] != 0:
- raise QibenException(result['status'], result['message'])
- return result['data']
|