123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272 |
- # -*- coding: utf-8 -*-
- #!/usr/bin/env python
- """Core Unit Test Module"""
- import os
- os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.systest')
- import sys
- import pytest
- import inspect
- import simplejson as json
- from Cookie import SimpleCookie
- from importlib import import_module
- import requests
- from mongoengine import connect
- from django.conf import settings
- from django.contrib.auth import get_user, logout, login
- from django.http import HttpRequest
- from django.test.runner import DiscoverRunner
- from django.test import TestCase as _TestCase
- from django.test.client import Client
- from jsonpath_rw import parse
- from apps.web.utils import get_backend_by_role
- from apps.web.user.models import MyUser
- from apps.web.user.auth import end_user_login, end_user_login_session_key
def pluck(response, field):
    """Return the first value matched by the JSONPath *field* in a response.

    ``response.content`` is decoded as JSON and searched with the parsed
    *field* expression.  Indexing the match list raises ``IndexError`` when
    the expression matches nothing.
    """
    expression = parse(field)
    document = json.loads(response.content)
    matches = expression.find(document)
    return matches[0].value
def pluck_description(response):
    """Return the ``description`` field of the JSON *response*."""
    return pluck(response, field='description')
def pluck_dataList(response):
    """Return the ``payload.dataList`` field of the JSON *response*."""
    return pluck(response, field='payload.dataList')
def pluck_result(response):
    """Return the ``result`` field of the JSON *response*."""
    return pluck(response, field='result')
def result_is_ok(response):
    """Tell whether the ``result`` field of the JSON *response* equals 1."""
    result = pluck_result(response)
    return result == 1
def result_is_not_ok(response):
    """Tell whether the ``result`` field of the JSON *response* equals 0."""
    result = pluck_result(response)
    return result == 0
def skip_if_no_django():
    """Skip the running test (via ``pytest.skip``) when Django is not configured."""
    if django_settings_is_configured():
        return
    pytest.skip('no Django settings')
def django_settings_is_configured():
    """Report whether Django settings can be considered available.

    Returns ``True`` when ``DJANGO_SETTINGS_MODULE`` holds a non-empty value
    or when ``django.conf`` has already been imported, ``False`` otherwise.
    Only the environment and ``sys.modules`` are inspected, so Django itself
    is never imported by this check.
    """
    settings_module_named = bool(os.environ.get('DJANGO_SETTINGS_MODULE'))
    # If DJANGO_SETTINGS_MODULE is defined, Django is assumed to be loadable.
    return settings_module_named or 'django.conf' in sys.modules
def get_django_version():
    """Return the ``VERSION`` attribute of the ``django`` package.

    The package is imported on demand through ``__import__`` rather than at
    module import time.
    """
    django_package = __import__('django')
    return django_package.VERSION
#: The project uses MongoDB, so Django's default TestCase/runner has to be
#: hacked around.
#: Set TEST_RUNNER = 'washpayer.tests.MongoBasedTestRunner' in configs/base.py
TEST_DB = 'testsuite'

# User-Agent strings used to impersonate the different client environments.
WECHAT_USER_AGENT = (
    'Mozilla/5.0 (Linux; Android 6.0; 1503-M02 Build/MRA58K) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
    'Chrome/37.0.0.0 Mobile MQQBrowser/6.2 TBS/036558 Safari/537.36 '
    'MicroMessenger/6.3.25.861 NetType/WIFI Language/zh_CN'
)
ALIPAY_USER_AGENT = (
    'Mozilla/5.0 (Linux; U; Android 7.1.1; zh-CN; OD103 Build/NMF26F) '
    'AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 '
    'Chrome/40.0.2214.89 UCBrowser/11.6.4.950 UWS/2.11.0.20 '
    'Mobile Safari/537.36 UCBS/2.11.0.20 Nebula '
    'AlipayDefined(nt:WIFI,ws:360|0|3.0) AliApp(AP/10.1.2.091816) '
    'AlipayClient/10.1.2.091816 Language/en useStatusBar/true'
)
PC_USER_AGENT = (
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/60.0.3112.78 Safari/537.36'
)

# Flipped to True by MongoBasedTestRunner.setup_databases().
_running_test = False

# Type that request paths are checked against in RequestTestClient.
Url = str
class RequestTestClient(Client):
    """Django test client extended with role-based login helpers.

    ``get``/``post`` accept either a URL string or a zero-argument callable
    that returns one.  The ``login_as*`` helpers authenticate a user and
    store the resulting session (or end-user token) in ``self.cookies``.
    """

    # Most recently logged-in user; stays None until a login helper succeeds.
    _current_user = None

    def get(self, path, data=None, follow=False, secure=False, **extra):
        """Issue a GET request to *path* (a string or a callable returning one)."""
        if callable(path):
            path = path()
        assert isinstance(path, Url), 'path has to be a string'
        # Forward the caller's arguments.  The previous implementation passed
        # data=None, follow=False, secure=False unconditionally, which
        # silently discarded query data and the follow/secure flags.
        return super(RequestTestClient, self).get(
            path, data=data, follow=follow, secure=secure, **extra)

    def post(self, path, data=None, content_type='application/json',
             follow=False, secure=False, **extra):
        """Issue a POST request to *path*; the content type defaults to JSON."""
        if callable(path):
            path = path()
        assert isinstance(path, Url), 'path has to be a string'
        return super(RequestTestClient, self).post(
            path, data=data, content_type=content_type,
            follow=follow, secure=secure, **extra)

    def _login(self, login_fn, user, user_agent=WECHAT_USER_AGENT):
        """Log *user* in through *login_fn* and store the session cookie.

        :param login_fn: callable invoked as ``login_fn(request, user)``.
        :param user: the already authenticated user object.
        :param user_agent: value placed in the fake request's
            ``HTTP_USER_AGENT`` header.
        :returns: ``True``.
        """
        # Create a fake request to store login details.
        request = HttpRequest()
        request.META['HTTP_USER_AGENT'] = user_agent
        engine = import_module(settings.SESSION_ENGINE)
        if self.session:
            # Reuse the session this client already carries.
            request.session = self.session
            request.user = get_user(request)
        else:
            request.session = engine.SessionStore()
        login_fn(request, user)
        # Save the session values.
        request.session.save()
        # Set the cookie to represent the session.
        session_cookie = settings.SESSION_COOKIE_NAME
        self.cookies[session_cookie] = request.session.session_key
        cookie_data = {
            'max-age': None,
            'path': '/',
            'domain': settings.COOKIE_DOMAIN,
            'secure': False,
            'expires': None,
        }
        self.cookies[session_cookie].update(cookie_data)
        # Keep the private slot in sync so it is no longer permanently None.
        self._current_user = user
        # NOTE: this instance attribute shadows the ``current_user`` method
        # after a login; it is kept because callers may read it as a plain
        # attribute.
        self.current_user = user
        return True

    def current_user(self, role=None):
        """Return the stored user, or the result of ``login_as(role)`` if none.

        NOTE(review): after any successful login the instance attribute
        ``current_user`` replaces this method on the instance, so it is only
        callable on a client that has not logged in yet.
        """
        return self._current_user or self.login_as(role)

    def login_as(self, role, **credentials):
        """Authenticate with the backend registered for *role* and log in.

        :param role: role name handed to ``get_backend_by_role``.
        :param credentials: keyword arguments for the backend's
            ``authenticate``; ``password`` is mandatory.
        :raises KeyError: when ``password`` is missing.
        :raises AssertionError: when the backend returns no user.
        :returns: ``True``.
        """
        backend = get_backend_by_role(role)()
        # The original enforced the mandatory password through a no-op
        # self-assignment; make that requirement explicit.
        if 'password' not in credentials:
            raise KeyError('password')
        user = backend.authenticate(**credentials)
        assert user is not None, u'could not find user, cannot login, credentials=%s' % (json.dumps(credentials),)
        return self._login(login_fn=login, user=user)

    def login_as_dealer(self, **credentials):
        """Log in with the ``dealer`` role."""
        return self.login_as('dealer', **credentials)

    def login_as_agent(self, **credentials):
        """Log in with the ``agent`` role."""
        return self.login_as('agent', **credentials)

    def login_as_manager(self, **credentials):
        """Log in with the ``manager`` role."""
        return self.login_as('manager', **credentials)

    def login_as_super_manager(self, **credentials):
        """Log in with the ``supermanager`` role."""
        return self.login_as('supermanager', **credentials)

    def login_as_endUser(self, user_agent=WECHAT_USER_AGENT, **credentials):
        """Log in the ``MyUser`` matching *credentials* as an end user.

        The token returned by ``end_user_login`` is kept in ``self._token``
        and stored as the value of the ``end_user_login_session_key`` cookie.

        :returns: ``True``.
        """
        user = MyUser.objects(**credentials).get()
        request = HttpRequest()
        request.META['HTTP_USER_AGENT'] = user_agent
        self._token = token = end_user_login(request, user)
        # Assign the token as the cookie value first, then set the cookie
        # attributes.  Assigning a dict directly (as before) made the cookie
        # value the string form of that dict rather than the token.
        self.cookies[end_user_login_session_key] = token
        cookie_data = {
            'max-age': None,
            'path': '/',
            'domain': settings.COOKIE_DOMAIN,
            'secure': False,
            'expires': None,
        }
        self.cookies[end_user_login_session_key].update(cookie_data)
        self._current_user = user
        # Shadows the ``current_user`` method; kept for attribute-style callers.
        self.current_user = user
        return True

    def post_json(self, path, data=None, **kwargs):
        """POST *data* (default: empty dict) serialized as a JSON body."""
        data = {} if data is None else data
        return self.post(path, json.dumps(data), content_type='application/json', **kwargs)
class EndUserTestClient(RequestTestClient):
    """Request client that adds end-user JWT entries to every GET and POST."""

    def _get_headers(self):
        """Build the entries merged into each request's ``extra`` mapping.

        The token is whatever is stored in ``self._token``, or ``None`` when
        that attribute has not been set.
        """
        return {
            settings.JWT_AUTH_DOMAIN_DJANGO: settings.SERVICE_DOMAIN.USER,
            settings.JWT_TOKEN_DJANGO: getattr(self, '_token', None)
        }

    def get(self, path, data=None, follow=False, secure=False, **extra):
        """Issue a GET request with the end-user entries added to *extra*."""
        extra.update(self._get_headers())
        # Forward the caller's arguments.  The previous implementation passed
        # data=None, follow=False, secure=False unconditionally and so
        # dropped query data and the follow/secure flags.
        return super(EndUserTestClient, self).get(
            path, data=data, follow=follow, secure=secure, **extra)

    def post(self, path, data=None, content_type='application/json',
             follow=False, secure=False, **extra):
        """Issue a POST request with the end-user entries added to *extra*."""
        extra.update(self._get_headers())
        return super(EndUserTestClient, self).post(
            path, data=data, content_type=content_type,
            follow=follow, secure=secure, **extra)
class WechatRequestTestClient(EndUserTestClient):
    """End-user client whose default ``HTTP_USER_AGENT`` is WECHAT_USER_AGENT."""

    def __init__(self, **defaults):
        """Initialise the parent with *defaults* plus the WeChat user agent."""
        parent = super(WechatRequestTestClient, self)
        parent.__init__(HTTP_USER_AGENT=WECHAT_USER_AGENT, **defaults)
class AlipayRequestTestClient(EndUserTestClient):
    """End-user client whose default ``HTTP_USER_AGENT`` is ALIPAY_USER_AGENT."""

    def __init__(self, **defaults):
        """Initialise the parent with *defaults* plus the Alipay user agent."""
        parent = super(AlipayRequestTestClient, self)
        parent.__init__(HTTP_USER_AGENT=ALIPAY_USER_AGENT, **defaults)
class TestCase(_TestCase):
    """Base test case using RequestTestClient with fixture hooks disabled.

    Both fixture hooks inherited from Django's ``TestCase`` are overridden
    with empty bodies.
    """

    client_class = RequestTestClient

    def _fixture_setup(self):
        """Do nothing in place of the inherited fixture setup."""

    def _fixture_teardown(self):
        """Do nothing in place of the inherited fixture teardown."""
class MongoBasedTestRunner(DiscoverRunner):
    """Test runner that prepares and drops a MongoDB test database."""

    def setup_databases(self, **kwargs):
        """Connect mongoengine to ``TEST_DB`` and mark a test run as active.

        :returns: the database name, which is what ``teardown_databases``
            expects as its ``db_name`` argument.
        """
        global _running_test
        _running_test = True
        db_name = TEST_DB
        connect(db_name)
        return db_name

    def teardown_databases(self, db_name, **kwargs):
        """Drop the MongoDB database named *db_name*."""
        from pymongo import MongoClient
        client = MongoClient()
        try:
            client.drop_database(db_name)
        finally:
            # Release the connection that was opened only for the drop;
            # previously the client was never closed.
            client.close()
class APITestCase(TestCase):
    """Test case whose ``self.client`` is the ``requests`` module."""

    def setUp(self):
        """Point ``self.client`` at ``requests`` so tests issue real HTTP calls through it."""
        http_client = requests
        self.client = http_client
class CookieBasedTestCase(TestCase):
    """Test case that provides an empty ``SimpleCookie`` as ``self.cookie``."""

    def setUp(self):
        """Create a fresh cookie container from an empty mapping."""
        initial_cookies = {}
        self.cookie = SimpleCookie(initial_cookies)
def get_views(functions):
    """Return the callables in *functions* that take a ``request`` argument.

    A callable qualifies when ``request`` appears among its positional
    argument names.

    :param functions: iterable of Python functions or methods.
    :returns: a list of the qualifying callables, in their original order.
        A list is returned on every Python version (a bare ``filter`` is a
        lazy iterator on Python 3).
    """
    # inspect.getargspec was removed in Python 3.11; use getfullargspec where
    # it exists and fall back to getargspec on Python 2.
    argspec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    return [func for func in functions if 'request' in argspec(func).args]
|