# -*- coding: utf-8 -*- """ AipBase """ import hmac import json import hashlib import datetime import base64 import time import sys import requests requests.packages.urllib3.disable_warnings() if sys.version_info.major == 2: from urllib import urlencode from urllib import quote from urlparse import urlparse else: from urllib.parse import urlencode from urllib.parse import quote from urllib.parse import urlparse class AipBase(object): """ AipBase """ __accessTokenUrl = 'https://aip.baidubce.com/oauth/2.0/token' __reportUrl = 'https://aip.baidubce.com/rpc/2.0/feedback/v1/report' __scope = 'brain_all_scope' def __init__(self, appId, apiKey, secretKey): """ AipBase(appId, apiKey, secretKey) """ self._appId = appId.strip() self._apiKey = apiKey.strip() self._secretKey = secretKey.strip() self._authObj = {} self._isCloudUser = None self.__client = requests self.__connectTimeout = 60.0 self.__socketTimeout = 60.0 self._proxies = {} self.__version = '2_2_18' def getVersion(self): """ version """ return self.__version def setConnectionTimeoutInMillis(self, ms): """ setConnectionTimeoutInMillis """ self.__connectTimeout = ms / 1000.0 def setSocketTimeoutInMillis(self, ms): """ setSocketTimeoutInMillis """ self.__socketTimeout = ms / 1000.0 def setProxies(self, proxies): """ proxies """ self._proxies = proxies def _request(self, url, data, headers=None): """ self._request('', {}) """ try: result = self._validate(url, data) if result != True: return result authObj = self._auth() params = self._getParams(authObj) data = self._proccessRequest(url, params, data, headers) headers = self._getAuthHeaders('POST', url, params, headers) response = self.__client.post(url, data=data, params=params, headers=headers, verify=False, timeout=( self.__connectTimeout, self.__socketTimeout, ), proxies=self._proxies ) obj = self._proccessResult(response.content) if not self._isCloudUser and obj.get('error_code', '') == 110: authObj = self._auth(True) params = self._getParams(authObj) response = self.__client.post(url, data=data, params=params, headers=headers, verify=False, timeout=( self.__connectTimeout, self.__socketTimeout, ), proxies=self._proxies ) obj = self._proccessResult(response.content) except (requests.exceptions.ReadTimeout, requests.exceptions.ConnectTimeout) as e: return { 'error_code': 'SDK108', 'error_msg': 'connection or read data timeout', } return obj def _validate(self, url, data): """ validate """ return True def _proccessRequest(self, url, params, data, headers): """ 参数处理 """ params['aipSdk'] = 'python' params['aipVersion'] = self.__version return data def _proccessResult(self, content): """ formate result """ if sys.version_info.major == 2: return json.loads(content) or {} else: return json.loads(content.decode()) or {} def _auth(self, refresh=False): """ api access auth """ if self._isCloudUser: return self._authObj #未过期 if not refresh: tm = self._authObj.get('time', 0) + int(self._authObj.get('expires_in', 0)) - 30 if tm > int(time.time()): return self._authObj obj = self.__client.get(self.__accessTokenUrl, verify=False, params={ 'grant_type': 'client_credentials', 'client_id': self._apiKey, 'client_secret': self._secretKey, }, timeout=( self.__connectTimeout, self.__socketTimeout, ), proxies=self._proxies).json() self._isCloudUser = not self._isPermission(obj) obj['time'] = int(time.time()) self._authObj = obj return obj def _isPermission(self, authObj): """ check whether permission """ scopes = authObj.get('scope', '') return self.__scope in scopes.split(' ') def _getParams(self, authObj): """ api request http url params """ params = {} if self._isCloudUser == False: params['access_token'] = authObj['access_token'] return params def _getAuthHeaders(self, method, url, params=None, headers=None): """ api request http headers """ headers = headers or {} params = params or {} if self._isCloudUser == False: return headers urlResult = urlparse(url) for kv in urlResult.query.strip().split('&'): if kv: k, v = kv.split('=') params[k] = v # UTC timestamp timestamp = datetime.datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') headers['Host'] = urlResult.hostname headers['x-bce-date'] = timestamp version, expire = '1', '1800' # 1 Generate SigningKey val = "bce-auth-v%s/%s/%s/%s" % (version, self._apiKey, timestamp, expire) signingKey = hmac.new(self._secretKey.encode('utf-8'), val.encode('utf-8'), hashlib.sha256 ).hexdigest() # 2 Generate CanonicalRequest # 2.1 Genrate CanonicalURI canonicalUri = quote(urlResult.path) # 2.2 Generate CanonicalURI: not used here # 2.3 Generate CanonicalHeaders: only include host here canonicalHeaders = [] for header, val in headers.items(): canonicalHeaders.append( '%s:%s' % ( quote(header.strip(), '').lower(), quote(val.strip(), '') ) ) canonicalHeaders = '\n'.join(sorted(canonicalHeaders)) # 2.4 Generate CanonicalRequest canonicalRequest = '%s\n%s\n%s\n%s' % ( method.upper(), canonicalUri, '&'.join(sorted(urlencode(params).split('&'))), canonicalHeaders ) # 3 Generate Final Signature signature = hmac.new(signingKey.encode('utf-8'), canonicalRequest.encode('utf-8'), hashlib.sha256 ).hexdigest() headers['authorization'] = 'bce-auth-v%s/%s/%s/%s/%s/%s' % ( version, self._apiKey, timestamp, expire, ';'.join(headers.keys()).lower(), signature ) return headers def report(self, feedback): """ 数据反馈 """ data = {} data['feedback'] = feedback return self._request(self.__reportUrl, data) def post(self, url, data, headers=None): """ self.post('', {}) """ return self._request(url, data, headers)