# tornado/test/httputil_test.py
  1. # -*- coding: utf-8 -*-
  2. from __future__ import absolute_import, division, print_function
  3. from tornado.httputil import (
  4. url_concat, parse_multipart_form_data, HTTPHeaders, format_timestamp,
  5. HTTPServerRequest, parse_request_start_line, parse_cookie, qs_to_qsl,
  6. HTTPInputError,
  7. )
  8. from tornado.escape import utf8, native_str
  9. from tornado.util import PY3
  10. from tornado.log import gen_log
  11. from tornado.testing import ExpectLog
  12. from tornado.test.util import unittest
  13. import copy
  14. import datetime
  15. import logging
  16. import pickle
  17. import time
  18. if PY3:
  19. import urllib.parse as urllib_parse
  20. else:
  21. import urlparse as urllib_parse
  22. class TestUrlConcat(unittest.TestCase):
  23. def test_url_concat_no_query_params(self):
  24. url = url_concat(
  25. "https://localhost/path",
  26. [('y', 'y'), ('z', 'z')],
  27. )
  28. self.assertEqual(url, "https://localhost/path?y=y&z=z")
  29. def test_url_concat_encode_args(self):
  30. url = url_concat(
  31. "https://localhost/path",
  32. [('y', '/y'), ('z', 'z')],
  33. )
  34. self.assertEqual(url, "https://localhost/path?y=%2Fy&z=z")
  35. def test_url_concat_trailing_q(self):
  36. url = url_concat(
  37. "https://localhost/path?",
  38. [('y', 'y'), ('z', 'z')],
  39. )
  40. self.assertEqual(url, "https://localhost/path?y=y&z=z")
  41. def test_url_concat_q_with_no_trailing_amp(self):
  42. url = url_concat(
  43. "https://localhost/path?x",
  44. [('y', 'y'), ('z', 'z')],
  45. )
  46. self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
  47. def test_url_concat_trailing_amp(self):
  48. url = url_concat(
  49. "https://localhost/path?x&",
  50. [('y', 'y'), ('z', 'z')],
  51. )
  52. self.assertEqual(url, "https://localhost/path?x=&y=y&z=z")
  53. def test_url_concat_mult_params(self):
  54. url = url_concat(
  55. "https://localhost/path?a=1&b=2",
  56. [('y', 'y'), ('z', 'z')],
  57. )
  58. self.assertEqual(url, "https://localhost/path?a=1&b=2&y=y&z=z")
  59. def test_url_concat_no_params(self):
  60. url = url_concat(
  61. "https://localhost/path?r=1&t=2",
  62. [],
  63. )
  64. self.assertEqual(url, "https://localhost/path?r=1&t=2")
  65. def test_url_concat_none_params(self):
  66. url = url_concat(
  67. "https://localhost/path?r=1&t=2",
  68. None,
  69. )
  70. self.assertEqual(url, "https://localhost/path?r=1&t=2")
  71. def test_url_concat_with_frag(self):
  72. url = url_concat(
  73. "https://localhost/path#tab",
  74. [('y', 'y')],
  75. )
  76. self.assertEqual(url, "https://localhost/path?y=y#tab")
  77. def test_url_concat_multi_same_params(self):
  78. url = url_concat(
  79. "https://localhost/path",
  80. [('y', 'y1'), ('y', 'y2')],
  81. )
  82. self.assertEqual(url, "https://localhost/path?y=y1&y=y2")
  83. def test_url_concat_multi_same_query_params(self):
  84. url = url_concat(
  85. "https://localhost/path?r=1&r=2",
  86. [('y', 'y')],
  87. )
  88. self.assertEqual(url, "https://localhost/path?r=1&r=2&y=y")
  89. def test_url_concat_dict_params(self):
  90. url = url_concat(
  91. "https://localhost/path",
  92. dict(y='y'),
  93. )
  94. self.assertEqual(url, "https://localhost/path?y=y")
  95. class QsParseTest(unittest.TestCase):
  96. def test_parsing(self):
  97. qsstring = "a=1&b=2&a=3"
  98. qs = urllib_parse.parse_qs(qsstring)
  99. qsl = list(qs_to_qsl(qs))
  100. self.assertIn(('a', '1'), qsl)
  101. self.assertIn(('a', '3'), qsl)
  102. self.assertIn(('b', '2'), qsl)
class MultipartFormDataTest(unittest.TestCase):
    # Tests for parse_multipart_form_data: well-formed uploads, quoting and
    # encoding variants, and malformed-input handling (which logs rather
    # than raises).  Payloads are written with \n and converted to the
    # required \r\n line endings in one place.
    def test_file_upload(self):
        # Minimal single-file upload: one part with a quoted name/filename.
        data = b"""\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo
--1234--""".replace(b"\n", b"\r\n")
        args = {}
        files = {}
        parse_multipart_form_data(b"1234", data, args, files)
        file = files["files"][0]
        self.assertEqual(file["filename"], "ab.txt")
        self.assertEqual(file["body"], b"Foo")

    def test_unquoted_names(self):
        # quotes are optional unless special characters are present
        data = b"""\
--1234
Content-Disposition: form-data; name=files; filename=ab.txt

Foo
--1234--""".replace(b"\n", b"\r\n")
        args = {}
        files = {}
        parse_multipart_form_data(b"1234", data, args, files)
        file = files["files"][0]
        self.assertEqual(file["filename"], "ab.txt")
        self.assertEqual(file["body"], b"Foo")

    def test_special_filenames(self):
        # Filenames containing quotes, semicolons, and backslashes must
        # round-trip through the backslash-escaped quoted-string form.
        filenames = ['a;b.txt',
                     'a"b.txt',
                     'a";b.txt',
                     'a;"b.txt',
                     'a";";.txt',
                     'a\\"b.txt',
                     'a\\b.txt',
                     ]
        for filename in filenames:
            logging.debug("trying filename %r", filename)
            data = """\
--1234
Content-Disposition: form-data; name="files"; filename="%s"

Foo
--1234--""" % filename.replace('\\', '\\\\').replace('"', '\\"')
            data = utf8(data.replace("\n", "\r\n"))
            args = {}
            files = {}
            parse_multipart_form_data(b"1234", data, args, files)
            file = files["files"][0]
            self.assertEqual(file["filename"], filename)
            self.assertEqual(file["body"], b"Foo")

    def test_non_ascii_filename(self):
        # RFC 2231/5987 extended parameter: filename* (here UTF-8
        # percent-encoded) takes precedence over the plain filename.
        data = b"""\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"; filename*=UTF-8''%C3%A1b.txt

Foo
--1234--""".replace(b"\n", b"\r\n")
        args = {}
        files = {}
        parse_multipart_form_data(b"1234", data, args, files)
        file = files["files"][0]
        self.assertEqual(file["filename"], u"áb.txt")
        self.assertEqual(file["body"], b"Foo")

    def test_boundary_starts_and_ends_with_quotes(self):
        # A quoted boundary parameter value must be unquoted before use.
        data = b'''\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo
--1234--'''.replace(b"\n", b"\r\n")
        args = {}
        files = {}
        parse_multipart_form_data(b'"1234"', data, args, files)
        file = files["files"][0]
        self.assertEqual(file["filename"], "ab.txt")
        self.assertEqual(file["body"], b"Foo")

    def test_missing_headers(self):
        # A part with no header block is logged and skipped, not raised.
        data = b'''\
--1234

Foo
--1234--'''.replace(b"\n", b"\r\n")
        args = {}
        files = {}
        with ExpectLog(gen_log, "multipart/form-data missing headers"):
            parse_multipart_form_data(b"1234", data, args, files)
        self.assertEqual(files, {})

    def test_invalid_content_disposition(self):
        # Only "form-data" dispositions are accepted.
        data = b'''\
--1234
Content-Disposition: invalid; name="files"; filename="ab.txt"

Foo
--1234--'''.replace(b"\n", b"\r\n")
        args = {}
        files = {}
        with ExpectLog(gen_log, "Invalid multipart/form-data"):
            parse_multipart_form_data(b"1234", data, args, files)
        self.assertEqual(files, {})

    def test_line_does_not_end_with_correct_line_break(self):
        # The part body must be terminated by CRLF before the boundary.
        data = b'''\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo--1234--'''.replace(b"\n", b"\r\n")
        args = {}
        files = {}
        with ExpectLog(gen_log, "Invalid multipart/form-data"):
            parse_multipart_form_data(b"1234", data, args, files)
        self.assertEqual(files, {})

    def test_content_disposition_header_without_name_parameter(self):
        # A part without a name= parameter cannot be stored anywhere.
        data = b"""\
--1234
Content-Disposition: form-data; filename="ab.txt"

Foo
--1234--""".replace(b"\n", b"\r\n")
        args = {}
        files = {}
        with ExpectLog(gen_log, "multipart/form-data value missing name"):
            parse_multipart_form_data(b"1234", data, args, files)
        self.assertEqual(files, {})

    def test_data_after_final_boundary(self):
        # The spec requires that data after the final boundary be ignored.
        # http://www.w3.org/Protocols/rfc1341/7_2_Multipart.html
        # In practice, some libraries include an extra CRLF after the boundary.
        data = b"""\
--1234
Content-Disposition: form-data; name="files"; filename="ab.txt"

Foo
--1234--
""".replace(b"\n", b"\r\n")
        args = {}
        files = {}
        parse_multipart_form_data(b"1234", data, args, files)
        file = files["files"][0]
        self.assertEqual(file["filename"], "ab.txt")
        self.assertEqual(file["body"], b"Foo")
  234. class HTTPHeadersTest(unittest.TestCase):
  235. def test_multi_line(self):
  236. # Lines beginning with whitespace are appended to the previous line
  237. # with any leading whitespace replaced by a single space.
  238. # Note that while multi-line headers are a part of the HTTP spec,
  239. # their use is strongly discouraged.
  240. data = """\
  241. Foo: bar
  242. baz
  243. Asdf: qwer
  244. \tzxcv
  245. Foo: even
  246. more
  247. lines
  248. """.replace("\n", "\r\n")
  249. headers = HTTPHeaders.parse(data)
  250. self.assertEqual(headers["asdf"], "qwer zxcv")
  251. self.assertEqual(headers.get_list("asdf"), ["qwer zxcv"])
  252. self.assertEqual(headers["Foo"], "bar baz,even more lines")
  253. self.assertEqual(headers.get_list("foo"), ["bar baz", "even more lines"])
  254. self.assertEqual(sorted(list(headers.get_all())),
  255. [("Asdf", "qwer zxcv"),
  256. ("Foo", "bar baz"),
  257. ("Foo", "even more lines")])
  258. def test_malformed_continuation(self):
  259. # If the first line starts with whitespace, it's a
  260. # continuation line with nothing to continue, so reject it
  261. # (with a proper error).
  262. data = " Foo: bar"
  263. self.assertRaises(HTTPInputError, HTTPHeaders.parse, data)
  264. def test_unicode_newlines(self):
  265. # Ensure that only \r\n is recognized as a header separator, and not
  266. # the other newline-like unicode characters.
  267. # Characters that are likely to be problematic can be found in
  268. # http://unicode.org/standard/reports/tr13/tr13-5.html
  269. # and cpython's unicodeobject.c (which defines the implementation
  270. # of unicode_type.splitlines(), and uses a different list than TR13).
  271. newlines = [
  272. u'\u001b', # VERTICAL TAB
  273. u'\u001c', # FILE SEPARATOR
  274. u'\u001d', # GROUP SEPARATOR
  275. u'\u001e', # RECORD SEPARATOR
  276. u'\u0085', # NEXT LINE
  277. u'\u2028', # LINE SEPARATOR
  278. u'\u2029', # PARAGRAPH SEPARATOR
  279. ]
  280. for newline in newlines:
  281. # Try the utf8 and latin1 representations of each newline
  282. for encoding in ['utf8', 'latin1']:
  283. try:
  284. try:
  285. encoded = newline.encode(encoding)
  286. except UnicodeEncodeError:
  287. # Some chars cannot be represented in latin1
  288. continue
  289. data = b'Cookie: foo=' + encoded + b'bar'
  290. # parse() wants a native_str, so decode through latin1
  291. # in the same way the real parser does.
  292. headers = HTTPHeaders.parse(
  293. native_str(data.decode('latin1')))
  294. expected = [('Cookie', 'foo=' +
  295. native_str(encoded.decode('latin1')) + 'bar')]
  296. self.assertEqual(
  297. expected, list(headers.get_all()))
  298. except Exception:
  299. gen_log.warning("failed while trying %r in %s",
  300. newline, encoding)
  301. raise
  302. def test_optional_cr(self):
  303. # Both CRLF and LF should be accepted as separators. CR should not be
  304. # part of the data when followed by LF, but it is a normal char
  305. # otherwise (or should bare CR be an error?)
  306. headers = HTTPHeaders.parse(
  307. 'CRLF: crlf\r\nLF: lf\nCR: cr\rMore: more\r\n')
  308. self.assertEqual(sorted(headers.get_all()),
  309. [('Cr', 'cr\rMore: more'),
  310. ('Crlf', 'crlf'),
  311. ('Lf', 'lf'),
  312. ])
  313. def test_copy(self):
  314. all_pairs = [('A', '1'), ('A', '2'), ('B', 'c')]
  315. h1 = HTTPHeaders()
  316. for k, v in all_pairs:
  317. h1.add(k, v)
  318. h2 = h1.copy()
  319. h3 = copy.copy(h1)
  320. h4 = copy.deepcopy(h1)
  321. for headers in [h1, h2, h3, h4]:
  322. # All the copies are identical, no matter how they were
  323. # constructed.
  324. self.assertEqual(list(sorted(headers.get_all())), all_pairs)
  325. for headers in [h2, h3, h4]:
  326. # Neither the dict or its member lists are reused.
  327. self.assertIsNot(headers, h1)
  328. self.assertIsNot(headers.get_list('A'), h1.get_list('A'))
  329. def test_pickle_roundtrip(self):
  330. headers = HTTPHeaders()
  331. headers.add('Set-Cookie', 'a=b')
  332. headers.add('Set-Cookie', 'c=d')
  333. headers.add('Content-Type', 'text/html')
  334. pickled = pickle.dumps(headers)
  335. unpickled = pickle.loads(pickled)
  336. self.assertEqual(sorted(headers.get_all()), sorted(unpickled.get_all()))
  337. self.assertEqual(sorted(headers.items()), sorted(unpickled.items()))
  338. def test_setdefault(self):
  339. headers = HTTPHeaders()
  340. headers['foo'] = 'bar'
  341. # If a value is present, setdefault returns it without changes.
  342. self.assertEqual(headers.setdefault('foo', 'baz'), 'bar')
  343. self.assertEqual(headers['foo'], 'bar')
  344. # If a value is not present, setdefault sets it for future use.
  345. self.assertEqual(headers.setdefault('quux', 'xyzzy'), 'xyzzy')
  346. self.assertEqual(headers['quux'], 'xyzzy')
  347. self.assertEqual(sorted(headers.get_all()), [('Foo', 'bar'), ('Quux', 'xyzzy')])
  348. def test_string(self):
  349. headers = HTTPHeaders()
  350. headers.add("Foo", "1")
  351. headers.add("Foo", "2")
  352. headers.add("Foo", "3")
  353. headers2 = HTTPHeaders.parse(str(headers))
  354. self.assertEquals(headers, headers2)
  355. class FormatTimestampTest(unittest.TestCase):
  356. # Make sure that all the input types are supported.
  357. TIMESTAMP = 1359312200.503611
  358. EXPECTED = 'Sun, 27 Jan 2013 18:43:20 GMT'
  359. def check(self, value):
  360. self.assertEqual(format_timestamp(value), self.EXPECTED)
  361. def test_unix_time_float(self):
  362. self.check(self.TIMESTAMP)
  363. def test_unix_time_int(self):
  364. self.check(int(self.TIMESTAMP))
  365. def test_struct_time(self):
  366. self.check(time.gmtime(self.TIMESTAMP))
  367. def test_time_tuple(self):
  368. tup = tuple(time.gmtime(self.TIMESTAMP))
  369. self.assertEqual(9, len(tup))
  370. self.check(tup)
  371. def test_datetime(self):
  372. self.check(datetime.datetime.utcfromtimestamp(self.TIMESTAMP))
  373. # HTTPServerRequest is mainly tested incidentally to the server itself,
  374. # but this tests the parts of the class that can be tested in isolation.
  375. class HTTPServerRequestTest(unittest.TestCase):
  376. def test_default_constructor(self):
  377. # All parameters are formally optional, but uri is required
  378. # (and has been for some time). This test ensures that no
  379. # more required parameters slip in.
  380. HTTPServerRequest(uri='/')
  381. def test_body_is_a_byte_string(self):
  382. requets = HTTPServerRequest(uri='/')
  383. self.assertIsInstance(requets.body, bytes)
  384. def test_repr_does_not_contain_headers(self):
  385. request = HTTPServerRequest(uri='/', headers={'Canary': 'Coal Mine'})
  386. self.assertTrue('Canary' not in repr(request))
  387. class ParseRequestStartLineTest(unittest.TestCase):
  388. METHOD = "GET"
  389. PATH = "/foo"
  390. VERSION = "HTTP/1.1"
  391. def test_parse_request_start_line(self):
  392. start_line = " ".join([self.METHOD, self.PATH, self.VERSION])
  393. parsed_start_line = parse_request_start_line(start_line)
  394. self.assertEqual(parsed_start_line.method, self.METHOD)
  395. self.assertEqual(parsed_start_line.path, self.PATH)
  396. self.assertEqual(parsed_start_line.version, self.VERSION)
class ParseCookieTest(unittest.TestCase):
    # These tests copied from Django:
    # https://github.com/django/django/pull/6277/commits/da810901ada1cae9fc1f018f879f11a7fb467b28
    def test_python_cookies(self):
        """
        Test cases copied from Python's Lib/test/test_http_cookies.py
        """
        self.assertEqual(parse_cookie('chips=ahoy; vienna=finger'),
                         {'chips': 'ahoy', 'vienna': 'finger'})
        # Here parse_cookie() differs from Python's cookie parsing in that it
        # treats all semicolons as delimiters, even within quotes.
        self.assertEqual(
            parse_cookie('keebler="E=mc2; L=\\"Loves\\"; fudge=\\012;"'),
            {'keebler': '"E=mc2', 'L': '\\"Loves\\"', 'fudge': '\\012', '': '"'}
        )
        # Illegal cookies that have an '=' char in an unquoted value.
        self.assertEqual(parse_cookie('keebler=E=mc2'), {'keebler': 'E=mc2'})
        # Cookies with ':' character in their name.
        self.assertEqual(parse_cookie('key:term=value:term'), {'key:term': 'value:term'})
        # Cookies with '[' and ']'.
        self.assertEqual(parse_cookie('a=b; c=[; d=r; f=h'),
                         {'a': 'b', 'c': '[', 'd': 'r', 'f': 'h'})

    def test_cookie_edgecases(self):
        # Cookies that RFC6265 allows.
        self.assertEqual(parse_cookie('a=b; Domain=example.com'),
                         {'a': 'b', 'Domain': 'example.com'})
        # parse_cookie() has historically kept only the last cookie with the
        # same name.
        self.assertEqual(parse_cookie('a=b; h=i; a=c'), {'a': 'c', 'h': 'i'})

    def test_invalid_cookies(self):
        """
        Cookie strings that go against RFC6265 but browsers will send if set
        via document.cookie.
        """
        # Chunks without an equals sign appear as unnamed values per
        # https://bugzilla.mozilla.org/show_bug.cgi?id=169091
        self.assertIn('django_language',
                      parse_cookie('abc=def; unnamed; django_language=en').keys())
        # Even a double quote may be an unnamed value.
        self.assertEqual(parse_cookie('a=b; "; c=d'), {'a': 'b', '': '"', 'c': 'd'})
        # Spaces in names and values, and an equals sign in values.
        self.assertEqual(parse_cookie('a b c=d e = f; gh=i'), {'a b c': 'd e = f', 'gh': 'i'})
        # More characters the spec forbids.
        self.assertEqual(parse_cookie('a b,c<>@:/[]?{}=d " =e,f g'),
                         {'a b,c<>@:/[]?{}': 'd " =e,f g'})
        # Unicode characters. The spec only allows ASCII.
        self.assertEqual(parse_cookie('saint=André Bessette'),
                         {'saint': native_str('André Bessette')})
        # Browsers don't send extra whitespace or semicolons in Cookie headers,
        # but parse_cookie() should parse whitespace the same way
        # document.cookie parses whitespace.
        self.assertEqual(parse_cookie(' = b ; ; = ; c = ; '), {'': 'b', 'c': ''})
    