#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""In-process fakes for the WeChat OAuth bridge.

Nothing in this module performs network I/O: the fake client returns
canned values, and the bridge builds URLs / signatures from its inputs.
"""

import os
import time
import json
import base64
import urllib
import hashlib
import logging

logger = logging.getLogger(__name__)

# Canned access token handed out by the fake client; overridable via env.
ACCESS_TOKEN = os.environ.get(
    'FAKE_ACCESS_TOKEN',
    "N2L7KXa084WvelONYjkJ_traBMCCvy_UKmpUUzlrQ0EA2yNp3Iz6eSUrRG0bhaR_viswd50vDuPkY5nG43d"
    "1gbm-olT2KRMxOsVE08RfeD9lvK9lMguNG9kpIkKGZEjIf8Jv2m9fFhf8bnNa-yQH3g")

# NOTE(review): not referenced anywhere in this module -- presumably consumed
# by importers; confirm before removing.
OPEN_ID = os.environ.get('TEST_OPEN_ID', "test-ttttttttt_eeeee-sssssst")


class FakeWechatApp(object):
    """Holder for fake app credentials, read from the environment at import."""

    appid = os.environ.get('FAKE_WECHAT_APPID', 'fakewechatappid')
    secret = os.environ.get('FAKE_WECHAT_SECRET', 'fakewechatsecret')


class FakeWechatAuthClient(object):
    """Stand-in client whose methods return fixed values."""

    def get_oauth_token(self, auth_code):
        """Return a canned token payload; ``auth_code`` is ignored."""
        return {
            'access_token': ACCESS_TOKEN,
            # The bridge reads the lower-case 'openid' key; without it
            # FakeWechatAuthBridge.get_user_info raised KeyError.
            'openid': '',
            # Kept so existing readers of the old camel-case key still work.
            'openId': '',
        }

    def jsapi_sign(self, url):
        """Return an empty signature payload; ``url`` is ignored."""
        return {
            'sign': '',
            'noncestr': '',
        }

    @property
    def jsapi_ticket(self):
        """Always the empty string."""
        return ''

    def get_user_info(self, **kwargs):
        """Return an empty string regardless of the keyword arguments."""
        return ''


class FakeWechatAuthBridge(object):
    """Fake auth bridge backed by :class:`FakeWechatAuthClient`."""

    # Silent redirect; the user does not need to tap to authorize.
    AUTH_SCOPE_BASE = 'snsapi_base'

    # Requires the user to explicitly tap to authorize.
    AUTH_SCOPE_USER = 'snsapi_userinfo'

    PROD_AUTH_CALLBACK_URL_TMPL = \
        '{redirect_uri}?scope={scope}&state={state}&connect_redirect=1#wechat_redirect'

    GET_USER_OPENID_LIST_URL = 'https://api.weixin.qq.com/cgi-bin/user/get?access_token=ACCESS_TOKEN&next_openid=NEXT_OPENID'

    TOKEN_CACHE_KEY = 'access_token_from_{appid}_{code}'

    GZH_TOKEN_CACHE_KEY = 'gzh_access_token_from_{appId}'

    def __init__(self, app, redirect_uri=None):
        """Store credentials taken from ``app``.

        :param app: object subscriptable by the keys ``'appid'`` and
            ``'secret'`` (e.g. a dict).
        :param redirect_uri: optional value kept on the instance; it is only
            read back by ``__repr__``.
        """
        self._appid = app['appid']
        self._secret = app['secret']
        self._redirect_uri = redirect_uri

    def __repr__(self):
        # The previous format string was empty, so the ``%`` operation raised
        # TypeError and took every logging call that embeds the bridge with it.
        return '<FakeWechatAuthBridge(appid=%s, secret=%s, redirect_uri=%s)>' \
            % (self._appid, self._secret, self._redirect_uri)

    @property
    def appid(self):
        return self._appid

    @property
    def secret(self):
        return self._secret

    @property
    def client(self):
        """Return a new :class:`FakeWechatAuthClient` on every access."""
        return FakeWechatAuthClient()

    def authorize(self, auth_code):
        """WeChat OAuth authorization; used for both dealers and users.

        :param auth_code: authorization code; ``None`` short-circuits.
        :return: the ``'openid'`` value from the client's token payload, or
            ``None`` when ``auth_code`` is ``None`` or the lookup raises.
        """
        if auth_code is None:
            logger.debug('[wechat authorize]failure. code is null. bridge = %s', self)
            return None

        try:
            open_id = self.client.get_oauth_token(auth_code).get('openid')
            logger.debug(
                '[wechat authorize]success. code = %s. bridge = %s; openid = %s',
                auth_code, self, open_id)
            return open_id
        except Exception as e:
            # Deliberately best-effort: any failure is logged and reported
            # to the caller as "no openid".
            logger.exception(
                '[wechat authorize]exception, bridge = %s; error = %s', self, e)
            return None

    def need_authorize(self, appId):
        """Return ``True`` when ``appId`` differs from this bridge's appid."""
        return self.appid != appId

    def get_user_info(self, auth_code):
        """Exchange ``auth_code`` for a token, then ask the client for user info."""
        logger.debug('[wechat get_user_info]. code = %s', auth_code)
        token = self.client.get_oauth_token(auth_code)
        return self.client.get_user_info(
            openid=token['openid'], access_token=token['access_token'])

    def _sign(self, payload):
        """Return the lower-case SHA-1 hex digest of the sorted ``k=v`` pairs.

        Pairs are ordered by key and joined with ``&``; pairs whose value is
        falsy are left out of the signed string.
        """
        pairs = [(key, payload[key]) for key in sorted(payload.keys())]
        signed = '&'.join('='.join(pair) for pair in pairs if pair[1])
        return hashlib.sha1(signed.encode("utf-8")).hexdigest().lower()

    def generate_js_auth_signature(self, url='/app/index.html'):
        """Generate the JS signature info, mainly so dealers can obtain
        QR-code scanning permission.

        :param url: passed to the client's ``jsapi_sign`` and echoed back.
        :return: dict with keys ``signature``, ``appId``, ``jsapi_ticket``,
            ``url``, ``timestamp`` (current epoch seconds as a string) and
            ``nonceStr``.
        """
        sign_info = self.client.jsapi_sign(url=url)

        return {
            'signature': sign_info['sign'],
            'appId': self.appid,
            'jsapi_ticket': self.client.jsapi_ticket,
            'url': url,
            'timestamp': str(int(time.time())),
            'nonceStr': sign_info['noncestr'],
        }

    def generate_auth_url(self, redirect_uri, state='', scope=AUTH_SCOPE_BASE):
        """Generate the authorization URL.

        :param redirect_uri: substituted verbatim at the start of the URL.
        :param state: substituted verbatim into the ``state`` query value.
        :param scope: substituted verbatim into the ``scope`` query value.
        :return: ``PROD_AUTH_CALLBACK_URL_TMPL`` with the three values filled in.
        """
        return self.PROD_AUTH_CALLBACK_URL_TMPL.format(
            redirect_uri=redirect_uri,
            state=state,
            scope=scope)

    def generate_auth_url_base_scope(self, redirect_uri, state=''):
        """Build the authorization URL with ``AUTH_SCOPE_BASE``."""
        logger.debug(
            'response redirect. bridge = %s, type = %s, redirect_uri = %s; state = %s',
            self, self.AUTH_SCOPE_BASE, redirect_uri, state)

        return self.generate_auth_url(
            redirect_uri=redirect_uri, state=state, scope=self.AUTH_SCOPE_BASE)

    def generate_auth_url_user_scope(self, redirect_uri, state=''):
        """Build the authorization URL with ``AUTH_SCOPE_USER``."""
        logger.debug(
            'response redirect. bridge = %s, type = %s, redirect_uri = %s; state = %s',
            self, self.AUTH_SCOPE_USER, redirect_uri, state)

        return self.generate_auth_url(
            redirect_uri=redirect_uri, state=state, scope=self.AUTH_SCOPE_USER)

    @staticmethod
    def encode_state(state):
        """Serialize ``state`` to JSON and return it base64-encoded.

        The result is always the interpreter's native ``str`` so it can be
        substituted straight into a URL template.
        """
        raw = json.dumps(state)
        if not isinstance(raw, bytes):
            # Python 3: b64encode only accepts bytes-like input.
            raw = raw.encode('utf-8')
        encoded = base64.b64encode(raw)
        if not isinstance(encoded, str):
            # Python 3: hand back text rather than bytes.
            encoded = encoded.decode('ascii')
        return encoded

    @staticmethod
    def parse_state(state_str):
        """Inverse of :meth:`encode_state`: base64-decode, then parse JSON."""
        return json.loads(base64.b64decode(state_str).decode('utf-8'))

    def is_subscribe_gzh(self, openId):
        """Always ``True``; ``openId`` is ignored."""
        return True


__all__ = ['FakeWechatAuthBridge']