# -*- coding: utf-8 -*- # !/usr/bin/env python """ jd.oauth ~~~~~~~~~~~~~~~ This module provides OAuth2 library for jd pay :copyright: (c) 2019 by messense. :license: MIT, see LICENSE for more details. """ from __future__ import absolute_import, unicode_literals import hashlib import requests import six from six.moves.urllib.parse import quote import time from apilib.systypes import StrEnum from library import to_binary, to_text from library.jd.exceptions import JDAuthException try: import simplejson as json except ImportError: import json class JDOAuth(object): """ 京东支付 OAuth 网页授权 """ OAUTH_BASE_URL = 'https://jauth.jd.com' class Scope(StrEnum): """ 如果不获取电话号码,仅支持scope.userInfo """ USER_INFO = 'scope.userInfo' def __str__(self): _repr = '{kclass}(appid: {appid})'.format( kclass = self.__class__.__name__, appid = self.appid) if six.PY2: return to_binary(_repr) else: return to_text(_repr) def __repr__(self): return str(self) def __init__(self, appid, secret, state = ''): """ :param appid: 京东支付 app_id :param secret: 京东支付 secret :param state: 可选,京东支付 OAuth2 state """ self.appid = appid self.secret = secret self.state = state def _sign(self, data): _params = [to_binary('{0}={1}'.format(k, data[k])) for k in sorted(data) if data[k]] _params = ''.join(_params) _params = to_binary('{0}{1}'.format(_params, self.secret)) _params = quote(_params, safe = b'') return to_text(hashlib.md5(_params).hexdigest()) def _request(self, method, url_or_endpoint, **kwargs): if not url_or_endpoint.startswith(('http://', 'https://')): url = '{base}{endpoint}'.format( base = self.OAUTH_BASE_URL, endpoint = url_or_endpoint ) else: url = url_or_endpoint if isinstance(kwargs.get('data', ''), dict): body = json.dumps(kwargs['data'], ensure_ascii = False) body = body.encode('utf-8') kwargs['data'] = body kwargs['timeout'] = kwargs.get('timeout', 15) with requests.sessions.Session() as session: res = session.request( method = method, url = url, **kwargs ) try: res.raise_for_status() except requests.RequestException as reqe: raise JDAuthException( errCode = res.status_code, errMsg = reqe.message, client = self, request = reqe.request, response = reqe.response) result = json.loads(res.content.decode('utf-8', 'ignore'), strict = False) if 'errcode' in result and result['errcode'] != 0: errcode = result['errcode'] errmsg = result['errmsg'] raise JDAuthException( errCode = errcode, errMsg = errmsg, client = self, request = res.request, response = res ) return result def _get(self, url, **kwargs): return self._request( method = 'get', url_or_endpoint = url, **kwargs ) def get_oauth_token(self, auth_code): """获取 access_token :param auth_code: 授权完成跳转回来后 URL 中的 code 参数 :return: JSON 数据包 """ params = { 'grant_type': 'authorization_code', 'appid': self.appid, 'code': auth_code, 'ts': str(int(time.time())) } params.update({ 'sign': self._sign(params), 'secret': self.secret }) return self._get( '/access_token', params = params ) def get_user_info(self, access_token, openid): """ 获取用户信息 """ params = { 'access_token': access_token, 'openid': openid, 'ts': str(int(time.time())) } params.update({'sign': self._sign(params)}) res = self._get( '/userinfo', params = params ) return res def authorize_url(self, redirect_uri, scope): """获取授权跳转地址 :return: URL 地址 """ redirect_uri = quote(redirect_uri, safe = b'') url_list = [ self.OAUTH_BASE_URL, '/entrance?appid=', self.appid, '&redirect_uri=', redirect_uri, '&cancel_uri=', redirect_uri, '&response_type=code&scope=', scope, '&act_type=2', '&show_titile=0' ] if self.state: url_list.extend(['&state=', self.state]) else: url_list.extend(['&state=', '']) return ''.join(url_list)