123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329 |
- # -*- coding: utf-8 -*-
- # !/usr/bin/env python
- import datetime
- import logging
- from django.contrib.auth.models import AnonymousUser
- import jwt
- from django.conf import settings as django_settings
- from django.core.cache import cache
- from django.dispatch.dispatcher import Signal
- from django.utils.module_loading import import_string
- from six import text_type
- import user_agents
- user_jwt_logged_in = Signal(['request', 'user'])
- logger = logging.getLogger(__name__)
- DEFAULT_JWT_AUTH_SETTING = {
- 'ALGORITHM': 'HS256',
- 'DECODER': 'django_jwt_session_auth.jwt_decoder',
- 'ENCODER': 'django_jwt_session_auth.jwt_encoder',
- 'PAYLOAD_TO_USER': None,
- 'USER_TO_PAYLOAD': None,
- 'USER_KEY': 'pk',
- 'USER_DOMAIN': None,
- 'TEST_USER_GETTER': None,
- 'SESSION': {
- 'EXPIRE': 3600 * 24 * 2,
- 'PREFIX': 'JWT_AUTH_CACHE:'
- }
- }
- def jwt_session_key(auth_domain): return '{auth_domain}_session_id'.format(auth_domain = auth_domain)
- # def jwt_session_key(auth_domain): return '{auth_domain}_SESSION_ID'.format(auth_domain = auth_domain.upper())
- class LazySettings(object):
- _settings = {}
- def __getitem__(self, item):
- if not self._settings:
- self._load_settings()
- if item in self._settings:
- return self._settings[item]
- else:
- return None
- def __contains__(self, key):
- if not self._settings:
- self._load_settings()
- if key in self._settings:
- return True
- else:
- return False
- def _load_settings(self):
- self._settings = {}
- # check settings
- user_defined_settings = getattr(django_settings, 'JWT_AUTH', {'default': DEFAULT_JWT_AUTH_SETTING})
- for auth_domain, user_defined_setting in user_defined_settings.iteritems():
- self._settings[auth_domain] = DEFAULT_JWT_AUTH_SETTING.copy()
- self._settings[auth_domain]['SESSION'].update(user_defined_setting.pop('SESSION', {}))
- self._settings[auth_domain].update(user_defined_setting)
- assert self._settings[auth_domain].get('PAYLOAD_TO_USER'), (
- u'JWT_AUTH (auth_domain=%s) settings\' `PAYLOAD_TO_USER` must be settled.\n'
- u'And it should return a user instance(whither it user-defined or not) '
- u'or None.' % (auth_domain,))
- assert self._settings[auth_domain].get('USER_TO_PAYLOAD'), (
- u'JWT_AUTH (auth_domain=%s) settings\' `USER_TO_PAYLOAD` must be settled.\n'
- u'And it should return a serializable dict payload' % (auth_domain,))
- jwt_settings = LazySettings()
- def get_authorization_header(request):
- """
- Return request's 'Authorization:' header, as a byte-string.
- Hide some test client ickyness where the header can be unicode.
- """
- # 先查看headers
- auth_domain = request.META.get(django_settings.JWT_AUTH_DOMAIN_DJANGO, None)
- if auth_domain:
- token = request.META.get(django_settings.JWT_TOKEN_DJANGO, '')
- return auth_domain, token
- if str(request.path) in django_settings.JWT_SERVICE_DOMAIN_MAP:
- auth_domain_dict = django_settings.JWT_SERVICE_DOMAIN_MAP[str(request.path)]
- auth_domain = auth_domain_dict['authDomain']
- if not auth_domain_dict['pre']:
- return auth_domain, None
- else:
- auth_domain = request.COOKIES.get(django_settings.JWT_AUTH_DOMAIN_COOKIE_NAME, None)
- if not auth_domain:
- return None, None
- token = request.COOKIES.get(jwt_session_key(auth_domain)) or b''
- if isinstance(token, text_type):
- token = token.encode('utf-8')
- return auth_domain, token
- def jwt_decoder(token, secret, algorithm):
- try:
- return jwt.decode(token, secret, algorithms = algorithm)
- except:
- return None
- def jwt_encoder(payload, secret, algorithm):
- return jwt.encode(payload, secret, algorithm = algorithm)
- class JwtSession(dict):
- """ jwt authenticated session
- """
- __slot__ = ('__storage__', '__key__', '__expire__')
- @classmethod
- def from_key(cls, key, expire):
- """ get jwt session corresponding to the key.
- :param key: session cache key
- :param expire: default expire time(Seconds)
- :return: JwtSession instance
- """
- data = cache.get(key, {})
- default_expire = expire
- return cls(key, default_expire, storage = data)
- def save(self):
- """ save session to cache
- """
- if self.__storage__:
- cache.set(self.__key__, self.__storage__, self.__expire__)
- def __init__(self, key, expire, storage = None):
- self.__dict__['__key__'] = key
- self.__dict__['__expire__'] = expire
- self.__dict__['__storage__'] = storage or {}
- def __getitem__(self, item):
- return self.__storage__[item]
- def __setitem__(self, key, value):
- self.__storage__[key] = value
- def __str__(self):
- return repr(self.__storage__)
- def __repr__(self):
- return '<JwtSession %s>' % self.__dict__['__key__']
- def get(self, k, d = None):
- return self.__storage__.get(k, d)
- class JwtAuthMiddleware(object):
- """ jwt auth middleware
- Check the `AUTHORIZATION` header in request, and add the correspond `user` and `jwt_session` to request.
- if the authentication failed, the `request.user` will be None, and `jwt_session` will be a empty JwtSession.
- """
- _user_key_prefix = ''
- _test_user = None
- def __init__(self, get_response = None):
- self.get_response = get_response
- def process_request(self, request):
- try:
- auth_domain, token = get_authorization_header(request)
- if django_settings.DEBUG_JWT:
- logger.debug('jwt session auth. url = %s; domain = %s; token = %s' % (request.path, auth_domain, token))
- if not auth_domain or auth_domain not in jwt_settings:
- return
- user = None
- if token:
- decoder = self._get_decoder(auth_domain)
- payload = decoder(token, jwt_settings[auth_domain]['SECRET'], (jwt_settings[auth_domain]['ALGORITHM'],))
- payload_to_user_func = self._get_payload_to_user_handler(auth_domain)
- user = payload_to_user_func(payload)
- if not user:
- user = AnonymousUser()
- if django_settings.DEBUG_JWT:
- logger.debug('login user = %s' % repr(user))
- jwt_session = self.get_jwt_session(auth_domain, user)
- setattr(request, 'user', user)
- setattr(request, 'jwt_session', jwt_session)
- request.jwt_session.save()
- except Exception as e:
- logger.exception(e)
- def process_response(self, request, response):
- #: 将授权域下达给访问了入口页的请求
- if str(request.path) in django_settings.JWT_SERVICE_DOMAIN_MAP:
- auth_domain_dict = django_settings.JWT_SERVICE_DOMAIN_MAP[str(request.path)]
- auth_domain = auth_domain_dict['authDomain']
- if auth_domain_dict['post']:
- cookie = {
- 'key': django_settings.JWT_AUTH_DOMAIN_COOKIE_NAME,
- 'value': auth_domain,
- 'max_age': 30 * 24 * 3600,
- 'secure': False,
- 'httponly': False
- }
- if 'localhost' not in django_settings.COOKIE_DOMAIN:
- cookie['domain'] = django_settings.COOKIE_DOMAIN
- response.set_cookie(**cookie)
- return response
- @classmethod
- def user_key_prefix(cls, auth_domain):
- if not cls._user_key_prefix:
- cls._user_key_prefix = jwt_settings[auth_domain]['SESSION']['PREFIX']
- return cls._user_key_prefix
- @classmethod
- def test_user(cls, auth_domain):
- """ load test user if TEST_USER_GETTER is exists
- """
- test_user_getter = jwt_settings[auth_domain]['TEST_USER_GETTER']
- if not cls._test_user and test_user_getter:
- test_user_getter = import_string(test_user_getter)
- cls._test_user = test_user_getter()
- return cls._test_user
- @classmethod
- def get_jwt_session(cls, auth_domain, user, expire = None):
- """ get specific user jwt_session
- :param auth_domain:
- :param user: user
- :param expire: expire time in seconds.
- :return: JwtSession instance
- """
- user_key = jwt_settings[auth_domain]['USER_KEY']
- if not user or isinstance(user, AnonymousUser):
- return cls._get_empty_jwt_session(auth_domain)
- session_key = cls.user_key_prefix(auth_domain) + str(getattr(user, user_key))
- session = JwtSession.from_key(session_key, expire = expire or jwt_settings[auth_domain]['SESSION']['EXPIRE'])
- return session
- @classmethod
- def jwt_login(cls, auth_domain, user, request, expire = None):
- """ jwt login user function
- :param auth_domain: auth domain
- :param expire: expire time
- :param user_object user: user instance
- :param request request: request instance
- :return str: token
- """
- user_to_payload_handler = import_string(jwt_settings[auth_domain]['USER_TO_PAYLOAD'])
- payload = user_to_payload_handler(user)
- encoder = cls._get_encoder(auth_domain)
- token = encoder(payload, jwt_settings[auth_domain]['SECRET'], jwt_settings[auth_domain]['ALGORITHM'])
- user_agent_string = request.META.get('HTTP_USER_AGENT', '')
- phoneOS = user_agents.parse(user_agent_string).os.family
- user.update(last_login=datetime.datetime.now(), phoneOS=phoneOS, lastLoginUserAgent=user_agent_string)
- request.user = user
- request.jwt_session = cls.get_jwt_session(auth_domain, user, expire = expire)
- user_jwt_logged_in.send(user.__class__, request = request, user = user)
- return token
- @classmethod
- def _get_empty_jwt_session(cls, auth_domain):
- """ get empty jwt session
- """
- return JwtSession(cls.user_key_prefix(auth_domain) + 'empty', None, {})
- @classmethod
- def _get_decoder(cls, auth_domain):
- """ get jwt token decoder
- """
- decoder_entry = jwt_settings[auth_domain]['DECODER']
- if decoder_entry == 'django_jwt_session_auth.jwt_decoder':
- return jwt_decoder
- elif isinstance(decoder_entry, basestring):
- return import_string(decoder_entry)
- else:
- return decoder_entry
- @classmethod
- def _get_encoder(cls, auth_domain):
- """ get jwt token decoder
- """
- encoder_entry = jwt_settings[auth_domain]['ENCODER']
- if encoder_entry == 'django_jwt_session_auth.jwt_encoder':
- return jwt_encoder
- elif isinstance(encoder_entry, basestring):
- return import_string(encoder_entry)
- else:
- return encoder_entry
- def _get_payload_to_user_handler(self, auth_domain):
- return import_string(jwt_settings[auth_domain]['PAYLOAD_TO_USER'])
- jwt_login = JwtAuthMiddleware.jwt_login
|