curl_httpclient_test.py 5.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # coding: utf-8
  2. from __future__ import absolute_import, division, print_function
  3. from hashlib import md5
  4. from tornado.escape import utf8
  5. from tornado.httpclient import HTTPRequest, HTTPClientError
  6. from tornado.locks import Event
  7. from tornado.stack_context import ExceptionStackContext
  8. from tornado.testing import AsyncHTTPTestCase, gen_test
  9. from tornado.test import httpclient_test
  10. from tornado.test.util import unittest, ignore_deprecation
  11. from tornado.web import Application, RequestHandler
  12. try:
  13. import pycurl # type: ignore
  14. except ImportError:
  15. pycurl = None
  16. if pycurl is not None:
  17. from tornado.curl_httpclient import CurlAsyncHTTPClient
  18. @unittest.skipIf(pycurl is None, "pycurl module not present")
  19. class CurlHTTPClientCommonTestCase(httpclient_test.HTTPClientCommonTestCase):
  20. def get_http_client(self):
  21. client = CurlAsyncHTTPClient(defaults=dict(allow_ipv6=False))
  22. # make sure AsyncHTTPClient magic doesn't give us the wrong class
  23. self.assertTrue(isinstance(client, CurlAsyncHTTPClient))
  24. return client
  25. class DigestAuthHandler(RequestHandler):
  26. def initialize(self, username, password):
  27. self.username = username
  28. self.password = password
  29. def get(self):
  30. realm = 'test'
  31. opaque = 'asdf'
  32. # Real implementations would use a random nonce.
  33. nonce = "1234"
  34. auth_header = self.request.headers.get('Authorization', None)
  35. if auth_header is not None:
  36. auth_mode, params = auth_header.split(' ', 1)
  37. assert auth_mode == 'Digest'
  38. param_dict = {}
  39. for pair in params.split(','):
  40. k, v = pair.strip().split('=', 1)
  41. if v[0] == '"' and v[-1] == '"':
  42. v = v[1:-1]
  43. param_dict[k] = v
  44. assert param_dict['realm'] == realm
  45. assert param_dict['opaque'] == opaque
  46. assert param_dict['nonce'] == nonce
  47. assert param_dict['username'] == self.username
  48. assert param_dict['uri'] == self.request.path
  49. h1 = md5(utf8('%s:%s:%s' % (self.username, realm, self.password))).hexdigest()
  50. h2 = md5(utf8('%s:%s' % (self.request.method,
  51. self.request.path))).hexdigest()
  52. digest = md5(utf8('%s:%s:%s' % (h1, nonce, h2))).hexdigest()
  53. if digest == param_dict['response']:
  54. self.write('ok')
  55. else:
  56. self.write('fail')
  57. else:
  58. self.set_status(401)
  59. self.set_header('WWW-Authenticate',
  60. 'Digest realm="%s", nonce="%s", opaque="%s"' %
  61. (realm, nonce, opaque))
  62. class CustomReasonHandler(RequestHandler):
  63. def get(self):
  64. self.set_status(200, "Custom reason")
  65. class CustomFailReasonHandler(RequestHandler):
  66. def get(self):
  67. self.set_status(400, "Custom reason")
  68. @unittest.skipIf(pycurl is None, "pycurl module not present")
  69. class CurlHTTPClientTestCase(AsyncHTTPTestCase):
  70. def setUp(self):
  71. super(CurlHTTPClientTestCase, self).setUp()
  72. self.http_client = self.create_client()
  73. def get_app(self):
  74. return Application([
  75. ('/digest', DigestAuthHandler, {'username': 'foo', 'password': 'bar'}),
  76. ('/digest_non_ascii', DigestAuthHandler, {'username': 'foo', 'password': 'barユ£'}),
  77. ('/custom_reason', CustomReasonHandler),
  78. ('/custom_fail_reason', CustomFailReasonHandler),
  79. ])
  80. def create_client(self, **kwargs):
  81. return CurlAsyncHTTPClient(force_instance=True,
  82. defaults=dict(allow_ipv6=False),
  83. **kwargs)
  84. @gen_test
  85. def test_prepare_curl_callback_stack_context(self):
  86. exc_info = []
  87. error_event = Event()
  88. def error_handler(typ, value, tb):
  89. exc_info.append((typ, value, tb))
  90. error_event.set()
  91. return True
  92. with ignore_deprecation():
  93. with ExceptionStackContext(error_handler):
  94. request = HTTPRequest(self.get_url('/custom_reason'),
  95. prepare_curl_callback=lambda curl: 1 / 0)
  96. yield [error_event.wait(), self.http_client.fetch(request)]
  97. self.assertEqual(1, len(exc_info))
  98. self.assertIs(exc_info[0][0], ZeroDivisionError)
  99. def test_digest_auth(self):
  100. response = self.fetch('/digest', auth_mode='digest',
  101. auth_username='foo', auth_password='bar')
  102. self.assertEqual(response.body, b'ok')
  103. def test_custom_reason(self):
  104. response = self.fetch('/custom_reason')
  105. self.assertEqual(response.reason, "Custom reason")
  106. def test_fail_custom_reason(self):
  107. response = self.fetch('/custom_fail_reason')
  108. self.assertEqual(str(response.error), "HTTP 400: Custom reason")
  109. def test_failed_setup(self):
  110. self.http_client = self.create_client(max_clients=1)
  111. for i in range(5):
  112. with ignore_deprecation():
  113. response = self.fetch(u'/ユニコード')
  114. self.assertIsNot(response.error, None)
  115. with self.assertRaises((UnicodeEncodeError, HTTPClientError)):
  116. # This raises UnicodeDecodeError on py3 and
  117. # HTTPClientError(404) on py2. The main motivation of
  118. # this test is to ensure that the UnicodeEncodeError
  119. # during the setup phase doesn't lead the request to
  120. # be dropped on the floor.
  121. response = self.fetch(u'/ユニコード', raise_error=True)
  122. def test_digest_auth_non_ascii(self):
  123. response = self.fetch('/digest_non_ascii', auth_mode='digest',
  124. auth_username='foo', auth_password='barユ£')
  125. self.assertEqual(response.body, b'ok')