base.py 8.4 KB


  1. # -*- coding: utf-8 -*-
  2. #!/usr/bin/env python
  3. """Core Unit Test Module"""
  4. import os
  5. os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'configs.systest')
  6. import sys
  7. import pytest
  8. import inspect
  9. import simplejson as json
  10. from Cookie import SimpleCookie
  11. from importlib import import_module
  12. import requests
  13. from mongoengine import connect
  14. from django.conf import settings
  15. from django.contrib.auth import get_user, logout, login
  16. from django.http import HttpRequest
  17. from django.test.runner import DiscoverRunner
  18. from django.test import TestCase as _TestCase
  19. from django.test.client import Client
  20. from jsonpath_rw import parse
  21. from apps.web.utils import get_backend_by_role
  22. from apps.web.user.models import MyUser
  23. from apps.web.user.auth import end_user_login, end_user_login_session_key
  24. def pluck(response, field): return parse(field).find(json.loads(response.content))[0].value
  25. def pluck_description(response): return pluck(response, 'description')
  26. def pluck_dataList(response): return pluck(response, 'payload.dataList')
  27. def pluck_result(response): return pluck(response, 'result')
  28. def result_is_ok(response): return pluck_result(response) == 1
  29. def result_is_not_ok(response): return pluck_result(response) == 0
  30. def skip_if_no_django():
  31. """Raises a skip exception when no Django settings are available"""
  32. if not django_settings_is_configured():
  33. pytest.skip('no Django settings')
  34. def django_settings_is_configured():
  35. # Avoid importing Django if it has not yet been imported
  36. if not os.environ.get('DJANGO_SETTINGS_MODULE') \
  37. and 'django.conf' not in sys.modules:
  38. return False
  39. # If DJANGO_SETTINGS_MODULE is defined at this point, Django is assumed to
  40. # always be loaded.
  41. return True
  42. def get_django_version():
  43. return __import__('django').VERSION
  44. #: 由于项目运用MongoDB,需要hack Django 默认的TestCase/runner
  45. #: set TEST_RUNNER = 'washpayer.tests.MongoBasedTestRunner' to configs/base.py
  46. TEST_DB = 'testsuite'
  47. WECHAT_USER_AGENT = 'Mozilla/5.0 (Linux; Android 6.0; 1503-M02 Build/MRA58K) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/37.0.0.0 Mobile MQQBrowser/6.2 TBS/036558 Safari/537.36 MicroMessenger/6.3.25.861 NetType/WIFI Language/zh_CN'
  48. ALIPAY_USER_AGENT = 'Mozilla/5.0 (Linux; U; Android 7.1.1; zh-CN; OD103 Build/NMF26F) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/40.0.2214.89 UCBrowser/11.6.4.950 UWS/2.11.0.20 Mobile Safari/537.36 UCBS/2.11.0.20 Nebula AlipayDefined(nt:WIFI,ws:360|0|3.0) AliApp(AP/10.1.2.091816) AlipayClient/10.1.2.091816 Language/en useStatusBar/true'
  49. PC_USER_AGENT = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/60.0.3112.78 Safari/537.36'
  50. _running_test = False
  51. Url = str
  52. class RequestTestClient(Client):
  53. _current_user = None
  54. def get(self, path, data=None, follow=False, secure=False, **extra):
  55. if callable(path):
  56. path = path()
  57. assert isinstance(path, Url), 'path has to be a string'
  58. return super(RequestTestClient, self).get(path, data=None, follow=False, secure=False, **extra)
  59. def post(self, path, data=None, content_type='application/json',
  60. follow=False, secure=False, **extra):
  61. if callable(path):
  62. path = path()
  63. assert isinstance(path, Url), 'path has to be a string'
  64. return super(RequestTestClient, self).post(path, data=data, content_type=content_type,
  65. follow=follow, secure=secure, **extra)
  66. def _login(self, login_fn, user, user_agent=WECHAT_USER_AGENT):
  67. # Create a fake request to store login details.
  68. request = HttpRequest()
  69. request.META['HTTP_USER_AGENT'] = user_agent
  70. engine = import_module(settings.SESSION_ENGINE)
  71. if self.session:
  72. request.session = self.session
  73. request.user = get_user(request)
  74. else:
  75. request.session = engine.SessionStore()
  76. login_fn(request, user)
  77. # Save the session values.
  78. request.session.save()
  79. # Set the cookie to represent the session.
  80. session_cookie = settings.SESSION_COOKIE_NAME
  81. self.cookies[session_cookie] = request.session.session_key
  82. cookie_data = {
  83. 'max-age': None,
  84. 'path': '/',
  85. 'domain': settings.COOKIE_DOMAIN,
  86. 'secure': False,
  87. 'expires': None,
  88. }
  89. self.cookies[session_cookie].update(cookie_data)
  90. self.current_user = user
  91. return True
  92. def current_user(self, role=None):
  93. return self._current_user or self.login_as(role)
  94. def login_as(self, role, **credentials):
  95. backend = get_backend_by_role(role)()
  96. credentials['password'] = credentials['password']
  97. user = backend.authenticate(**credentials)
  98. assert user is not None, u'could not find user, cannot login, credentials=%s' % (json.dumps(credentials),)
  99. return self._login(login_fn=login, user=user)
  100. def login_as_dealer(self, **credentials):
  101. return self.login_as('dealer', **credentials)
  102. def login_as_agent(self, **credentials):
  103. return self.login_as('agent', **credentials)
  104. def login_as_manager(self, **credentials):
  105. return self.login_as('manager', **credentials)
  106. def login_as_super_manager(self, **credentials):
  107. return self.login_as('supermanager', **credentials)
  108. def login_as_endUser(self, user_agent=WECHAT_USER_AGENT, **credentials):
  109. user = MyUser.objects(**credentials).get()
  110. request = HttpRequest()
  111. request.META['HTTP_USER_AGENT'] = user_agent
  112. self._token = token = end_user_login(request, user)
  113. cookie_data = {
  114. 'value': token,
  115. 'max-age': None,
  116. 'path': '/',
  117. 'domain': settings.COOKIE_DOMAIN,
  118. 'secure': False,
  119. 'expires': None,
  120. }
  121. self.cookies[end_user_login_session_key] = cookie_data
  122. self.current_user = user
  123. return True
  124. def post_json(self, path, data=None, **kwargs):
  125. data = {} if data is None else data
  126. return self.post(path, json.dumps(data), content_type='application/json', **kwargs)
  127. class EndUserTestClient(RequestTestClient):
  128. def _get_headers(self):
  129. return {
  130. settings.JWT_AUTH_DOMAIN_DJANGO: settings.SERVICE_DOMAIN.USER,
  131. settings.JWT_TOKEN_DJANGO: getattr(self, '_token', None)
  132. }
  133. def get(self, path, data=None, follow=False, secure=False, **extra):
  134. headers = self._get_headers()
  135. extra.update(headers)
  136. return super(EndUserTestClient, self).get(path, data=None, follow=False, secure=False, **extra)
  137. def post(self, path, data=None, content_type='application/json',
  138. follow=False, secure=False, **extra):
  139. headers = self._get_headers()
  140. extra.update(headers)
  141. return super(EndUserTestClient, self).post(path, data=data, content_type=content_type,
  142. follow=follow, secure=secure, **extra)
  143. class WechatRequestTestClient(EndUserTestClient):
  144. def __init__(self, **defaults):
  145. super(WechatRequestTestClient, self).__init__(HTTP_USER_AGENT=WECHAT_USER_AGENT, **defaults)
  146. class AlipayRequestTestClient(EndUserTestClient):
  147. def __init__(self, **defaults):
  148. super(AlipayRequestTestClient, self).__init__(HTTP_USER_AGENT=ALIPAY_USER_AGENT, **defaults)
  149. class TestCase(_TestCase):
  150. client_class = RequestTestClient
  151. def _fixture_setup(self):
  152. pass
  153. def _fixture_teardown(self):
  154. pass
  155. class MongoBasedTestRunner(DiscoverRunner):
  156. def setup_databases(self, **kwargs):
  157. global _running_test
  158. _running_test = True
  159. db_name = TEST_DB
  160. connect(db_name)
  161. return db_name
  162. def teardown_databases(self, db_name, **kwargs):
  163. from pymongo import MongoClient
  164. client = MongoClient()
  165. client.drop_database(db_name)
  166. class APITestCase(TestCase):
  167. def setUp(self):
  168. self.client = requests
  169. class CookieBasedTestCase(TestCase):
  170. def setUp(self):
  171. self.cookie = SimpleCookie({})
  172. def get_views(functions):
  173. return filter(lambda f: 'request' in inspect.getargspec(f).args, functions)