123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- import os
- import time
- import json
- import base64
- import urllib
- import hashlib
- import logging
# Module-level logger used by the fake WeChat helpers defined in this file.
logger = logging.getLogger(__name__)

# Canned OAuth access token returned by the fake client. Set the
# FAKE_ACCESS_TOKEN environment variable to override the built-in value.
ACCESS_TOKEN = os.environ.get(
    'FAKE_ACCESS_TOKEN',
    "N2L7KXa084WvelONYjkJ_traBMCCvy_UKmpUUzlrQ0EA2yNp3Iz6eSUrRG0bhaR_viswd50vDuPkY5nG43d"
    "1gbm-olT2KRMxOsVE08RfeD9lvK9lMguNG9kpIkKGZEjIf8Jv2m9fFhf8bnNa-yQH3g",
)

# Canned openid for tests. Set the TEST_OPEN_ID environment variable to
# override the built-in value.
OPEN_ID = os.environ.get('TEST_OPEN_ID', "test-ttttttttt_eeeee-sssssst")
class FakeWechatApp(object):
    """Credentials of a fake WeChat application.

    Both values are read from the environment once, when the class body is
    executed, and fall back to fixed placeholder strings.
    """

    # Application id; override with the FAKE_WECHAT_APPID environment variable.
    appid = os.environ.get('FAKE_WECHAT_APPID', 'fakewechatappid')
    # Application secret; override with the FAKE_WECHAT_SECRET environment variable.
    secret = os.environ.get('FAKE_WECHAT_SECRET', 'fakewechatsecret')
class FakeWechatAuthClient(object):
    """Fake WeChat client that answers every call with canned values.

    None of the methods inspect their arguments or perform any I/O.
    """

    def get_oauth_token(self, auth_code):
        """Return a canned OAuth token payload.

        :param auth_code: authorization code; ignored.
        :return: dict with ``access_token`` (the module-level ``ACCESS_TOKEN``)
            and an empty ``openid``.
        """
        return {
            'access_token': ACCESS_TOKEN,
            # The key must be lowercase 'openid': that is the key
            # FakeWechatAuthBridge.authorize / get_user_info look up. The
            # previous 'openId' spelling made get_user_info raise KeyError.
            'openid': '',
        }

    def jsapi_sign(self, url):
        """Return an empty JS-API signature payload.

        :param url: page URL to sign; ignored.
        :return: dict with empty ``sign`` and ``noncestr`` values.
        """
        return {
            'sign': '',
            'noncestr': '',
        }

    @property
    def jsapi_ticket(self):
        """Canned JS-API ticket: always the empty string."""
        return ''

    def get_user_info(self, **kwargs):
        """Return canned user info (the empty string); keyword arguments are ignored."""
        return ''
class FakeWechatAuthBridge(object):
    """Fake WeChat OAuth bridge backed by ``FakeWechatAuthClient``.

    Builds authorization URLs from a local template and answers token,
    signature and subscription queries with canned data.
    """

    # Silent redirect; the user does not need to click to authorize.
    AUTH_SCOPE_BASE = 'snsapi_base'

    # Requires the user to explicitly click to authorize.
    AUTH_SCOPE_USER = 'snsapi_userinfo'

    PROD_AUTH_CALLBACK_URL_TMPL = \
        '{redirect_uri}?scope={scope}&state={state}&connect_redirect=1#wechat_redirect'

    GET_USER_OPENID_LIST_URL = 'https://api.weixin.qq.com/cgi-bin/user/get?access_token=ACCESS_TOKEN&next_openid=NEXT_OPENID'

    TOKEN_CACHE_KEY = 'access_token_from_{appid}_{code}'

    GZH_TOKEN_CACHE_KEY = 'gzh_access_token_from_{appId}'

    def __init__(self, app, redirect_uri=None):
        """Store the application credentials and the optional redirect URI.

        :param app: either a mapping with ``'appid'`` and ``'secret'`` keys, or
            an object/class exposing ``appid`` and ``secret`` attributes
            (such as ``FakeWechatApp``).
        :param redirect_uri: stored as given; only used by ``__repr__``.
        """
        self._appid = self._read_app_field(app, 'appid')
        self._secret = self._read_app_field(app, 'secret')
        self._redirect_uri = redirect_uri

    @staticmethod
    def _read_app_field(app, field):
        """Read ``field`` from ``app`` by subscription, falling back to attribute access."""
        try:
            return app[field]
        except (TypeError, AttributeError):
            # Not subscriptable (e.g. a plain class or instance): use the attribute.
            return getattr(app, field)

    def __repr__(self):
        # NOTE(review): the secret is part of the repr and therefore ends up in
        # the debug logs emitted below; kept because callers may rely on it.
        return '<FakeWechatAuthBridge(appid=%s, secret=%s, redirect_uri=%s)>' \
               % (self._appid, self._secret, self._redirect_uri)

    @property
    def appid(self):
        """Application id this bridge was built with."""
        return self._appid

    @property
    def secret(self):
        """Application secret this bridge was built with."""
        return self._secret

    @property
    def client(self):
        """A fresh ``FakeWechatAuthClient`` on every access."""
        return FakeWechatAuthClient()

    def authorize(self, auth_code):
        """WeChat OAuth authorization; involves both dealers and users.

        :param auth_code: authorization code; ``None`` short-circuits to ``None``.
        :return: the ``openid`` entry of the client's token payload, or ``None``
            when the code is missing, the entry is absent, or the client raises.
        """
        if auth_code is None:
            logger.debug('[wechat authorize]failure. code is null. bridge = %s', repr(self))
            return None

        try:
            open_id = self.client.get_oauth_token(auth_code).get('openid')
            logger.debug('[wechat authorize]success. code = %s. bridge = %s; openid = %s',
                         auth_code, repr(self), open_id)
            return open_id
        except Exception as e:
            # Best effort by design: log the failure with traceback and report "no openid".
            logger.exception('[wechat authorize]exception, bridge = %s; error = %s',
                             repr(self), str(e))
            return None

    def need_authorize(self, appId):
        """Return ``True`` when ``appId`` differs from this bridge's appid."""
        return self.appid != appId

    def get_user_info(self, auth_code):
        """Exchange ``auth_code`` for a token, then fetch the user info with it.

        :raises KeyError: if the token payload lacks ``openid`` or ``access_token``.
        """
        logger.debug('[wechat get_user_info]. code = %s', auth_code)
        token = self.client.get_oauth_token(auth_code)
        return self.client.get_user_info(openid=token['openid'], access_token=token['access_token'])

    def _sign(self, payload):
        """Return the SHA-1 hex digest of ``payload`` rendered as ``k=v&k=v``.

        Keys are emitted in sorted order and entries with a falsy value are
        skipped. Keys and values must be strings (they are joined with ``'='``).
        """
        pairs = ['='.join((key, payload[key])) for key in sorted(payload) if payload[key]]
        canonical = '&'.join(pairs)
        # hexdigest() is already lowercase, so no further normalisation is needed.
        return hashlib.sha1(canonical.encode('utf-8')).hexdigest()

    def generate_js_auth_signature(self, url='/app/index.html'):
        """Generate signature info, mainly so dealers can obtain QR-scan permission.

        :param url: URL handed to the client's ``jsapi_sign`` and echoed back.
        :return: dict with ``signature``, ``appId``, ``jsapi_ticket``, ``url``,
            ``timestamp`` (current Unix time in whole seconds, as a string)
            and ``nonceStr``.
        """
        sign_info = self.client.jsapi_sign(url=url)
        return {
            'signature': sign_info['sign'],
            'appId': self.appid,
            'jsapi_ticket': self.client.jsapi_ticket,
            'url': url,
            'timestamp': str(int(time.time())),
            'nonceStr': sign_info['noncestr'],
        }

    def generate_auth_url(self, redirect_uri, state='', scope=AUTH_SCOPE_BASE):
        """Generate the authorization URL.

        The three values are substituted verbatim into
        ``PROD_AUTH_CALLBACK_URL_TMPL``; no URL-escaping is applied.

        :param redirect_uri: base of the resulting URL.
        :param state: opaque state string placed in the query string.
        :param scope: OAuth scope, ``AUTH_SCOPE_BASE`` by default.
        :return: the formatted URL string.
        """
        return self.PROD_AUTH_CALLBACK_URL_TMPL.format(
            redirect_uri=redirect_uri,
            state=state,
            scope=scope)

    def generate_auth_url_base_scope(self, redirect_uri, state=''):
        """Log the redirect and build the authorization URL with the base scope."""
        logger.debug('response redirect. bridge = %s, type = %s, redirect_uri = %s; state = %s',
                     repr(self), self.AUTH_SCOPE_BASE, redirect_uri, state)
        return self.generate_auth_url(redirect_uri=redirect_uri, state=state, scope=self.AUTH_SCOPE_BASE)

    def generate_auth_url_user_scope(self, redirect_uri, state=''):
        """Log the redirect and build the authorization URL with the user-info scope."""
        logger.debug('response redirect. bridge = %s, type = %s, redirect_uri = %s; state = %s',
                     repr(self), self.AUTH_SCOPE_USER, redirect_uri, state)
        return self.generate_auth_url(redirect_uri=redirect_uri, state=state, scope=self.AUTH_SCOPE_USER)

    @staticmethod
    def encode_state(state):
        """Serialise ``state`` to JSON and return it base64-encoded as a native ``str``."""
        raw = json.dumps(state)
        if not isinstance(raw, bytes):
            # Python 3: b64encode only accepts bytes-like input.
            raw = raw.encode('utf-8')
        encoded = base64.b64encode(raw)
        if not isinstance(encoded, str):
            # Python 3: hand back text so the value can be formatted into a URL.
            encoded = encoded.decode('ascii')
        return encoded

    @staticmethod
    def parse_state(state_str):
        """Inverse of ``encode_state``: base64-decode ``state_str`` and parse the JSON."""
        decoded = base64.b64decode(state_str)
        # Decode explicitly: json.loads does not accept bytes before Python 3.6.
        return json.loads(decoded.decode('utf-8'))

    def is_subscribe_gzh(self, openId):
        """Always report the user as subscribed; ``openId`` is ignored."""
        return True
# Public API of this module: only the bridge is exported by `from ... import *`.
__all__ = ['FakeWechatAuthBridge']
|