# -*- coding: utf-8 -*- # !/usr/bin/env python import datetime import logging from django.contrib.auth.models import AnonymousUser import jwt from django.conf import settings as django_settings from django.core.cache import cache from django.dispatch.dispatcher import Signal from django.utils.module_loading import import_string from six import text_type import user_agents user_jwt_logged_in = Signal(['request', 'user']) logger = logging.getLogger(__name__) DEFAULT_JWT_AUTH_SETTING = { 'ALGORITHM': 'HS256', 'DECODER': 'django_jwt_session_auth.jwt_decoder', 'ENCODER': 'django_jwt_session_auth.jwt_encoder', 'PAYLOAD_TO_USER': None, 'USER_TO_PAYLOAD': None, 'USER_KEY': 'pk', 'USER_DOMAIN': None, 'TEST_USER_GETTER': None, 'SESSION': { 'EXPIRE': 3600 * 24 * 2, 'PREFIX': 'JWT_AUTH_CACHE:' } } def jwt_session_key(auth_domain): return '{auth_domain}_session_id'.format(auth_domain = auth_domain) # def jwt_session_key(auth_domain): return '{auth_domain}_SESSION_ID'.format(auth_domain = auth_domain.upper()) class LazySettings(object): _settings = {} def __getitem__(self, item): if not self._settings: self._load_settings() if item in self._settings: return self._settings[item] else: return None def __contains__(self, key): if not self._settings: self._load_settings() if key in self._settings: return True else: return False def _load_settings(self): self._settings = {} # check settings user_defined_settings = getattr(django_settings, 'JWT_AUTH', {'default': DEFAULT_JWT_AUTH_SETTING}) for auth_domain, user_defined_setting in user_defined_settings.iteritems(): self._settings[auth_domain] = DEFAULT_JWT_AUTH_SETTING.copy() self._settings[auth_domain]['SESSION'].update(user_defined_setting.pop('SESSION', {})) self._settings[auth_domain].update(user_defined_setting) assert self._settings[auth_domain].get('PAYLOAD_TO_USER'), ( u'JWT_AUTH (auth_domain=%s) settings\' `PAYLOAD_TO_USER` must be settled.\n' u'And it should return a user instance(whither it user-defined or not) ' u'or None.' % (auth_domain,)) assert self._settings[auth_domain].get('USER_TO_PAYLOAD'), ( u'JWT_AUTH (auth_domain=%s) settings\' `USER_TO_PAYLOAD` must be settled.\n' u'And it should return a serializable dict payload' % (auth_domain,)) jwt_settings = LazySettings() def get_authorization_header(request): """ Return request's 'Authorization:' header, as a byte-string. Hide some test client ickyness where the header can be unicode. """ # 先查看headers auth_domain = request.META.get(django_settings.JWT_AUTH_DOMAIN_DJANGO, None) if auth_domain: token = request.META.get(django_settings.JWT_TOKEN_DJANGO, '') return auth_domain, token if str(request.path) in django_settings.JWT_SERVICE_DOMAIN_MAP: auth_domain_dict = django_settings.JWT_SERVICE_DOMAIN_MAP[str(request.path)] auth_domain = auth_domain_dict['authDomain'] if not auth_domain_dict['pre']: return auth_domain, None else: auth_domain = request.COOKIES.get(django_settings.JWT_AUTH_DOMAIN_COOKIE_NAME, None) if not auth_domain: return None, None token = request.COOKIES.get(jwt_session_key(auth_domain)) or b'' if isinstance(token, text_type): token = token.encode('utf-8') return auth_domain, token def jwt_decoder(token, secret, algorithm): try: return jwt.decode(token, secret, algorithms = algorithm) except: return None def jwt_encoder(payload, secret, algorithm): return jwt.encode(payload, secret, algorithm = algorithm) class JwtSession(dict): """ jwt authenticated session """ __slot__ = ('__storage__', '__key__', '__expire__') @classmethod def from_key(cls, key, expire): """ get jwt session corresponding to the key. :param key: session cache key :param expire: default expire time(Seconds) :return: JwtSession instance """ data = cache.get(key, {}) default_expire = expire return cls(key, default_expire, storage = data) def save(self): """ save session to cache """ if self.__storage__: cache.set(self.__key__, self.__storage__, self.__expire__) def __init__(self, key, expire, storage = None): self.__dict__['__key__'] = key self.__dict__['__expire__'] = expire self.__dict__['__storage__'] = storage or {} def __getitem__(self, item): return self.__storage__[item] def __setitem__(self, key, value): self.__storage__[key] = value def __str__(self): return repr(self.__storage__) def __repr__(self): return '' % self.__dict__['__key__'] def get(self, k, d = None): return self.__storage__.get(k, d) class JwtAuthMiddleware(object): """ jwt auth middleware Check the `AUTHORIZATION` header in request, and add the correspond `user` and `jwt_session` to request. if the authentication failed, the `request.user` will be None, and `jwt_session` will be a empty JwtSession. """ _user_key_prefix = '' _test_user = None def __init__(self, get_response = None): self.get_response = get_response def process_request(self, request): try: auth_domain, token = get_authorization_header(request) if django_settings.DEBUG_JWT: logger.debug('jwt session auth. url = %s; domain = %s; token = %s' % (request.path, auth_domain, token)) if not auth_domain or auth_domain not in jwt_settings: return user = None if token: decoder = self._get_decoder(auth_domain) payload = decoder(token, jwt_settings[auth_domain]['SECRET'], (jwt_settings[auth_domain]['ALGORITHM'],)) payload_to_user_func = self._get_payload_to_user_handler(auth_domain) user = payload_to_user_func(payload) if not user: user = AnonymousUser() if django_settings.DEBUG_JWT: logger.debug('login user = %s' % repr(user)) jwt_session = self.get_jwt_session(auth_domain, user) setattr(request, 'user', user) setattr(request, 'jwt_session', jwt_session) request.jwt_session.save() except Exception as e: logger.exception(e) def process_response(self, request, response): #: 将授权域下达给访问了入口页的请求 if str(request.path) in django_settings.JWT_SERVICE_DOMAIN_MAP: auth_domain_dict = django_settings.JWT_SERVICE_DOMAIN_MAP[str(request.path)] auth_domain = auth_domain_dict['authDomain'] if auth_domain_dict['post']: cookie = { 'key': django_settings.JWT_AUTH_DOMAIN_COOKIE_NAME, 'value': auth_domain, 'max_age': 30 * 24 * 3600, 'secure': False, 'httponly': False } if 'localhost' not in django_settings.COOKIE_DOMAIN: cookie['domain'] = django_settings.COOKIE_DOMAIN response.set_cookie(**cookie) return response @classmethod def user_key_prefix(cls, auth_domain): if not cls._user_key_prefix: cls._user_key_prefix = jwt_settings[auth_domain]['SESSION']['PREFIX'] return cls._user_key_prefix @classmethod def test_user(cls, auth_domain): """ load test user if TEST_USER_GETTER is exists """ test_user_getter = jwt_settings[auth_domain]['TEST_USER_GETTER'] if not cls._test_user and test_user_getter: test_user_getter = import_string(test_user_getter) cls._test_user = test_user_getter() return cls._test_user @classmethod def get_jwt_session(cls, auth_domain, user, expire = None): """ get specific user jwt_session :param auth_domain: :param user: user :param expire: expire time in seconds. :return: JwtSession instance """ user_key = jwt_settings[auth_domain]['USER_KEY'] if not user or isinstance(user, AnonymousUser): return cls._get_empty_jwt_session(auth_domain) session_key = cls.user_key_prefix(auth_domain) + str(getattr(user, user_key)) session = JwtSession.from_key(session_key, expire = expire or jwt_settings[auth_domain]['SESSION']['EXPIRE']) return session @classmethod def jwt_login(cls, auth_domain, user, request, expire = None): """ jwt login user function :param auth_domain: auth domain :param expire: expire time :param user_object user: user instance :param request request: request instance :return str: token """ user_to_payload_handler = import_string(jwt_settings[auth_domain]['USER_TO_PAYLOAD']) payload = user_to_payload_handler(user) encoder = cls._get_encoder(auth_domain) token = encoder(payload, jwt_settings[auth_domain]['SECRET'], jwt_settings[auth_domain]['ALGORITHM']) user_agent_string = request.META.get('HTTP_USER_AGENT', '') phoneOS = user_agents.parse(user_agent_string).os.family user.update(last_login=datetime.datetime.now(), phoneOS=phoneOS, lastLoginUserAgent=user_agent_string) request.user = user request.jwt_session = cls.get_jwt_session(auth_domain, user, expire = expire) user_jwt_logged_in.send(user.__class__, request = request, user = user) return token @classmethod def _get_empty_jwt_session(cls, auth_domain): """ get empty jwt session """ return JwtSession(cls.user_key_prefix(auth_domain) + 'empty', None, {}) @classmethod def _get_decoder(cls, auth_domain): """ get jwt token decoder """ decoder_entry = jwt_settings[auth_domain]['DECODER'] if decoder_entry == 'django_jwt_session_auth.jwt_decoder': return jwt_decoder elif isinstance(decoder_entry, basestring): return import_string(decoder_entry) else: return decoder_entry @classmethod def _get_encoder(cls, auth_domain): """ get jwt token decoder """ encoder_entry = jwt_settings[auth_domain]['ENCODER'] if encoder_entry == 'django_jwt_session_auth.jwt_encoder': return jwt_encoder elif isinstance(encoder_entry, basestring): return import_string(encoder_entry) else: return encoder_entry def _get_payload_to_user_handler(self, auth_domain): return import_string(jwt_settings[auth_domain]['PAYLOAD_TO_USER']) jwt_login = JwtAuthMiddleware.jwt_login