# -*- coding: utf-8 -*- """ unionpay.oauth ~~~~~~~~~~~~~~~ This module provides OAuth2 library for unionpay :copyright: (c) 2014 by messense. :license: MIT, see LICENSE for more details. """ from __future__ import absolute_import, unicode_literals import requests import six from apilib.systypes import StrEnum from apilib.utils_string import get_random_string import time from Crypto.Hash import SHA256 from library import to_binary, to_text try: import simplejson as json except ImportError: import json # NOQA class UnionPayOAuth(object): """ 云闪付平台 OAuth 网页授权 """ class Scope(StrEnum): SNSAPI_BASE = 'snsapi_base' SNSAPI_UNION_LOGIN = 'snsapi_union_login' API_BASE_URL = 'https://open.95516.com/' OAUTH_BASE_URL = 'https://open.95516.com/s/open/html/oauth.html' def __str__(self): _repr = '{kclass}(appid: {appid})'.format( kclass = self.__class__.__name__, appid = self.app_id) if six.PY2: return to_binary(_repr) else: return to_text(_repr) def __repr__(self): return str(self) def __init__(self, app_id, secret): self.app_id = app_id self.secret = secret def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(('http://', 'https://')): url = '{base}{endpoint}'.format( base = self.API_BASE_URL, endpoint = url_or_endpoint ) else: url = url_or_endpoint if isinstance(kwargs.get('data', ''), dict): body = json.dumps(kwargs['data'], ensure_ascii = False) body = body.encode('utf-8') kwargs['data'] = body kwargs['timeout'] = kwargs.get('timeout', 15) with requests.sessions.Session() as session: res = session.request( method = method, url = url, **kwargs ) try: res.raise_for_status() except requests.RequestException as reqe: return {'errmsg':u'网络异常'} result = json.loads(res.content.decode('utf-8', 'ignore'), strict = False) if 'msg' in result and result['errcode'] != 0: return result def _get(self, url, **kwargs): return self._request( method = 'get', url_or_endpoint = url, **kwargs ) def get_oauth_token(self, auth_code): """获取 access_token :param auth_code: 授权完成跳转回来后 URL 中的 code 参数 :return: JSON 数据包 """ #先计算出随机字符串 nonceStr = get_random_string(16) timestamp = time.time() rawStr = 'appId=%s&secret=%s&nonceStr=%s×tamp=%s'(self.app_id,self.secret,nonceStr,timestamp) digest = SHA256.new() signature = digest.update(rawStr) res = self._get( 'open/access/1.0/backendToken', params = { 'appId': self.app_id, 'nonceStr': nonceStr, 'timestamp': timestamp, 'signature': signature } ) return res def get_user_info(self, token = None, code = None, lang = 'zh_CN'): res = self._get( 'open/access/1.0/backendToken', params = { 'appId': self.app_id, 'backendToken': token, 'code': code, 'grantType': 'authorization_code' } ) return res def authorize_url(self, redirect_uri, scope): pass