wsgi_test.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. from __future__ import absolute_import, division, print_function
  2. from wsgiref.validate import validator
  3. from tornado.escape import json_decode
  4. from tornado.test.httpserver_test import TypeCheckHandler
  5. from tornado.test.util import ignore_deprecation
  6. from tornado.testing import AsyncHTTPTestCase
  7. from tornado.web import RequestHandler, Application
  8. from tornado.wsgi import WSGIApplication, WSGIContainer, WSGIAdapter
  9. from tornado.test import httpserver_test
  10. from tornado.test import web_test
  11. class WSGIContainerTest(AsyncHTTPTestCase):
  12. def wsgi_app(self, environ, start_response):
  13. status = "200 OK"
  14. response_headers = [("Content-Type", "text/plain")]
  15. start_response(status, response_headers)
  16. return [b"Hello world!"]
  17. def get_app(self):
  18. return WSGIContainer(validator(self.wsgi_app))
  19. def test_simple(self):
  20. response = self.fetch("/")
  21. self.assertEqual(response.body, b"Hello world!")
  22. class WSGIAdapterTest(AsyncHTTPTestCase):
  23. def get_app(self):
  24. class HelloHandler(RequestHandler):
  25. def get(self):
  26. self.write("Hello world!")
  27. class PathQuotingHandler(RequestHandler):
  28. def get(self, path):
  29. self.write(path)
  30. # It would be better to run the wsgiref server implementation in
  31. # another thread instead of using our own WSGIContainer, but this
  32. # fits better in our async testing framework and the wsgiref
  33. # validator should keep us honest
  34. with ignore_deprecation():
  35. return WSGIContainer(validator(WSGIAdapter(
  36. Application([
  37. ("/", HelloHandler),
  38. ("/path/(.*)", PathQuotingHandler),
  39. ("/typecheck", TypeCheckHandler),
  40. ]))))
  41. def test_simple(self):
  42. response = self.fetch("/")
  43. self.assertEqual(response.body, b"Hello world!")
  44. def test_path_quoting(self):
  45. response = self.fetch("/path/foo%20bar%C3%A9")
  46. self.assertEqual(response.body, u"foo bar\u00e9".encode("utf-8"))
  47. def test_types(self):
  48. headers = {"Cookie": "foo=bar"}
  49. response = self.fetch("/typecheck?foo=bar", headers=headers)
  50. data = json_decode(response.body)
  51. self.assertEqual(data, {})
  52. response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers)
  53. data = json_decode(response.body)
  54. self.assertEqual(data, {})
  55. # This is kind of hacky, but run some of the HTTPServer and web tests
  56. # through WSGIContainer and WSGIApplication to make sure everything
  57. # survives repeated disassembly and reassembly.
  58. class WSGIConnectionTest(httpserver_test.HTTPConnectionTest):
  59. def get_app(self):
  60. with ignore_deprecation():
  61. return WSGIContainer(validator(WSGIAdapter(Application(self.get_handlers()))))
  62. def wrap_web_tests_application():
  63. result = {}
  64. for cls in web_test.wsgi_safe_tests:
  65. def class_factory():
  66. class WSGIApplicationWrappedTest(cls): # type: ignore
  67. def setUp(self):
  68. self.warning_catcher = ignore_deprecation()
  69. self.warning_catcher.__enter__()
  70. super(WSGIApplicationWrappedTest, self).setUp()
  71. def tearDown(self):
  72. super(WSGIApplicationWrappedTest, self).tearDown()
  73. self.warning_catcher.__exit__(None, None, None)
  74. def get_app(self):
  75. self.app = WSGIApplication(self.get_handlers(),
  76. **self.get_app_kwargs())
  77. return WSGIContainer(validator(self.app))
  78. result["WSGIApplication_" + cls.__name__] = class_factory()
  79. return result
  80. globals().update(wrap_web_tests_application())
  81. def wrap_web_tests_adapter():
  82. result = {}
  83. for cls in web_test.wsgi_safe_tests:
  84. class WSGIAdapterWrappedTest(cls): # type: ignore
  85. def get_app(self):
  86. self.app = Application(self.get_handlers(),
  87. **self.get_app_kwargs())
  88. with ignore_deprecation():
  89. return WSGIContainer(validator(WSGIAdapter(self.app)))
  90. result["WSGIAdapter_" + cls.__name__] = WSGIAdapterWrappedTest
  91. return result
  92. globals().update(wrap_web_tests_adapter())