web_test.py 114 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
72278227922802281228222832284228522862287228822892290229122922293229422952296229722982299230023012302230323042305230623072308230923102311231223132314231523162317231823192320232123222323232423252326232723282329233023312332233323342335233623372338233923402341234223432344234523462347234823492350235123522353235423552356235723582359236023612362236323642365236623672368236923702371237223732374237523762377237823792380238123822383238423852386238723882389239023912392239323942395239623972398239924002401240224032404240524062407240824092410241124122413241424152416241724182419242024212422242324242425242624272428242924302431243224332434243524362437243824392440244124422443244424452446244724482449245024512452245324542455245624572458245924602461246224632464246524662467246824692470247124722473247424752476247724782479248024812482248324842485248624872488248924902491249224932494249524962497249824992500250125022503250425052506250725082509251025112512251325142515251625172518251925202521252225232524252525262527252825292530253125322533253425352536253725382539254025412542254325442545254625472548254925502551255225532554255525562557255825592560256125622563256425652566256725682569257025712572257325742575257625772578257925802581258225832584258525862587258825892590259125922593259425952596259725982599260026012602260326042605260626072608260926102611261226132614261526162617261826192620262126222623262426252626262726282629263026312632263326342635263626372638263926402641264226432644264526462647264826492650265126522653265426552656265726582659266026612662266326642665266626672668266926702671267226732674267526762677267826792680268126822683268426852686268726882689269026912692269326942695269626972698269927002701270227032704270527062707270827092710271127122713271427152716271727182719272027212722272327242725272627272728272927302731273227332734273527362737273827392740274127422743274427452746274727482749275027512752275327542755275627572758275927602761276227632764276527662767276827692770277127722773277427752776277
72778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967
  1. from __future__ import absolute_import, division, print_function
  2. from tornado.concurrent import Future
  3. from tornado import gen
  4. from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring # noqa: E501
  5. from tornado.httpclient import HTTPClientError
  6. from tornado.httputil import format_timestamp
  7. from tornado.ioloop import IOLoop
  8. from tornado.iostream import IOStream
  9. from tornado import locale
  10. from tornado.locks import Event
  11. from tornado.log import app_log, gen_log
  12. from tornado.simple_httpclient import SimpleAsyncHTTPClient
  13. from tornado.template import DictLoader
  14. from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
  15. from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation
  16. from tornado.util import ObjectDict, unicode_type, timedelta_to_seconds, PY3
  17. from tornado.web import (
  18. Application, RequestHandler, StaticFileHandler, RedirectHandler as WebRedirectHandler,
  19. HTTPError, MissingArgumentError, ErrorHandler, authenticated, asynchronous, url,
  20. _create_signature_v1, create_signed_value, decode_signed_value, get_signature_key_version,
  21. UIModule, Finish, stream_request_body, removeslash, addslash, GZipContentEncoding,
  22. )
  23. import binascii
  24. import contextlib
  25. import copy
  26. import datetime
  27. import email.utils
  28. import gzip
  29. from io import BytesIO
  30. import itertools
  31. import logging
  32. import os
  33. import re
  34. import socket
  35. if PY3:
  36. import urllib.parse as urllib_parse # py3
  37. else:
  38. import urllib as urllib_parse # py2
  39. wsgi_safe_tests = []
  40. def relpath(*a):
  41. return os.path.join(os.path.dirname(__file__), *a)
  42. def wsgi_safe(cls):
  43. wsgi_safe_tests.append(cls)
  44. return cls
  45. class WebTestCase(AsyncHTTPTestCase):
  46. """Base class for web tests that also supports WSGI mode.
  47. Override get_handlers and get_app_kwargs instead of get_app.
  48. Append to wsgi_safe to have it run in wsgi_test as well.
  49. """
  50. def get_app(self):
  51. self.app = Application(self.get_handlers(), **self.get_app_kwargs())
  52. return self.app
  53. def get_handlers(self):
  54. raise NotImplementedError()
  55. def get_app_kwargs(self):
  56. return {}
  57. class SimpleHandlerTestCase(WebTestCase):
  58. """Simplified base class for tests that work with a single handler class.
  59. To use, define a nested class named ``Handler``.
  60. """
  61. def get_handlers(self):
  62. return [('/', self.Handler)]
  63. class HelloHandler(RequestHandler):
  64. def get(self):
  65. self.write('hello')
  66. class CookieTestRequestHandler(RequestHandler):
  67. # stub out enough methods to make the secure_cookie functions work
  68. def __init__(self, cookie_secret='0123456789', key_version=None):
  69. # don't call super.__init__
  70. self._cookies = {}
  71. if key_version is None:
  72. self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret))
  73. else:
  74. self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret,
  75. key_version=key_version))
  76. def get_cookie(self, name):
  77. return self._cookies.get(name)
  78. def set_cookie(self, name, value, expires_days=None):
  79. self._cookies[name] = value
  80. # See SignedValueTest below for more.
  81. class SecureCookieV1Test(unittest.TestCase):
  82. def test_round_trip(self):
  83. handler = CookieTestRequestHandler()
  84. handler.set_secure_cookie('foo', b'bar', version=1)
  85. self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
  86. b'bar')
  87. def test_cookie_tampering_future_timestamp(self):
  88. handler = CookieTestRequestHandler()
  89. # this string base64-encodes to '12345678'
  90. handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
  91. version=1)
  92. cookie = handler._cookies['foo']
  93. match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
  94. self.assertTrue(match)
  95. timestamp = match.group(1)
  96. sig = match.group(2)
  97. self.assertEqual(
  98. _create_signature_v1(handler.application.settings["cookie_secret"],
  99. 'foo', '12345678', timestamp),
  100. sig)
  101. # shifting digits from payload to timestamp doesn't alter signature
  102. # (this is not desirable behavior, just confirming that that's how it
  103. # works)
  104. self.assertEqual(
  105. _create_signature_v1(handler.application.settings["cookie_secret"],
  106. 'foo', '1234', b'5678' + timestamp),
  107. sig)
  108. # tamper with the cookie
  109. handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
  110. to_basestring(timestamp), to_basestring(sig)))
  111. # it gets rejected
  112. with ExpectLog(gen_log, "Cookie timestamp in future"):
  113. self.assertTrue(
  114. handler.get_secure_cookie('foo', min_version=1) is None)
  115. def test_arbitrary_bytes(self):
  116. # Secure cookies accept arbitrary data (which is base64 encoded).
  117. # Note that normal cookies accept only a subset of ascii.
  118. handler = CookieTestRequestHandler()
  119. handler.set_secure_cookie('foo', b'\xe9', version=1)
  120. self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
  121. # See SignedValueTest below for more.
  122. class SecureCookieV2Test(unittest.TestCase):
  123. KEY_VERSIONS = {
  124. 0: 'ajklasdf0ojaisdf',
  125. 1: 'aslkjasaolwkjsdf'
  126. }
  127. def test_round_trip(self):
  128. handler = CookieTestRequestHandler()
  129. handler.set_secure_cookie('foo', b'bar', version=2)
  130. self.assertEqual(handler.get_secure_cookie('foo', min_version=2), b'bar')
  131. def test_key_version_roundtrip(self):
  132. handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
  133. key_version=0)
  134. handler.set_secure_cookie('foo', b'bar')
  135. self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
  136. def test_key_version_roundtrip_differing_version(self):
  137. handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
  138. key_version=1)
  139. handler.set_secure_cookie('foo', b'bar')
  140. self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
  141. def test_key_version_increment_version(self):
  142. handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
  143. key_version=0)
  144. handler.set_secure_cookie('foo', b'bar')
  145. new_handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
  146. key_version=1)
  147. new_handler._cookies = handler._cookies
  148. self.assertEqual(new_handler.get_secure_cookie('foo'), b'bar')
  149. def test_key_version_invalidate_version(self):
  150. handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
  151. key_version=0)
  152. handler.set_secure_cookie('foo', b'bar')
  153. new_key_versions = self.KEY_VERSIONS.copy()
  154. new_key_versions.pop(0)
  155. new_handler = CookieTestRequestHandler(cookie_secret=new_key_versions,
  156. key_version=1)
  157. new_handler._cookies = handler._cookies
  158. self.assertEqual(new_handler.get_secure_cookie('foo'), None)
  159. class FinalReturnTest(WebTestCase):
  160. def get_handlers(self):
  161. test = self
  162. class FinishHandler(RequestHandler):
  163. @gen.coroutine
  164. def get(self):
  165. test.final_return = self.finish()
  166. yield test.final_return
  167. class RenderHandler(RequestHandler):
  168. def create_template_loader(self, path):
  169. return DictLoader({'foo.html': 'hi'})
  170. @gen.coroutine
  171. def get(self):
  172. test.final_return = self.render('foo.html')
  173. return [("/finish", FinishHandler),
  174. ("/render", RenderHandler)]
  175. def get_app_kwargs(self):
  176. return dict(template_path='FinalReturnTest')
  177. def test_finish_method_return_future(self):
  178. response = self.fetch(self.get_url('/finish'))
  179. self.assertEqual(response.code, 200)
  180. self.assertIsInstance(self.final_return, Future)
  181. self.assertTrue(self.final_return.done())
  182. def test_render_method_return_future(self):
  183. response = self.fetch(self.get_url('/render'))
  184. self.assertEqual(response.code, 200)
  185. self.assertIsInstance(self.final_return, Future)
  186. class CookieTest(WebTestCase):
  187. def get_handlers(self):
  188. class SetCookieHandler(RequestHandler):
  189. def get(self):
  190. # Try setting cookies with different argument types
  191. # to ensure that everything gets encoded correctly
  192. self.set_cookie("str", "asdf")
  193. self.set_cookie("unicode", u"qwer")
  194. self.set_cookie("bytes", b"zxcv")
  195. class GetCookieHandler(RequestHandler):
  196. def get(self):
  197. self.write(self.get_cookie("foo", "default"))
  198. class SetCookieDomainHandler(RequestHandler):
  199. def get(self):
  200. # unicode domain and path arguments shouldn't break things
  201. # either (see bug #285)
  202. self.set_cookie("unicode_args", "blah", domain=u"foo.com",
  203. path=u"/foo")
  204. class SetCookieSpecialCharHandler(RequestHandler):
  205. def get(self):
  206. self.set_cookie("equals", "a=b")
  207. self.set_cookie("semicolon", "a;b")
  208. self.set_cookie("quote", 'a"b')
  209. class SetCookieOverwriteHandler(RequestHandler):
  210. def get(self):
  211. self.set_cookie("a", "b", domain="example.com")
  212. self.set_cookie("c", "d", domain="example.com")
  213. # A second call with the same name clobbers the first.
  214. # Attributes from the first call are not carried over.
  215. self.set_cookie("a", "e")
  216. class SetCookieMaxAgeHandler(RequestHandler):
  217. def get(self):
  218. self.set_cookie("foo", "bar", max_age=10)
  219. class SetCookieExpiresDaysHandler(RequestHandler):
  220. def get(self):
  221. self.set_cookie("foo", "bar", expires_days=10)
  222. class SetCookieFalsyFlags(RequestHandler):
  223. def get(self):
  224. self.set_cookie("a", "1", secure=True)
  225. self.set_cookie("b", "1", secure=False)
  226. self.set_cookie("c", "1", httponly=True)
  227. self.set_cookie("d", "1", httponly=False)
  228. return [("/set", SetCookieHandler),
  229. ("/get", GetCookieHandler),
  230. ("/set_domain", SetCookieDomainHandler),
  231. ("/special_char", SetCookieSpecialCharHandler),
  232. ("/set_overwrite", SetCookieOverwriteHandler),
  233. ("/set_max_age", SetCookieMaxAgeHandler),
  234. ("/set_expires_days", SetCookieExpiresDaysHandler),
  235. ("/set_falsy_flags", SetCookieFalsyFlags)
  236. ]
  237. def test_set_cookie(self):
  238. response = self.fetch("/set")
  239. self.assertEqual(sorted(response.headers.get_list("Set-Cookie")),
  240. ["bytes=zxcv; Path=/",
  241. "str=asdf; Path=/",
  242. "unicode=qwer; Path=/",
  243. ])
  244. def test_get_cookie(self):
  245. response = self.fetch("/get", headers={"Cookie": "foo=bar"})
  246. self.assertEqual(response.body, b"bar")
  247. response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
  248. self.assertEqual(response.body, b"bar")
  249. response = self.fetch("/get", headers={"Cookie": "/=exception;"})
  250. self.assertEqual(response.body, b"default")
  251. def test_set_cookie_domain(self):
  252. response = self.fetch("/set_domain")
  253. self.assertEqual(response.headers.get_list("Set-Cookie"),
  254. ["unicode_args=blah; Domain=foo.com; Path=/foo"])
  255. def test_cookie_special_char(self):
  256. response = self.fetch("/special_char")
  257. headers = sorted(response.headers.get_list("Set-Cookie"))
  258. self.assertEqual(len(headers), 3)
  259. self.assertEqual(headers[0], 'equals="a=b"; Path=/')
  260. self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
  261. # python 2.7 octal-escapes the semicolon; older versions leave it alone
  262. self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
  263. 'semicolon="a\\073b"; Path=/'),
  264. headers[2])
  265. data = [('foo=a=b', 'a=b'),
  266. ('foo="a=b"', 'a=b'),
  267. ('foo="a;b"', '"a'), # even quoted, ";" is a delimiter
  268. ('foo=a\\073b', 'a\\073b'), # escapes only decoded in quotes
  269. ('foo="a\\073b"', 'a;b'),
  270. ('foo="a\\"b"', 'a"b'),
  271. ]
  272. for header, expected in data:
  273. logging.debug("trying %r", header)
  274. response = self.fetch("/get", headers={"Cookie": header})
  275. self.assertEqual(response.body, utf8(expected))
  276. def test_set_cookie_overwrite(self):
  277. response = self.fetch("/set_overwrite")
  278. headers = response.headers.get_list("Set-Cookie")
  279. self.assertEqual(sorted(headers),
  280. ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
  281. def test_set_cookie_max_age(self):
  282. response = self.fetch("/set_max_age")
  283. headers = response.headers.get_list("Set-Cookie")
  284. self.assertEqual(sorted(headers),
  285. ["foo=bar; Max-Age=10; Path=/"])
  286. def test_set_cookie_expires_days(self):
  287. response = self.fetch("/set_expires_days")
  288. header = response.headers.get("Set-Cookie")
  289. match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
  290. self.assertIsNotNone(match)
  291. expires = datetime.datetime.utcnow() + datetime.timedelta(days=10)
  292. header_expires = datetime.datetime(
  293. *email.utils.parsedate(match.groupdict()["expires"])[:6])
  294. self.assertTrue(abs(timedelta_to_seconds(expires - header_expires)) < 10)
  295. def test_set_cookie_false_flags(self):
  296. response = self.fetch("/set_falsy_flags")
  297. headers = sorted(response.headers.get_list("Set-Cookie"))
  298. # The secure and httponly headers are capitalized in py35 and
  299. # lowercase in older versions.
  300. self.assertEqual(headers[0].lower(), 'a=1; path=/; secure')
  301. self.assertEqual(headers[1].lower(), 'b=1; path=/')
  302. self.assertEqual(headers[2].lower(), 'c=1; httponly; path=/')
  303. self.assertEqual(headers[3].lower(), 'd=1; path=/')
  304. class AuthRedirectRequestHandler(RequestHandler):
  305. def initialize(self, login_url):
  306. self.login_url = login_url
  307. def get_login_url(self):
  308. return self.login_url
  309. @authenticated
  310. def get(self):
  311. # we'll never actually get here because the test doesn't follow redirects
  312. self.send_error(500)
  313. class AuthRedirectTest(WebTestCase):
  314. def get_handlers(self):
  315. return [('/relative', AuthRedirectRequestHandler,
  316. dict(login_url='/login')),
  317. ('/absolute', AuthRedirectRequestHandler,
  318. dict(login_url='http://example.com/login'))]
  319. def test_relative_auth_redirect(self):
  320. response = self.fetch(self.get_url('/relative'),
  321. follow_redirects=False)
  322. self.assertEqual(response.code, 302)
  323. self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')
  324. def test_absolute_auth_redirect(self):
  325. response = self.fetch(self.get_url('/absolute'),
  326. follow_redirects=False)
  327. self.assertEqual(response.code, 302)
  328. self.assertTrue(re.match(
  329. 'http://example.com/login\?next=http%3A%2F%2F127.0.0.1%3A[0-9]+%2Fabsolute',
  330. response.headers['Location']), response.headers['Location'])
  331. class ConnectionCloseHandler(RequestHandler):
  332. def initialize(self, test):
  333. self.test = test
  334. @gen.coroutine
  335. def get(self):
  336. self.test.on_handler_waiting()
  337. never_finish = Event()
  338. yield never_finish.wait()
  339. def on_connection_close(self):
  340. self.test.on_connection_close()
  341. class ConnectionCloseTest(WebTestCase):
  342. def get_handlers(self):
  343. return [('/', ConnectionCloseHandler, dict(test=self))]
  344. def test_connection_close(self):
  345. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  346. s.connect(("127.0.0.1", self.get_http_port()))
  347. self.stream = IOStream(s)
  348. self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
  349. self.wait()
  350. def on_handler_waiting(self):
  351. logging.debug('handler waiting')
  352. self.stream.close()
  353. def on_connection_close(self):
  354. logging.debug('connection closed')
  355. self.stop()
  356. class EchoHandler(RequestHandler):
  357. def get(self, *path_args):
  358. # Type checks: web.py interfaces convert argument values to
  359. # unicode strings (by default, but see also decode_argument).
  360. # In httpserver.py (i.e. self.request.arguments), they're left
  361. # as bytes. Keys are always native strings.
  362. for key in self.request.arguments:
  363. if type(key) != str:
  364. raise Exception("incorrect type for key: %r" % type(key))
  365. for value in self.request.arguments[key]:
  366. if type(value) != bytes:
  367. raise Exception("incorrect type for value: %r" %
  368. type(value))
  369. for value in self.get_arguments(key):
  370. if type(value) != unicode_type:
  371. raise Exception("incorrect type for value: %r" %
  372. type(value))
  373. for arg in path_args:
  374. if type(arg) != unicode_type:
  375. raise Exception("incorrect type for path arg: %r" % type(arg))
  376. self.write(dict(path=self.request.path,
  377. path_args=path_args,
  378. args=recursive_unicode(self.request.arguments)))
  379. class RequestEncodingTest(WebTestCase):
  380. def get_handlers(self):
  381. return [("/group/(.*)", EchoHandler),
  382. ("/slashes/([^/]*)/([^/]*)", EchoHandler),
  383. ]
  384. def fetch_json(self, path):
  385. return json_decode(self.fetch(path).body)
  386. def test_group_question_mark(self):
  387. # Ensure that url-encoded question marks are handled properly
  388. self.assertEqual(self.fetch_json('/group/%3F'),
  389. dict(path='/group/%3F', path_args=['?'], args={}))
  390. self.assertEqual(self.fetch_json('/group/%3F?%3F=%3F'),
  391. dict(path='/group/%3F', path_args=['?'], args={'?': ['?']}))
  392. def test_group_encoding(self):
  393. # Path components and query arguments should be decoded the same way
  394. self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'),
  395. {u"path": u"/group/%C3%A9",
  396. u"path_args": [u"\u00e9"],
  397. u"args": {u"arg": [u"\u00e9"]}})
  398. def test_slashes(self):
  399. # Slashes may be escaped to appear as a single "directory" in the path,
  400. # but they are then unescaped when passed to the get() method.
  401. self.assertEqual(self.fetch_json('/slashes/foo/bar'),
  402. dict(path="/slashes/foo/bar",
  403. path_args=["foo", "bar"],
  404. args={}))
  405. self.assertEqual(self.fetch_json('/slashes/a%2Fb/c%2Fd'),
  406. dict(path="/slashes/a%2Fb/c%2Fd",
  407. path_args=["a/b", "c/d"],
  408. args={}))
  409. def test_error(self):
  410. # Percent signs (encoded as %25) should not mess up printf-style
  411. # messages in logs
  412. with ExpectLog(gen_log, ".*Invalid unicode"):
  413. self.fetch("/group/?arg=%25%e9")
  414. class TypeCheckHandler(RequestHandler):
  415. def prepare(self):
  416. self.errors = {}
  417. self.check_type('status', self.get_status(), int)
  418. # get_argument is an exception from the general rule of using
  419. # type str for non-body data mainly for historical reasons.
  420. self.check_type('argument', self.get_argument('foo'), unicode_type)
  421. self.check_type('cookie_key', list(self.cookies.keys())[0], str)
  422. self.check_type('cookie_value', list(self.cookies.values())[0].value, str)
  423. # Secure cookies return bytes because they can contain arbitrary
  424. # data, but regular cookies are native strings.
  425. if list(self.cookies.keys()) != ['asdf']:
  426. raise Exception("unexpected values for cookie keys: %r" %
  427. self.cookies.keys())
  428. self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes)
  429. self.check_type('get_cookie', self.get_cookie('asdf'), str)
  430. self.check_type('xsrf_token', self.xsrf_token, bytes)
  431. self.check_type('xsrf_form_html', self.xsrf_form_html(), str)
  432. self.check_type('reverse_url', self.reverse_url('typecheck', 'foo'), str)
  433. self.check_type('request_summary', self._request_summary(), str)
  434. def get(self, path_component):
  435. # path_component uses type unicode instead of str for consistency
  436. # with get_argument()
  437. self.check_type('path_component', path_component, unicode_type)
  438. self.write(self.errors)
  439. def post(self, path_component):
  440. self.check_type('path_component', path_component, unicode_type)
  441. self.write(self.errors)
  442. def check_type(self, name, obj, expected_type):
  443. actual_type = type(obj)
  444. if expected_type != actual_type:
  445. self.errors[name] = "expected %s, got %s" % (expected_type,
  446. actual_type)
  447. class DecodeArgHandler(RequestHandler):
  448. def decode_argument(self, value, name=None):
  449. if type(value) != bytes:
  450. raise Exception("unexpected type for value: %r" % type(value))
  451. # use self.request.arguments directly to avoid recursion
  452. if 'encoding' in self.request.arguments:
  453. return value.decode(to_unicode(self.request.arguments['encoding'][0]))
  454. else:
  455. return value
  456. def get(self, arg):
  457. def describe(s):
  458. if type(s) == bytes:
  459. return ["bytes", native_str(binascii.b2a_hex(s))]
  460. elif type(s) == unicode_type:
  461. return ["unicode", s]
  462. raise Exception("unknown type")
  463. self.write({'path': describe(arg),
  464. 'query': describe(self.get_argument("foo")),
  465. })
  466. class LinkifyHandler(RequestHandler):
  467. def get(self):
  468. self.render("linkify.html", message="http://example.com")
  469. class UIModuleResourceHandler(RequestHandler):
  470. def get(self):
  471. self.render("page.html", entries=[1, 2])
  472. class OptionalPathHandler(RequestHandler):
  473. def get(self, path):
  474. self.write({"path": path})
class FlowControlHandler(RequestHandler):
    """Writes "1", "2" and "3" in three steps chained by flush callbacks."""

    # These writes are too small to demonstrate real flow control,
    # but at least it shows that the callbacks get run.
    # NOTE(review): presumably ``@asynchronous`` and ``flush(callback=...)``
    # emit deprecation warnings, hence the ignore_deprecation() wrappers --
    # confirm against the helper's definition.
    with ignore_deprecation():
        @asynchronous
        def get(self):
            self.write("1")
            with ignore_deprecation():
                # Continue in step2 once this flush has completed.
                self.flush(callback=self.step2)

    def step2(self):
        """Second step: write "2" and schedule step3 from the flush callback."""
        self.write("2")
        with ignore_deprecation():
            self.flush(callback=self.step3)

    def step3(self):
        """Final step: write "3" and finish the request."""
        self.write("3")
        self.finish()
  491. class MultiHeaderHandler(RequestHandler):
  492. def get(self):
  493. self.set_header("x-overwrite", "1")
  494. self.set_header("X-Overwrite", 2)
  495. self.add_header("x-multi", 3)
  496. self.add_header("X-Multi", "4")
  497. class RedirectHandler(RequestHandler):
  498. def get(self):
  499. if self.get_argument('permanent', None) is not None:
  500. self.redirect('/', permanent=int(self.get_argument('permanent')))
  501. elif self.get_argument('status', None) is not None:
  502. self.redirect('/', status=int(self.get_argument('status')))
  503. else:
  504. raise Exception("didn't get permanent or status arguments")
class EmptyFlushCallbackHandler(RequestHandler):
    """Interleaves empty and non-empty flushes; the full response is "ok"."""

    @gen.coroutine
    def get(self):
        # Ensure that a yielded flush completes whether or not there
        # was any output to send.
        yield self.flush()  # "empty" flush, but writes headers
        yield self.flush()  # empty flush
        self.write("o")
        yield self.flush()  # flushes the "o"
        yield self.flush()  # empty flush
        self.finish("k")
  517. class HeaderInjectionHandler(RequestHandler):
  518. def get(self):
  519. try:
  520. self.set_header("X-Foo", "foo\r\nX-Bar: baz")
  521. raise Exception("Didn't get expected exception")
  522. except ValueError as e:
  523. if "Unsafe header value" in str(e):
  524. self.finish(b"ok")
  525. else:
  526. raise
  527. class GetArgumentHandler(RequestHandler):
  528. def prepare(self):
  529. if self.get_argument('source', None) == 'query':
  530. method = self.get_query_argument
  531. elif self.get_argument('source', None) == 'body':
  532. method = self.get_body_argument
  533. else:
  534. method = self.get_argument
  535. self.finish(method("foo", "default"))
  536. class GetArgumentsHandler(RequestHandler):
  537. def prepare(self):
  538. self.finish(dict(default=self.get_arguments("foo"),
  539. query=self.get_query_arguments("foo"),
  540. body=self.get_body_arguments("foo")))
  541. # This test is shared with wsgi_test.py
  542. @wsgi_safe
  543. class WSGISafeWebTest(WebTestCase):
  544. COOKIE_SECRET = "WebTest.COOKIE_SECRET"
  545. def get_app_kwargs(self):
  546. loader = DictLoader({
  547. "linkify.html": "{% module linkify(message) %}",
  548. "page.html": """\
  549. <html><head></head><body>
  550. {% for e in entries %}
  551. {% module Template("entry.html", entry=e) %}
  552. {% end %}
  553. </body></html>""",
  554. "entry.html": """\
  555. {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }",
  556. embedded_javascript="js_embed()",
  557. css_files=["/base.css", "/foo.css"],
  558. javascript_files="/common.js",
  559. html_head="<meta>",
  560. html_body='<script src="/analytics.js"/>') }}
  561. <div class="entry">...</div>""",
  562. })
  563. return dict(template_loader=loader,
  564. autoescape="xhtml_escape",
  565. cookie_secret=self.COOKIE_SECRET)
  566. def tearDown(self):
  567. super(WSGISafeWebTest, self).tearDown()
  568. RequestHandler._template_loaders.clear()
  569. def get_handlers(self):
  570. urls = [
  571. url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
  572. url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
  573. url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
  574. url("/linkify", LinkifyHandler),
  575. url("/uimodule_resources", UIModuleResourceHandler),
  576. url("/optional_path/(.+)?", OptionalPathHandler),
  577. url("/multi_header", MultiHeaderHandler),
  578. url("/redirect", RedirectHandler),
  579. url("/web_redirect_permanent", WebRedirectHandler, {"url": "/web_redirect_newpath"}),
  580. url("/web_redirect", WebRedirectHandler,
  581. {"url": "/web_redirect_newpath", "permanent": False}),
  582. url("//web_redirect_double_slash", WebRedirectHandler,
  583. {"url": '/web_redirect_newpath'}),
  584. url("/header_injection", HeaderInjectionHandler),
  585. url("/get_argument", GetArgumentHandler),
  586. url("/get_arguments", GetArgumentsHandler),
  587. ]
  588. return urls
  589. def fetch_json(self, *args, **kwargs):
  590. response = self.fetch(*args, **kwargs)
  591. response.rethrow()
  592. return json_decode(response.body)
  593. def test_types(self):
  594. cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
  595. "asdf", "qwer"))
  596. response = self.fetch("/typecheck/asdf?foo=bar",
  597. headers={"Cookie": "asdf=" + cookie_value})
  598. data = json_decode(response.body)
  599. self.assertEqual(data, {})
  600. response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
  601. headers={"Cookie": "asdf=" + cookie_value},
  602. body="foo=bar")
  603. def test_decode_argument(self):
  604. # These urls all decode to the same thing
  605. urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
  606. "/decode_arg/%E9?foo=%E9&encoding=latin1",
  607. "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
  608. ]
  609. for req_url in urls:
  610. response = self.fetch(req_url)
  611. response.rethrow()
  612. data = json_decode(response.body)
  613. self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'],
  614. u'query': [u'unicode', u'\u00e9'],
  615. })
  616. response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
  617. response.rethrow()
  618. data = json_decode(response.body)
  619. self.assertEqual(data, {u'path': [u'bytes', u'c3a9'],
  620. u'query': [u'bytes', u'c3a9'],
  621. })
  622. def test_decode_argument_invalid_unicode(self):
  623. # test that invalid unicode in URLs causes 400, not 500
  624. with ExpectLog(gen_log, ".*Invalid unicode.*"):
  625. response = self.fetch("/typecheck/invalid%FF")
  626. self.assertEqual(response.code, 400)
  627. response = self.fetch("/typecheck/invalid?foo=%FF")
  628. self.assertEqual(response.code, 400)
  629. def test_decode_argument_plus(self):
  630. # These urls are all equivalent.
  631. urls = ["/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
  632. "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8"]
  633. for req_url in urls:
  634. response = self.fetch(req_url)
  635. response.rethrow()
  636. data = json_decode(response.body)
  637. self.assertEqual(data, {u'path': [u'unicode', u'1 + 1'],
  638. u'query': [u'unicode', u'1 + 1'],
  639. })
  640. def test_reverse_url(self):
  641. self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
  642. '/decode_arg/foo')
  643. self.assertEqual(self.app.reverse_url('decode_arg', 42),
  644. '/decode_arg/42')
  645. self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'),
  646. '/decode_arg/%E9')
  647. self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'),
  648. '/decode_arg/%C3%A9')
  649. self.assertEqual(self.app.reverse_url('decode_arg', '1 + 1'),
  650. '/decode_arg/1%20%2B%201')
  651. def test_uimodule_unescaped(self):
  652. response = self.fetch("/linkify")
  653. self.assertEqual(response.body,
  654. b"<a href=\"http://example.com\">http://example.com</a>")
  655. def test_uimodule_resources(self):
  656. response = self.fetch("/uimodule_resources")
  657. self.assertEqual(response.body, b"""\
  658. <html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
  659. <style type="text/css">
  660. .entry { margin-bottom: 1em; }
  661. </style>
  662. <meta>
  663. </head><body>
  664. <div class="entry">...</div>
  665. <div class="entry">...</div>
  666. <script src="/common.js" type="text/javascript"></script>
  667. <script type="text/javascript">
  668. //<![CDATA[
  669. js_embed()
  670. //]]>
  671. </script>
  672. <script src="/analytics.js"/>
  673. </body></html>""") # noqa: E501
  674. def test_optional_path(self):
  675. self.assertEqual(self.fetch_json("/optional_path/foo"),
  676. {u"path": u"foo"})
  677. self.assertEqual(self.fetch_json("/optional_path/"),
  678. {u"path": None})
  679. def test_multi_header(self):
  680. response = self.fetch("/multi_header")
  681. self.assertEqual(response.headers["x-overwrite"], "2")
  682. self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
  683. def test_redirect(self):
  684. response = self.fetch("/redirect?permanent=1", follow_redirects=False)
  685. self.assertEqual(response.code, 301)
  686. response = self.fetch("/redirect?permanent=0", follow_redirects=False)
  687. self.assertEqual(response.code, 302)
  688. response = self.fetch("/redirect?status=307", follow_redirects=False)
  689. self.assertEqual(response.code, 307)
  690. def test_web_redirect(self):
  691. response = self.fetch("/web_redirect_permanent", follow_redirects=False)
  692. self.assertEqual(response.code, 301)
  693. self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
  694. response = self.fetch("/web_redirect", follow_redirects=False)
  695. self.assertEqual(response.code, 302)
  696. self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
  697. def test_web_redirect_double_slash(self):
  698. response = self.fetch("//web_redirect_double_slash", follow_redirects=False)
  699. self.assertEqual(response.code, 301)
  700. self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
  701. def test_header_injection(self):
  702. response = self.fetch("/header_injection")
  703. self.assertEqual(response.body, b"ok")
  704. def test_get_argument(self):
  705. response = self.fetch("/get_argument?foo=bar")
  706. self.assertEqual(response.body, b"bar")
  707. response = self.fetch("/get_argument?foo=")
  708. self.assertEqual(response.body, b"")
  709. response = self.fetch("/get_argument")
  710. self.assertEqual(response.body, b"default")
  711. # Test merging of query and body arguments.
  712. # In singular form, body arguments take precedence over query arguments.
  713. body = urllib_parse.urlencode(dict(foo="hello"))
  714. response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
  715. self.assertEqual(response.body, b"hello")
  716. # In plural methods they are merged.
  717. response = self.fetch("/get_arguments?foo=bar",
  718. method="POST", body=body)
  719. self.assertEqual(json_decode(response.body),
  720. dict(default=['bar', 'hello'],
  721. query=['bar'],
  722. body=['hello']))
  723. def test_get_query_arguments(self):
  724. # send as a post so we can ensure the separation between query
  725. # string and body arguments.
  726. body = urllib_parse.urlencode(dict(foo="hello"))
  727. response = self.fetch("/get_argument?source=query&foo=bar",
  728. method="POST", body=body)
  729. self.assertEqual(response.body, b"bar")
  730. response = self.fetch("/get_argument?source=query&foo=",
  731. method="POST", body=body)
  732. self.assertEqual(response.body, b"")
  733. response = self.fetch("/get_argument?source=query",
  734. method="POST", body=body)
  735. self.assertEqual(response.body, b"default")
  736. def test_get_body_arguments(self):
  737. body = urllib_parse.urlencode(dict(foo="bar"))
  738. response = self.fetch("/get_argument?source=body&foo=hello",
  739. method="POST", body=body)
  740. self.assertEqual(response.body, b"bar")
  741. body = urllib_parse.urlencode(dict(foo=""))
  742. response = self.fetch("/get_argument?source=body&foo=hello",
  743. method="POST", body=body)
  744. self.assertEqual(response.body, b"")
  745. body = urllib_parse.urlencode(dict())
  746. response = self.fetch("/get_argument?source=body&foo=hello",
  747. method="POST", body=body)
  748. self.assertEqual(response.body, b"default")
  749. def test_no_gzip(self):
  750. response = self.fetch('/get_argument')
  751. self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
  752. self.assertNotIn('gzip', response.headers.get('Content-Encoding', ''))
  753. class NonWSGIWebTests(WebTestCase):
  754. def get_handlers(self):
  755. return [("/flow_control", FlowControlHandler),
  756. ("/empty_flush", EmptyFlushCallbackHandler),
  757. ]
  758. def test_flow_control(self):
  759. self.assertEqual(self.fetch("/flow_control").body, b"123")
  760. def test_empty_flush(self):
  761. response = self.fetch("/empty_flush")
  762. self.assertEqual(response.body, b"ok")
  763. @wsgi_safe
  764. class ErrorResponseTest(WebTestCase):
  765. def get_handlers(self):
  766. class DefaultHandler(RequestHandler):
  767. def get(self):
  768. if self.get_argument("status", None):
  769. raise HTTPError(int(self.get_argument("status")))
  770. 1 / 0
  771. class WriteErrorHandler(RequestHandler):
  772. def get(self):
  773. if self.get_argument("status", None):
  774. self.send_error(int(self.get_argument("status")))
  775. else:
  776. 1 / 0
  777. def write_error(self, status_code, **kwargs):
  778. self.set_header("Content-Type", "text/plain")
  779. if "exc_info" in kwargs:
  780. self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
  781. else:
  782. self.write("Status: %d" % status_code)
  783. class FailedWriteErrorHandler(RequestHandler):
  784. def get(self):
  785. 1 / 0
  786. def write_error(self, status_code, **kwargs):
  787. raise Exception("exception in write_error")
  788. return [url("/default", DefaultHandler),
  789. url("/write_error", WriteErrorHandler),
  790. url("/failed_write_error", FailedWriteErrorHandler),
  791. ]
  792. def test_default(self):
  793. with ExpectLog(app_log, "Uncaught exception"):
  794. response = self.fetch("/default")
  795. self.assertEqual(response.code, 500)
  796. self.assertTrue(b"500: Internal Server Error" in response.body)
  797. response = self.fetch("/default?status=503")
  798. self.assertEqual(response.code, 503)
  799. self.assertTrue(b"503: Service Unavailable" in response.body)
  800. response = self.fetch("/default?status=435")
  801. self.assertEqual(response.code, 435)
  802. self.assertTrue(b"435: Unknown" in response.body)
  803. def test_write_error(self):
  804. with ExpectLog(app_log, "Uncaught exception"):
  805. response = self.fetch("/write_error")
  806. self.assertEqual(response.code, 500)
  807. self.assertEqual(b"Exception: ZeroDivisionError", response.body)
  808. response = self.fetch("/write_error?status=503")
  809. self.assertEqual(response.code, 503)
  810. self.assertEqual(b"Status: 503", response.body)
  811. def test_failed_write_error(self):
  812. with ExpectLog(app_log, "Uncaught exception"):
  813. response = self.fetch("/failed_write_error")
  814. self.assertEqual(response.code, 500)
  815. self.assertEqual(b"", response.body)
  816. @wsgi_safe
  817. class StaticFileTest(WebTestCase):
  818. # The expected MD5 hash of robots.txt, used in tests that call
  819. # StaticFileHandler.get_version
  820. robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
  821. static_dir = os.path.join(os.path.dirname(__file__), 'static')
  822. def get_handlers(self):
  823. class StaticUrlHandler(RequestHandler):
  824. def get(self, path):
  825. with_v = int(self.get_argument('include_version', 1))
  826. self.write(self.static_url(path, include_version=with_v))
  827. class AbsoluteStaticUrlHandler(StaticUrlHandler):
  828. include_host = True
  829. class OverrideStaticUrlHandler(RequestHandler):
  830. def get(self, path):
  831. do_include = bool(self.get_argument("include_host"))
  832. self.include_host = not do_include
  833. regular_url = self.static_url(path)
  834. override_url = self.static_url(path, include_host=do_include)
  835. if override_url == regular_url:
  836. return self.write(str(False))
  837. protocol = self.request.protocol + "://"
  838. protocol_length = len(protocol)
  839. check_regular = regular_url.find(protocol, 0, protocol_length)
  840. check_override = override_url.find(protocol, 0, protocol_length)
  841. if do_include:
  842. result = (check_override == 0 and check_regular == -1)
  843. else:
  844. result = (check_override == -1 and check_regular == 0)
  845. self.write(str(result))
  846. return [('/static_url/(.*)', StaticUrlHandler),
  847. ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
  848. ('/override_static_url/(.*)', OverrideStaticUrlHandler),
  849. ('/root_static/(.*)', StaticFileHandler, dict(path='/'))]
  850. def get_app_kwargs(self):
  851. return dict(static_path=relpath('static'))
  852. def test_static_files(self):
  853. response = self.fetch('/robots.txt')
  854. self.assertTrue(b"Disallow: /" in response.body)
  855. response = self.fetch('/static/robots.txt')
  856. self.assertTrue(b"Disallow: /" in response.body)
  857. self.assertEqual(response.headers.get("Content-Type"), "text/plain")
  858. def test_static_compressed_files(self):
  859. response = self.fetch("/static/sample.xml.gz")
  860. self.assertEqual(response.headers.get("Content-Type"),
  861. "application/gzip")
  862. response = self.fetch("/static/sample.xml.bz2")
  863. self.assertEqual(response.headers.get("Content-Type"),
  864. "application/octet-stream")
  865. # make sure the uncompressed file still has the correct type
  866. response = self.fetch("/static/sample.xml")
  867. self.assertTrue(response.headers.get("Content-Type")
  868. in set(("text/xml", "application/xml")))
  869. def test_static_url(self):
  870. response = self.fetch("/static_url/robots.txt")
  871. self.assertEqual(response.body,
  872. b"/static/robots.txt?v=" + self.robots_txt_hash)
  873. def test_absolute_static_url(self):
  874. response = self.fetch("/abs_static_url/robots.txt")
  875. self.assertEqual(response.body, (
  876. utf8(self.get_url("/")) +
  877. b"static/robots.txt?v=" +
  878. self.robots_txt_hash
  879. ))
  880. def test_relative_version_exclusion(self):
  881. response = self.fetch("/static_url/robots.txt?include_version=0")
  882. self.assertEqual(response.body, b"/static/robots.txt")
  883. def test_absolute_version_exclusion(self):
  884. response = self.fetch("/abs_static_url/robots.txt?include_version=0")
  885. self.assertEqual(response.body,
  886. utf8(self.get_url("/") + "static/robots.txt"))
  887. def test_include_host_override(self):
  888. self._trigger_include_host_check(False)
  889. self._trigger_include_host_check(True)
  890. def _trigger_include_host_check(self, include_host):
  891. path = "/override_static_url/robots.txt?include_host=%s"
  892. response = self.fetch(path % int(include_host))
  893. self.assertEqual(response.body, utf8(str(True)))
  894. def get_and_head(self, *args, **kwargs):
  895. """Performs a GET and HEAD request and returns the GET response.
  896. Fails if any ``Content-*`` headers returned by the two requests
  897. differ.
  898. """
  899. head_response = self.fetch(*args, method="HEAD", **kwargs)
  900. get_response = self.fetch(*args, method="GET", **kwargs)
  901. content_headers = set()
  902. for h in itertools.chain(head_response.headers, get_response.headers):
  903. if h.startswith('Content-'):
  904. content_headers.add(h)
  905. for h in content_headers:
  906. self.assertEqual(head_response.headers.get(h),
  907. get_response.headers.get(h),
  908. "%s differs between GET (%s) and HEAD (%s)" %
  909. (h, head_response.headers.get(h),
  910. get_response.headers.get(h)))
  911. return get_response
  912. def test_static_304_if_modified_since(self):
  913. response1 = self.get_and_head("/static/robots.txt")
  914. response2 = self.get_and_head("/static/robots.txt", headers={
  915. 'If-Modified-Since': response1.headers['Last-Modified']})
  916. self.assertEqual(response2.code, 304)
  917. self.assertTrue('Content-Length' not in response2.headers)
  918. self.assertTrue('Last-Modified' not in response2.headers)
  919. def test_static_304_if_none_match(self):
  920. response1 = self.get_and_head("/static/robots.txt")
  921. response2 = self.get_and_head("/static/robots.txt", headers={
  922. 'If-None-Match': response1.headers['Etag']})
  923. self.assertEqual(response2.code, 304)
  924. def test_static_304_etag_modified_bug(self):
  925. response1 = self.get_and_head("/static/robots.txt")
  926. response2 = self.get_and_head("/static/robots.txt", headers={
  927. 'If-None-Match': '"MISMATCH"',
  928. 'If-Modified-Since': response1.headers['Last-Modified']})
  929. self.assertEqual(response2.code, 200)
  930. def test_static_if_modified_since_pre_epoch(self):
  931. # On windows, the functions that work with time_t do not accept
  932. # negative values, and at least one client (processing.js) seems
  933. # to use if-modified-since 1/1/1960 as a cache-busting technique.
  934. response = self.get_and_head("/static/robots.txt", headers={
  935. 'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
  936. self.assertEqual(response.code, 200)
  937. def test_static_if_modified_since_time_zone(self):
  938. # Instead of the value from Last-Modified, make requests with times
  939. # chosen just before and after the known modification time
  940. # of the file to ensure that the right time zone is being used
  941. # when parsing If-Modified-Since.
  942. stat = os.stat(relpath('static/robots.txt'))
  943. response = self.get_and_head('/static/robots.txt', headers={
  944. 'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
  945. self.assertEqual(response.code, 200)
  946. response = self.get_and_head('/static/robots.txt', headers={
  947. 'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
  948. self.assertEqual(response.code, 304)
  949. def test_static_etag(self):
  950. response = self.get_and_head('/static/robots.txt')
  951. self.assertEqual(utf8(response.headers.get("Etag")),
  952. b'"' + self.robots_txt_hash + b'"')
  953. def test_static_with_range(self):
  954. response = self.get_and_head('/static/robots.txt', headers={
  955. 'Range': 'bytes=0-9'})
  956. self.assertEqual(response.code, 206)
  957. self.assertEqual(response.body, b"User-agent")
  958. self.assertEqual(utf8(response.headers.get("Etag")),
  959. b'"' + self.robots_txt_hash + b'"')
  960. self.assertEqual(response.headers.get("Content-Length"), "10")
  961. self.assertEqual(response.headers.get("Content-Range"),
  962. "bytes 0-9/26")
  963. def test_static_with_range_full_file(self):
  964. response = self.get_and_head('/static/robots.txt', headers={
  965. 'Range': 'bytes=0-'})
  966. # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
  967. # to ``Range: bytes=0-`` :(
  968. self.assertEqual(response.code, 200)
  969. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  970. with open(robots_file_path) as f:
  971. self.assertEqual(response.body, utf8(f.read()))
  972. self.assertEqual(response.headers.get("Content-Length"), "26")
  973. self.assertEqual(response.headers.get("Content-Range"), None)
  974. def test_static_with_range_full_past_end(self):
  975. response = self.get_and_head('/static/robots.txt', headers={
  976. 'Range': 'bytes=0-10000000'})
  977. self.assertEqual(response.code, 200)
  978. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  979. with open(robots_file_path) as f:
  980. self.assertEqual(response.body, utf8(f.read()))
  981. self.assertEqual(response.headers.get("Content-Length"), "26")
  982. self.assertEqual(response.headers.get("Content-Range"), None)
  983. def test_static_with_range_partial_past_end(self):
  984. response = self.get_and_head('/static/robots.txt', headers={
  985. 'Range': 'bytes=1-10000000'})
  986. self.assertEqual(response.code, 206)
  987. robots_file_path = os.path.join(self.static_dir, "robots.txt")
  988. with open(robots_file_path) as f:
  989. self.assertEqual(response.body, utf8(f.read()[1:]))
  990. self.assertEqual(response.headers.get("Content-Length"), "25")
  991. self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")
  992. def test_static_with_range_end_edge(self):
  993. response = self.get_and_head('/static/robots.txt', headers={
  994. 'Range': 'bytes=22-'})
  995. self.assertEqual(response.body, b": /\n")
  996. self.assertEqual(response.headers.get("Content-Length"), "4")
  997. self.assertEqual(response.headers.get("Content-Range"),
  998. "bytes 22-25/26")
  999. def test_static_with_range_neg_end(self):
  1000. response = self.get_and_head('/static/robots.txt', headers={
  1001. 'Range': 'bytes=-4'})
  1002. self.assertEqual(response.body, b": /\n")
  1003. self.assertEqual(response.headers.get("Content-Length"), "4")
  1004. self.assertEqual(response.headers.get("Content-Range"),
  1005. "bytes 22-25/26")
  1006. def test_static_invalid_range(self):
  1007. response = self.get_and_head('/static/robots.txt', headers={
  1008. 'Range': 'asdf'})
  1009. self.assertEqual(response.code, 200)
  1010. def test_static_unsatisfiable_range_zero_suffix(self):
  1011. response = self.get_and_head('/static/robots.txt', headers={
  1012. 'Range': 'bytes=-0'})
  1013. self.assertEqual(response.headers.get("Content-Range"),
  1014. "bytes */26")
  1015. self.assertEqual(response.code, 416)
  1016. def test_static_unsatisfiable_range_invalid_start(self):
  1017. response = self.get_and_head('/static/robots.txt', headers={
  1018. 'Range': 'bytes=26'})
  1019. self.assertEqual(response.code, 416)
  1020. self.assertEqual(response.headers.get("Content-Range"),
  1021. "bytes */26")
  1022. def test_static_head(self):
  1023. response = self.fetch('/static/robots.txt', method='HEAD')
  1024. self.assertEqual(response.code, 200)
  1025. # No body was returned, but we did get the right content length.
  1026. self.assertEqual(response.body, b'')
  1027. self.assertEqual(response.headers['Content-Length'], '26')
  1028. self.assertEqual(utf8(response.headers['Etag']),
  1029. b'"' + self.robots_txt_hash + b'"')
  1030. def test_static_head_range(self):
  1031. response = self.fetch('/static/robots.txt', method='HEAD',
  1032. headers={'Range': 'bytes=1-4'})
  1033. self.assertEqual(response.code, 206)
  1034. self.assertEqual(response.body, b'')
  1035. self.assertEqual(response.headers['Content-Length'], '4')
  1036. self.assertEqual(utf8(response.headers['Etag']),
  1037. b'"' + self.robots_txt_hash + b'"')
  1038. def test_static_range_if_none_match(self):
  1039. response = self.get_and_head('/static/robots.txt', headers={
  1040. 'Range': 'bytes=1-4',
  1041. 'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
  1042. self.assertEqual(response.code, 304)
  1043. self.assertEqual(response.body, b'')
  1044. self.assertTrue('Content-Length' not in response.headers)
  1045. self.assertEqual(utf8(response.headers['Etag']),
  1046. b'"' + self.robots_txt_hash + b'"')
  1047. def test_static_404(self):
  1048. response = self.get_and_head('/static/blarg')
  1049. self.assertEqual(response.code, 404)
  1050. def test_path_traversal_protection(self):
  1051. # curl_httpclient processes ".." on the client side, so we
  1052. # must test this with simple_httpclient.
  1053. self.http_client.close()
  1054. self.http_client = SimpleAsyncHTTPClient()
  1055. with ExpectLog(gen_log, ".*not in root static directory"):
  1056. response = self.get_and_head('/static/../static_foo.txt')
  1057. # Attempted path traversal should result in 403, not 200
  1058. # (which means the check failed and the file was served)
  1059. # or 404 (which means that the file didn't exist and
  1060. # is probably a packaging error).
  1061. self.assertEqual(response.code, 403)
  1062. @unittest.skipIf(os.name != 'posix', 'non-posix OS')
  1063. def test_root_static_path(self):
  1064. # Sometimes people set the StaticFileHandler's path to '/'
  1065. # to disable Tornado's path validation (in conjunction with
  1066. # their own validation in get_absolute_path). Make sure
  1067. # that the stricter validation in 4.2.1 doesn't break them.
  1068. path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
  1069. 'static/robots.txt')
  1070. response = self.get_and_head('/root_static' + urllib_parse.quote(path))
  1071. self.assertEqual(response.code, 200)
  1072. @wsgi_safe
  1073. class StaticDefaultFilenameTest(WebTestCase):
  1074. def get_app_kwargs(self):
  1075. return dict(static_path=relpath('static'),
  1076. static_handler_args=dict(default_filename='index.html'))
  1077. def get_handlers(self):
  1078. return []
  1079. def test_static_default_filename(self):
  1080. response = self.fetch('/static/dir/', follow_redirects=False)
  1081. self.assertEqual(response.code, 200)
  1082. self.assertEqual(b'this is the index\n', response.body)
  1083. def test_static_default_redirect(self):
  1084. response = self.fetch('/static/dir', follow_redirects=False)
  1085. self.assertEqual(response.code, 301)
  1086. self.assertTrue(response.headers['Location'].endswith('/static/dir/'))
  1087. @wsgi_safe
  1088. class StaticFileWithPathTest(WebTestCase):
  1089. def get_app_kwargs(self):
  1090. return dict(static_path=relpath('static'),
  1091. static_handler_args=dict(default_filename='index.html'))
  1092. def get_handlers(self):
  1093. return [("/foo/(.*)", StaticFileHandler, {
  1094. "path": relpath("templates/"),
  1095. })]
  1096. def test_serve(self):
  1097. response = self.fetch("/foo/utf8.html")
  1098. self.assertEqual(response.body, b"H\xc3\xa9llo\n")
  1099. @wsgi_safe
  1100. class CustomStaticFileTest(WebTestCase):
  1101. def get_handlers(self):
  1102. class MyStaticFileHandler(StaticFileHandler):
  1103. @classmethod
  1104. def make_static_url(cls, settings, path):
  1105. version_hash = cls.get_version(settings, path)
  1106. extension_index = path.rindex('.')
  1107. before_version = path[:extension_index]
  1108. after_version = path[(extension_index + 1):]
  1109. return '/static/%s.%s.%s' % (before_version, version_hash,
  1110. after_version)
  1111. def parse_url_path(self, url_path):
  1112. extension_index = url_path.rindex('.')
  1113. version_index = url_path.rindex('.', 0, extension_index)
  1114. return '%s%s' % (url_path[:version_index],
  1115. url_path[extension_index:])
  1116. @classmethod
  1117. def get_absolute_path(cls, settings, path):
  1118. return 'CustomStaticFileTest:' + path
  1119. def validate_absolute_path(self, root, absolute_path):
  1120. return absolute_path
  1121. @classmethod
  1122. def get_content(self, path, start=None, end=None):
  1123. assert start is None and end is None
  1124. if path == 'CustomStaticFileTest:foo.txt':
  1125. return b'bar'
  1126. raise Exception("unexpected path %r" % path)
  1127. def get_content_size(self):
  1128. if self.absolute_path == 'CustomStaticFileTest:foo.txt':
  1129. return 3
  1130. raise Exception("unexpected path %r" % self.absolute_path)
  1131. def get_modified_time(self):
  1132. return None
  1133. @classmethod
  1134. def get_version(cls, settings, path):
  1135. return "42"
  1136. class StaticUrlHandler(RequestHandler):
  1137. def get(self, path):
  1138. self.write(self.static_url(path))
  1139. self.static_handler_class = MyStaticFileHandler
  1140. return [("/static_url/(.*)", StaticUrlHandler)]
  1141. def get_app_kwargs(self):
  1142. return dict(static_path="dummy",
  1143. static_handler_class=self.static_handler_class)
  1144. def test_serve(self):
  1145. response = self.fetch("/static/foo.42.txt")
  1146. self.assertEqual(response.body, b"bar")
  1147. def test_static_url(self):
  1148. with ExpectLog(gen_log, "Could not open static file", required=False):
  1149. response = self.fetch("/static_url/foo.txt")
  1150. self.assertEqual(response.body, b"/static/foo.42.txt")
  1151. @wsgi_safe
  1152. class HostMatchingTest(WebTestCase):
  1153. class Handler(RequestHandler):
  1154. def initialize(self, reply):
  1155. self.reply = reply
  1156. def get(self):
  1157. self.write(self.reply)
  1158. def get_handlers(self):
  1159. return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]
  1160. def test_host_matching(self):
  1161. self.app.add_handlers("www.example.com",
  1162. [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
  1163. self.app.add_handlers(r"www\.example\.com",
  1164. [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
  1165. self.app.add_handlers("www.example.com",
  1166. [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
  1167. self.app.add_handlers("www.e.*e.com",
  1168. [("/baz", HostMatchingTest.Handler, {"reply": "[3]"})])
  1169. response = self.fetch("/foo")
  1170. self.assertEqual(response.body, b"wildcard")
  1171. response = self.fetch("/bar")
  1172. self.assertEqual(response.code, 404)
  1173. response = self.fetch("/baz")
  1174. self.assertEqual(response.code, 404)
  1175. response = self.fetch("/foo", headers={'Host': 'www.example.com'})
  1176. self.assertEqual(response.body, b"[0]")
  1177. response = self.fetch("/bar", headers={'Host': 'www.example.com'})
  1178. self.assertEqual(response.body, b"[1]")
  1179. response = self.fetch("/baz", headers={'Host': 'www.example.com'})
  1180. self.assertEqual(response.body, b"[2]")
  1181. response = self.fetch("/baz", headers={'Host': 'www.exe.com'})
  1182. self.assertEqual(response.body, b"[3]")
  1183. @wsgi_safe
  1184. class DefaultHostMatchingTest(WebTestCase):
  1185. def get_handlers(self):
  1186. return []
  1187. def get_app_kwargs(self):
  1188. return {'default_host': "www.example.com"}
  1189. def test_default_host_matching(self):
  1190. self.app.add_handlers("www.example.com",
  1191. [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
  1192. self.app.add_handlers(r"www\.example\.com",
  1193. [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
  1194. self.app.add_handlers("www.test.com",
  1195. [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
  1196. response = self.fetch("/foo")
  1197. self.assertEqual(response.body, b"[0]")
  1198. response = self.fetch("/bar")
  1199. self.assertEqual(response.body, b"[1]")
  1200. response = self.fetch("/baz")
  1201. self.assertEqual(response.code, 404)
  1202. response = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
  1203. self.assertEqual(response.code, 404)
  1204. self.app.default_host = "www.test.com"
  1205. response = self.fetch("/baz")
  1206. self.assertEqual(response.body, b"[2]")
  1207. @wsgi_safe
  1208. class NamedURLSpecGroupsTest(WebTestCase):
  1209. def get_handlers(self):
  1210. class EchoHandler(RequestHandler):
  1211. def get(self, path):
  1212. self.write(path)
  1213. return [("/str/(?P<path>.*)", EchoHandler),
  1214. (u"/unicode/(?P<path>.*)", EchoHandler)]
  1215. def test_named_urlspec_groups(self):
  1216. response = self.fetch("/str/foo")
  1217. self.assertEqual(response.body, b"foo")
  1218. response = self.fetch("/unicode/bar")
  1219. self.assertEqual(response.body, b"bar")
  1220. @wsgi_safe
  1221. class ClearHeaderTest(SimpleHandlerTestCase):
  1222. class Handler(RequestHandler):
  1223. def get(self):
  1224. self.set_header("h1", "foo")
  1225. self.set_header("h2", "bar")
  1226. self.clear_header("h1")
  1227. self.clear_header("nonexistent")
  1228. def test_clear_header(self):
  1229. response = self.fetch("/")
  1230. self.assertTrue("h1" not in response.headers)
  1231. self.assertEqual(response.headers["h2"], "bar")
  1232. class Header204Test(SimpleHandlerTestCase):
  1233. class Handler(RequestHandler):
  1234. def get(self):
  1235. self.set_status(204)
  1236. self.finish()
  1237. def test_204_headers(self):
  1238. response = self.fetch('/')
  1239. self.assertEqual(response.code, 204)
  1240. self.assertNotIn("Content-Length", response.headers)
  1241. self.assertNotIn("Transfer-Encoding", response.headers)
  1242. @wsgi_safe
  1243. class Header304Test(SimpleHandlerTestCase):
  1244. class Handler(RequestHandler):
  1245. def get(self):
  1246. self.set_header("Content-Language", "en_US")
  1247. self.write("hello")
  1248. def test_304_headers(self):
  1249. response1 = self.fetch('/')
  1250. self.assertEqual(response1.headers["Content-Length"], "5")
  1251. self.assertEqual(response1.headers["Content-Language"], "en_US")
  1252. response2 = self.fetch('/', headers={
  1253. 'If-None-Match': response1.headers["Etag"]})
  1254. self.assertEqual(response2.code, 304)
  1255. self.assertTrue("Content-Length" not in response2.headers)
  1256. self.assertTrue("Content-Language" not in response2.headers)
  1257. # Not an entity header, but should not be added to 304s by chunking
  1258. self.assertTrue("Transfer-Encoding" not in response2.headers)
  1259. @wsgi_safe
  1260. class StatusReasonTest(SimpleHandlerTestCase):
  1261. class Handler(RequestHandler):
  1262. def get(self):
  1263. reason = self.request.arguments.get('reason', [])
  1264. self.set_status(int(self.get_argument('code')),
  1265. reason=reason[0] if reason else None)
  1266. def get_http_client(self):
  1267. # simple_httpclient only: curl doesn't expose the reason string
  1268. return SimpleAsyncHTTPClient()
  1269. def test_status(self):
  1270. response = self.fetch("/?code=304")
  1271. self.assertEqual(response.code, 304)
  1272. self.assertEqual(response.reason, "Not Modified")
  1273. response = self.fetch("/?code=304&reason=Foo")
  1274. self.assertEqual(response.code, 304)
  1275. self.assertEqual(response.reason, "Foo")
  1276. response = self.fetch("/?code=682&reason=Bar")
  1277. self.assertEqual(response.code, 682)
  1278. self.assertEqual(response.reason, "Bar")
  1279. response = self.fetch("/?code=682")
  1280. self.assertEqual(response.code, 682)
  1281. self.assertEqual(response.reason, "Unknown")
  1282. @wsgi_safe
  1283. class DateHeaderTest(SimpleHandlerTestCase):
  1284. class Handler(RequestHandler):
  1285. def get(self):
  1286. self.write("hello")
  1287. def test_date_header(self):
  1288. response = self.fetch('/')
  1289. header_date = datetime.datetime(
  1290. *email.utils.parsedate(response.headers['Date'])[:6])
  1291. self.assertTrue(header_date - datetime.datetime.utcnow() <
  1292. datetime.timedelta(seconds=2))
  1293. @wsgi_safe
  1294. class RaiseWithReasonTest(SimpleHandlerTestCase):
  1295. class Handler(RequestHandler):
  1296. def get(self):
  1297. raise HTTPError(682, reason="Foo")
  1298. def get_http_client(self):
  1299. # simple_httpclient only: curl doesn't expose the reason string
  1300. return SimpleAsyncHTTPClient()
  1301. def test_raise_with_reason(self):
  1302. response = self.fetch("/")
  1303. self.assertEqual(response.code, 682)
  1304. self.assertEqual(response.reason, "Foo")
  1305. self.assertIn(b'682: Foo', response.body)
  1306. def test_httperror_str(self):
  1307. self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
  1308. def test_httperror_str_from_httputil(self):
  1309. self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")
  1310. @wsgi_safe
  1311. class ErrorHandlerXSRFTest(WebTestCase):
  1312. def get_handlers(self):
  1313. # note that if the handlers list is empty we get the default_host
  1314. # redirect fallback instead of a 404, so test with both an
  1315. # explicitly defined error handler and an implicit 404.
  1316. return [('/error', ErrorHandler, dict(status_code=417))]
  1317. def get_app_kwargs(self):
  1318. return dict(xsrf_cookies=True)
  1319. def test_error_xsrf(self):
  1320. response = self.fetch('/error', method='POST', body='')
  1321. self.assertEqual(response.code, 417)
  1322. def test_404_xsrf(self):
  1323. response = self.fetch('/404', method='POST', body='')
  1324. self.assertEqual(response.code, 404)
  1325. @wsgi_safe
  1326. class GzipTestCase(SimpleHandlerTestCase):
  1327. class Handler(RequestHandler):
  1328. def get(self):
  1329. for v in self.get_arguments('vary'):
  1330. self.add_header('Vary', v)
  1331. # Must write at least MIN_LENGTH bytes to activate compression.
  1332. self.write('hello world' + ('!' * GZipContentEncoding.MIN_LENGTH))
  1333. def get_app_kwargs(self):
  1334. return dict(
  1335. gzip=True,
  1336. static_path=os.path.join(os.path.dirname(__file__), 'static'))
  1337. def assert_compressed(self, response):
  1338. # simple_httpclient renames the content-encoding header;
  1339. # curl_httpclient doesn't.
  1340. self.assertEqual(
  1341. response.headers.get(
  1342. 'Content-Encoding',
  1343. response.headers.get('X-Consumed-Content-Encoding')),
  1344. 'gzip')
  1345. def test_gzip(self):
  1346. response = self.fetch('/')
  1347. self.assert_compressed(response)
  1348. self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
  1349. def test_gzip_static(self):
  1350. # The streaming responses in StaticFileHandler have subtle
  1351. # interactions with the gzip output so test this case separately.
  1352. response = self.fetch('/robots.txt')
  1353. self.assert_compressed(response)
  1354. self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
  1355. def test_gzip_not_requested(self):
  1356. response = self.fetch('/', use_gzip=False)
  1357. self.assertNotIn('Content-Encoding', response.headers)
  1358. self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
  1359. def test_vary_already_present(self):
  1360. response = self.fetch('/?vary=Accept-Language')
  1361. self.assert_compressed(response)
  1362. self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
  1363. ['Accept-Language', 'Accept-Encoding'])
  1364. def test_vary_already_present_multiple(self):
  1365. # Regression test for https://github.com/tornadoweb/tornado/issues/1670
  1366. response = self.fetch('/?vary=Accept-Language&vary=Cookie')
  1367. self.assert_compressed(response)
  1368. self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
  1369. ['Accept-Language', 'Cookie', 'Accept-Encoding'])
  1370. @wsgi_safe
  1371. class PathArgsInPrepareTest(WebTestCase):
  1372. class Handler(RequestHandler):
  1373. def prepare(self):
  1374. self.write(dict(args=self.path_args, kwargs=self.path_kwargs))
  1375. def get(self, path):
  1376. assert path == 'foo'
  1377. self.finish()
  1378. def get_handlers(self):
  1379. return [('/pos/(.*)', self.Handler),
  1380. ('/kw/(?P<path>.*)', self.Handler)]
  1381. def test_pos(self):
  1382. response = self.fetch('/pos/foo')
  1383. response.rethrow()
  1384. data = json_decode(response.body)
  1385. self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})
  1386. def test_kw(self):
  1387. response = self.fetch('/kw/foo')
  1388. response.rethrow()
  1389. data = json_decode(response.body)
  1390. self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
  1391. @wsgi_safe
  1392. class ClearAllCookiesTest(SimpleHandlerTestCase):
  1393. class Handler(RequestHandler):
  1394. def get(self):
  1395. self.clear_all_cookies()
  1396. self.write('ok')
  1397. def test_clear_all_cookies(self):
  1398. response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
  1399. set_cookies = sorted(response.headers.get_list('Set-Cookie'))
  1400. # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
  1401. self.assertTrue(set_cookies[0].startswith('baz=;') or
  1402. set_cookies[0].startswith('baz="";'))
  1403. self.assertTrue(set_cookies[1].startswith('foo=;') or
  1404. set_cookies[1].startswith('foo="";'))
  1405. class PermissionError(Exception):
  1406. pass
  1407. @wsgi_safe
  1408. class ExceptionHandlerTest(SimpleHandlerTestCase):
  1409. class Handler(RequestHandler):
  1410. def get(self):
  1411. exc = self.get_argument('exc')
  1412. if exc == 'http':
  1413. raise HTTPError(410, "no longer here")
  1414. elif exc == 'zero':
  1415. 1 / 0
  1416. elif exc == 'permission':
  1417. raise PermissionError('not allowed')
  1418. def write_error(self, status_code, **kwargs):
  1419. if 'exc_info' in kwargs:
  1420. typ, value, tb = kwargs['exc_info']
  1421. if isinstance(value, PermissionError):
  1422. self.set_status(403)
  1423. self.write('PermissionError')
  1424. return
  1425. RequestHandler.write_error(self, status_code, **kwargs)
  1426. def log_exception(self, typ, value, tb):
  1427. if isinstance(value, PermissionError):
  1428. app_log.warning('custom logging for PermissionError: %s',
  1429. value.args[0])
  1430. else:
  1431. RequestHandler.log_exception(self, typ, value, tb)
  1432. def test_http_error(self):
  1433. # HTTPErrors are logged as warnings with no stack trace.
  1434. # TODO: extend ExpectLog to test this more precisely
  1435. with ExpectLog(gen_log, '.*no longer here'):
  1436. response = self.fetch('/?exc=http')
  1437. self.assertEqual(response.code, 410)
  1438. def test_unknown_error(self):
  1439. # Unknown errors are logged as errors with a stack trace.
  1440. with ExpectLog(app_log, 'Uncaught exception'):
  1441. response = self.fetch('/?exc=zero')
  1442. self.assertEqual(response.code, 500)
  1443. def test_known_error(self):
  1444. # log_exception can override logging behavior, and write_error
  1445. # can override the response.
  1446. with ExpectLog(app_log,
  1447. 'custom logging for PermissionError: not allowed'):
  1448. response = self.fetch('/?exc=permission')
  1449. self.assertEqual(response.code, 403)
  1450. @wsgi_safe
  1451. class BuggyLoggingTest(SimpleHandlerTestCase):
  1452. class Handler(RequestHandler):
  1453. def get(self):
  1454. 1 / 0
  1455. def log_exception(self, typ, value, tb):
  1456. 1 / 0
  1457. def test_buggy_log_exception(self):
  1458. # Something gets logged even though the application's
  1459. # logger is broken.
  1460. with ExpectLog(app_log, '.*'):
  1461. self.fetch('/')
  1462. @wsgi_safe
  1463. class UIMethodUIModuleTest(SimpleHandlerTestCase):
  1464. """Test that UI methods and modules are created correctly and
  1465. associated with the handler.
  1466. """
  1467. class Handler(RequestHandler):
  1468. def get(self):
  1469. self.render('foo.html')
  1470. def value(self):
  1471. return self.get_argument("value")
  1472. def get_app_kwargs(self):
  1473. def my_ui_method(handler, x):
  1474. return "In my_ui_method(%s) with handler value %s." % (
  1475. x, handler.value())
  1476. class MyModule(UIModule):
  1477. def render(self, x):
  1478. return "In MyModule(%s) with handler value %s." % (
  1479. x, self.handler.value())
  1480. loader = DictLoader({
  1481. 'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
  1482. })
  1483. return dict(template_loader=loader,
  1484. ui_methods={'my_ui_method': my_ui_method},
  1485. ui_modules={'MyModule': MyModule})
  1486. def tearDown(self):
  1487. super(UIMethodUIModuleTest, self).tearDown()
  1488. # TODO: fix template loader caching so this isn't necessary.
  1489. RequestHandler._template_loaders.clear()
  1490. def test_ui_method(self):
  1491. response = self.fetch('/?value=asdf')
  1492. self.assertEqual(response.body,
  1493. b'In my_ui_method(42) with handler value asdf. '
  1494. b'In MyModule(123) with handler value asdf.')
  1495. @wsgi_safe
  1496. class GetArgumentErrorTest(SimpleHandlerTestCase):
  1497. class Handler(RequestHandler):
  1498. def get(self):
  1499. try:
  1500. self.get_argument('foo')
  1501. self.write({})
  1502. except MissingArgumentError as e:
  1503. self.write({'arg_name': e.arg_name,
  1504. 'log_message': e.log_message})
  1505. def test_catch_error(self):
  1506. response = self.fetch('/')
  1507. self.assertEqual(json_decode(response.body),
  1508. {'arg_name': 'foo',
  1509. 'log_message': 'Missing argument foo'})
  1510. class MultipleExceptionTest(SimpleHandlerTestCase):
  1511. class Handler(RequestHandler):
  1512. exc_count = 0
  1513. with ignore_deprecation():
  1514. @asynchronous
  1515. def get(self):
  1516. IOLoop.current().add_callback(lambda: 1 / 0)
  1517. IOLoop.current().add_callback(lambda: 1 / 0)
  1518. def log_exception(self, typ, value, tb):
  1519. MultipleExceptionTest.Handler.exc_count += 1
  1520. def test_multi_exception(self):
  1521. with ignore_deprecation():
  1522. # This test verifies that multiple exceptions raised into the same
  1523. # ExceptionStackContext do not generate extraneous log entries
  1524. # due to "Cannot send error response after headers written".
  1525. # log_exception is called, but it does not proceed to send_error.
  1526. response = self.fetch('/')
  1527. self.assertEqual(response.code, 500)
  1528. response = self.fetch('/')
  1529. self.assertEqual(response.code, 500)
  1530. # Each of our two requests generated two exceptions, we should have
  1531. # seen at least three of them by now (the fourth may still be
  1532. # in the queue).
  1533. self.assertGreater(MultipleExceptionTest.Handler.exc_count, 2)
  1534. @wsgi_safe
  1535. class SetLazyPropertiesTest(SimpleHandlerTestCase):
  1536. class Handler(RequestHandler):
  1537. def prepare(self):
  1538. self.current_user = 'Ben'
  1539. self.locale = locale.get('en_US')
  1540. def get_user_locale(self):
  1541. raise NotImplementedError()
  1542. def get_current_user(self):
  1543. raise NotImplementedError()
  1544. def get(self):
  1545. self.write('Hello %s (%s)' % (self.current_user, self.locale.code))
  1546. def test_set_properties(self):
  1547. # Ensure that current_user can be assigned to normally for apps
  1548. # that want to forgo the lazy get_current_user property
  1549. response = self.fetch('/')
  1550. self.assertEqual(response.body, b'Hello Ben (en_US)')
  1551. @wsgi_safe
  1552. class GetCurrentUserTest(WebTestCase):
  1553. def get_app_kwargs(self):
  1554. class WithoutUserModule(UIModule):
  1555. def render(self):
  1556. return ''
  1557. class WithUserModule(UIModule):
  1558. def render(self):
  1559. return str(self.current_user)
  1560. loader = DictLoader({
  1561. 'without_user.html': '',
  1562. 'with_user.html': '{{ current_user }}',
  1563. 'without_user_module.html': '{% module WithoutUserModule() %}',
  1564. 'with_user_module.html': '{% module WithUserModule() %}',
  1565. })
  1566. return dict(template_loader=loader,
  1567. ui_modules={'WithUserModule': WithUserModule,
  1568. 'WithoutUserModule': WithoutUserModule})
  1569. def tearDown(self):
  1570. super(GetCurrentUserTest, self).tearDown()
  1571. RequestHandler._template_loaders.clear()
  1572. def get_handlers(self):
  1573. class CurrentUserHandler(RequestHandler):
  1574. def prepare(self):
  1575. self.has_loaded_current_user = False
  1576. def get_current_user(self):
  1577. self.has_loaded_current_user = True
  1578. return ''
  1579. class WithoutUserHandler(CurrentUserHandler):
  1580. def get(self):
  1581. self.render_string('without_user.html')
  1582. self.finish(str(self.has_loaded_current_user))
  1583. class WithUserHandler(CurrentUserHandler):
  1584. def get(self):
  1585. self.render_string('with_user.html')
  1586. self.finish(str(self.has_loaded_current_user))
  1587. class CurrentUserModuleHandler(CurrentUserHandler):
  1588. def get_template_namespace(self):
  1589. # If RequestHandler.get_template_namespace is called, then
  1590. # get_current_user is evaluated. Until #820 is fixed, this
  1591. # is a small hack to circumvent the issue.
  1592. return self.ui
  1593. class WithoutUserModuleHandler(CurrentUserModuleHandler):
  1594. def get(self):
  1595. self.render_string('without_user_module.html')
  1596. self.finish(str(self.has_loaded_current_user))
  1597. class WithUserModuleHandler(CurrentUserModuleHandler):
  1598. def get(self):
  1599. self.render_string('with_user_module.html')
  1600. self.finish(str(self.has_loaded_current_user))
  1601. return [('/without_user', WithoutUserHandler),
  1602. ('/with_user', WithUserHandler),
  1603. ('/without_user_module', WithoutUserModuleHandler),
  1604. ('/with_user_module', WithUserModuleHandler)]
  1605. @unittest.skip('needs fix')
  1606. def test_get_current_user_is_lazy(self):
  1607. # TODO: Make this test pass. See #820.
  1608. response = self.fetch('/without_user')
  1609. self.assertEqual(response.body, b'False')
  1610. def test_get_current_user_works(self):
  1611. response = self.fetch('/with_user')
  1612. self.assertEqual(response.body, b'True')
  1613. def test_get_current_user_from_ui_module_is_lazy(self):
  1614. response = self.fetch('/without_user_module')
  1615. self.assertEqual(response.body, b'False')
  1616. def test_get_current_user_from_ui_module_works(self):
  1617. response = self.fetch('/with_user_module')
  1618. self.assertEqual(response.body, b'True')
  1619. @wsgi_safe
  1620. class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
  1621. class Handler(RequestHandler):
  1622. pass
  1623. def test_unimplemented_standard_methods(self):
  1624. for method in ['HEAD', 'GET', 'DELETE', 'OPTIONS']:
  1625. response = self.fetch('/', method=method)
  1626. self.assertEqual(response.code, 405)
  1627. for method in ['POST', 'PUT']:
  1628. response = self.fetch('/', method=method, body=b'')
  1629. self.assertEqual(response.code, 405)
  1630. class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
  1631. # wsgiref.validate complains about unknown methods in a way that makes
  1632. # this test not wsgi_safe.
  1633. class Handler(RequestHandler):
  1634. def other(self):
  1635. # Even though this method exists, it won't get called automatically
  1636. # because it is not in SUPPORTED_METHODS.
  1637. self.write('other')
  1638. def test_unimplemented_patch(self):
  1639. # PATCH is recently standardized; Tornado supports it by default
  1640. # but wsgiref.validate doesn't like it.
  1641. response = self.fetch('/', method='PATCH', body=b'')
  1642. self.assertEqual(response.code, 405)
  1643. def test_unimplemented_other(self):
  1644. response = self.fetch('/', method='OTHER',
  1645. allow_nonstandard_methods=True)
  1646. self.assertEqual(response.code, 405)
  1647. @wsgi_safe
  1648. class AllHTTPMethodsTest(SimpleHandlerTestCase):
  1649. class Handler(RequestHandler):
  1650. def method(self):
  1651. self.write(self.request.method)
  1652. get = delete = options = post = put = method
  1653. def test_standard_methods(self):
  1654. response = self.fetch('/', method='HEAD')
  1655. self.assertEqual(response.body, b'')
  1656. for method in ['GET', 'DELETE', 'OPTIONS']:
  1657. response = self.fetch('/', method=method)
  1658. self.assertEqual(response.body, utf8(method))
  1659. for method in ['POST', 'PUT']:
  1660. response = self.fetch('/', method=method, body=b'')
  1661. self.assertEqual(response.body, utf8(method))
  1662. class PatchMethodTest(SimpleHandlerTestCase):
  1663. class Handler(RequestHandler):
  1664. SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',)
  1665. def patch(self):
  1666. self.write('patch')
  1667. def other(self):
  1668. self.write('other')
  1669. def test_patch(self):
  1670. response = self.fetch('/', method='PATCH', body=b'')
  1671. self.assertEqual(response.body, b'patch')
  1672. def test_other(self):
  1673. response = self.fetch('/', method='OTHER',
  1674. allow_nonstandard_methods=True)
  1675. self.assertEqual(response.body, b'other')
  1676. @wsgi_safe
  1677. class FinishInPrepareTest(SimpleHandlerTestCase):
  1678. class Handler(RequestHandler):
  1679. def prepare(self):
  1680. self.finish('done')
  1681. def get(self):
  1682. # It's difficult to assert for certain that a method did not
  1683. # or will not be called in an asynchronous context, but this
  1684. # will be logged noisily if it is reached.
  1685. raise Exception('should not reach this method')
  1686. def test_finish_in_prepare(self):
  1687. response = self.fetch('/')
  1688. self.assertEqual(response.body, b'done')
  1689. @wsgi_safe
  1690. class Default404Test(WebTestCase):
  1691. def get_handlers(self):
  1692. # If there are no handlers at all a default redirect handler gets added.
  1693. return [('/foo', RequestHandler)]
  1694. def test_404(self):
  1695. response = self.fetch('/')
  1696. self.assertEqual(response.code, 404)
  1697. self.assertEqual(response.body,
  1698. b'<html><title>404: Not Found</title>'
  1699. b'<body>404: Not Found</body></html>')
  1700. @wsgi_safe
  1701. class Custom404Test(WebTestCase):
  1702. def get_handlers(self):
  1703. return [('/foo', RequestHandler)]
  1704. def get_app_kwargs(self):
  1705. class Custom404Handler(RequestHandler):
  1706. def get(self):
  1707. self.set_status(404)
  1708. self.write('custom 404 response')
  1709. return dict(default_handler_class=Custom404Handler)
  1710. def test_404(self):
  1711. response = self.fetch('/')
  1712. self.assertEqual(response.code, 404)
  1713. self.assertEqual(response.body, b'custom 404 response')
  1714. @wsgi_safe
  1715. class DefaultHandlerArgumentsTest(WebTestCase):
  1716. def get_handlers(self):
  1717. return [('/foo', RequestHandler)]
  1718. def get_app_kwargs(self):
  1719. return dict(default_handler_class=ErrorHandler,
  1720. default_handler_args=dict(status_code=403))
  1721. def test_403(self):
  1722. response = self.fetch('/')
  1723. self.assertEqual(response.code, 403)
  1724. @wsgi_safe
  1725. class HandlerByNameTest(WebTestCase):
  1726. def get_handlers(self):
  1727. # All three are equivalent.
  1728. return [('/hello1', HelloHandler),
  1729. ('/hello2', 'tornado.test.web_test.HelloHandler'),
  1730. url('/hello3', 'tornado.test.web_test.HelloHandler'),
  1731. ]
  1732. def test_handler_by_name(self):
  1733. resp = self.fetch('/hello1')
  1734. self.assertEqual(resp.body, b'hello')
  1735. resp = self.fetch('/hello2')
  1736. self.assertEqual(resp.body, b'hello')
  1737. resp = self.fetch('/hello3')
  1738. self.assertEqual(resp.body, b'hello')
  1739. class StreamingRequestBodyTest(WebTestCase):
  1740. def get_handlers(self):
  1741. @stream_request_body
  1742. class StreamingBodyHandler(RequestHandler):
  1743. def initialize(self, test):
  1744. self.test = test
  1745. def prepare(self):
  1746. self.test.prepared.set_result(None)
  1747. def data_received(self, data):
  1748. self.test.data.set_result(data)
  1749. def get(self):
  1750. self.test.finished.set_result(None)
  1751. self.write({})
  1752. @stream_request_body
  1753. class EarlyReturnHandler(RequestHandler):
  1754. def prepare(self):
  1755. # If we finish the response in prepare, it won't continue to
  1756. # the (non-existent) data_received.
  1757. raise HTTPError(401)
  1758. @stream_request_body
  1759. class CloseDetectionHandler(RequestHandler):
  1760. def initialize(self, test):
  1761. self.test = test
  1762. def on_connection_close(self):
  1763. super(CloseDetectionHandler, self).on_connection_close()
  1764. self.test.close_future.set_result(None)
  1765. return [('/stream_body', StreamingBodyHandler, dict(test=self)),
  1766. ('/early_return', EarlyReturnHandler),
  1767. ('/close_detection', CloseDetectionHandler, dict(test=self))]
  1768. def connect(self, url, connection_close):
  1769. # Use a raw connection so we can control the sending of data.
  1770. s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
  1771. s.connect(("127.0.0.1", self.get_http_port()))
  1772. stream = IOStream(s)
  1773. stream.write(b"GET " + url + b" HTTP/1.1\r\n")
  1774. if connection_close:
  1775. stream.write(b"Connection: close\r\n")
  1776. stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
  1777. return stream
  1778. @gen_test
  1779. def test_streaming_body(self):
  1780. self.prepared = Future()
  1781. self.data = Future()
  1782. self.finished = Future()
  1783. stream = self.connect(b"/stream_body", connection_close=True)
  1784. yield self.prepared
  1785. stream.write(b"4\r\nasdf\r\n")
  1786. # Ensure the first chunk is received before we send the second.
  1787. data = yield self.data
  1788. self.assertEqual(data, b"asdf")
  1789. self.data = Future()
  1790. stream.write(b"4\r\nqwer\r\n")
  1791. data = yield self.data
  1792. self.assertEquals(data, b"qwer")
  1793. stream.write(b"0\r\n\r\n")
  1794. yield self.finished
  1795. data = yield stream.read_until_close()
  1796. # This would ideally use an HTTP1Connection to read the response.
  1797. self.assertTrue(data.endswith(b"{}"))
  1798. stream.close()
  1799. @gen_test
  1800. def test_early_return(self):
  1801. stream = self.connect(b"/early_return", connection_close=False)
  1802. data = yield stream.read_until_close()
  1803. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  1804. @gen_test
  1805. def test_early_return_with_data(self):
  1806. stream = self.connect(b"/early_return", connection_close=False)
  1807. stream.write(b"4\r\nasdf\r\n")
  1808. data = yield stream.read_until_close()
  1809. self.assertTrue(data.startswith(b"HTTP/1.1 401"))
  1810. @gen_test
  1811. def test_close_during_upload(self):
  1812. self.close_future = Future()
  1813. stream = self.connect(b"/close_detection", connection_close=False)
  1814. stream.close()
  1815. yield self.close_future
  1816. # Each method in this handler returns a yieldable object and yields to the
  1817. # IOLoop so the future is not immediately ready. Ensure that the
  1818. # yieldables are respected and no method is called before the previous
  1819. # one has completed.
  1820. @stream_request_body
  1821. class BaseFlowControlHandler(RequestHandler):
  1822. def initialize(self, test):
  1823. self.test = test
  1824. self.method = None
  1825. self.methods = []
  1826. @contextlib.contextmanager
  1827. def in_method(self, method):
  1828. if self.method is not None:
  1829. self.test.fail("entered method %s while in %s" %
  1830. (method, self.method))
  1831. self.method = method
  1832. self.methods.append(method)
  1833. try:
  1834. yield
  1835. finally:
  1836. self.method = None
  1837. @gen.coroutine
  1838. def prepare(self):
  1839. # Note that asynchronous prepare() does not block data_received,
  1840. # so we don't use in_method here.
  1841. self.methods.append('prepare')
  1842. yield gen.moment
  1843. @gen.coroutine
  1844. def post(self):
  1845. with self.in_method('post'):
  1846. yield gen.moment
  1847. self.write(dict(methods=self.methods))
  1848. class BaseStreamingRequestFlowControlTest(object):
  1849. def get_httpserver_options(self):
  1850. # Use a small chunk size so flow control is relevant even though
  1851. # all the data arrives at once.
  1852. return dict(chunk_size=10, decompress_request=True)
  1853. def get_http_client(self):
  1854. # simple_httpclient only: curl doesn't support body_producer.
  1855. return SimpleAsyncHTTPClient()
  1856. # Test all the slightly different code paths for fixed, chunked, etc bodies.
  1857. def test_flow_control_fixed_body(self):
  1858. response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
  1859. method='POST')
  1860. response.rethrow()
  1861. self.assertEqual(json_decode(response.body),
  1862. dict(methods=['prepare', 'data_received',
  1863. 'data_received', 'data_received',
  1864. 'post']))
  1865. def test_flow_control_chunked_body(self):
  1866. chunks = [b'abcd', b'efgh', b'ijkl']
  1867. @gen.coroutine
  1868. def body_producer(write):
  1869. for i in chunks:
  1870. yield write(i)
  1871. response = self.fetch('/', body_producer=body_producer, method='POST')
  1872. response.rethrow()
  1873. self.assertEqual(json_decode(response.body),
  1874. dict(methods=['prepare', 'data_received',
  1875. 'data_received', 'data_received',
  1876. 'post']))
  1877. def test_flow_control_compressed_body(self):
  1878. bytesio = BytesIO()
  1879. gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio)
  1880. gzip_file.write(b'abcdefghijklmnopqrstuvwxyz')
  1881. gzip_file.close()
  1882. compressed_body = bytesio.getvalue()
  1883. response = self.fetch('/', body=compressed_body, method='POST',
  1884. headers={'Content-Encoding': 'gzip'})
  1885. response.rethrow()
  1886. self.assertEqual(json_decode(response.body),
  1887. dict(methods=['prepare', 'data_received',
  1888. 'data_received', 'data_received',
  1889. 'post']))
  1890. class DecoratedStreamingRequestFlowControlTest(
  1891. BaseStreamingRequestFlowControlTest,
  1892. WebTestCase):
  1893. def get_handlers(self):
  1894. class DecoratedFlowControlHandler(BaseFlowControlHandler):
  1895. @gen.coroutine
  1896. def data_received(self, data):
  1897. with self.in_method('data_received'):
  1898. yield gen.moment
  1899. return [('/', DecoratedFlowControlHandler, dict(test=self))]
  1900. @skipBefore35
  1901. class NativeStreamingRequestFlowControlTest(
  1902. BaseStreamingRequestFlowControlTest,
  1903. WebTestCase):
  1904. def get_handlers(self):
  1905. class NativeFlowControlHandler(BaseFlowControlHandler):
  1906. data_received = exec_test(globals(), locals(), """
  1907. async def data_received(self, data):
  1908. with self.in_method('data_received'):
  1909. import asyncio
  1910. await asyncio.sleep(0)
  1911. """)["data_received"]
  1912. return [('/', NativeFlowControlHandler, dict(test=self))]
  1913. @wsgi_safe
  1914. class IncorrectContentLengthTest(SimpleHandlerTestCase):
  1915. def get_handlers(self):
  1916. test = self
  1917. self.server_error = None
  1918. # Manually set a content-length that doesn't match the actual content.
  1919. class TooHigh(RequestHandler):
  1920. def get(self):
  1921. self.set_header("Content-Length", "42")
  1922. try:
  1923. self.finish("ok")
  1924. except Exception as e:
  1925. test.server_error = e
  1926. raise
  1927. class TooLow(RequestHandler):
  1928. def get(self):
  1929. self.set_header("Content-Length", "2")
  1930. try:
  1931. self.finish("hello")
  1932. except Exception as e:
  1933. test.server_error = e
  1934. raise
  1935. return [('/high', TooHigh),
  1936. ('/low', TooLow)]
  1937. def test_content_length_too_high(self):
  1938. # When the content-length is too high, the connection is simply
  1939. # closed without completing the response. An error is logged on
  1940. # the server.
  1941. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  1942. with ExpectLog(gen_log,
  1943. "(Cannot send error response after headers written"
  1944. "|Failed to flush partial response)"):
  1945. with self.assertRaises(HTTPClientError):
  1946. self.fetch("/high", raise_error=True)
  1947. self.assertEqual(str(self.server_error),
  1948. "Tried to write 40 bytes less than Content-Length")
  1949. def test_content_length_too_low(self):
  1950. # When the content-length is too low, the connection is closed
  1951. # without writing the last chunk, so the client never sees the request
  1952. # complete (which would be a framing error).
  1953. with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
  1954. with ExpectLog(gen_log,
  1955. "(Cannot send error response after headers written"
  1956. "|Failed to flush partial response)"):
  1957. with self.assertRaises(HTTPClientError):
  1958. self.fetch("/low", raise_error=True)
  1959. self.assertEqual(str(self.server_error),
  1960. "Tried to write more data than Content-Length")
  1961. class ClientCloseTest(SimpleHandlerTestCase):
  1962. class Handler(RequestHandler):
  1963. def get(self):
  1964. if self.request.version.startswith('HTTP/1'):
  1965. # Simulate a connection closed by the client during
  1966. # request processing. The client will see an error, but the
  1967. # server should respond gracefully (without logging errors
  1968. # because we were unable to write out as many bytes as
  1969. # Content-Length said we would)
  1970. self.request.connection.stream.close()
  1971. self.write('hello')
  1972. else:
  1973. # TODO: add a HTTP2-compatible version of this test.
  1974. self.write('requires HTTP/1.x')
  1975. def test_client_close(self):
  1976. with self.assertRaises((HTTPClientError, unittest.SkipTest)):
  1977. response = self.fetch('/', raise_error=True)
  1978. if response.body == b'requires HTTP/1.x':
  1979. self.skipTest('requires HTTP/1.x')
  1980. self.assertEqual(response.code, 599)
  1981. class SignedValueTest(unittest.TestCase):
  1982. SECRET = "It's a secret to everybody"
  1983. SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
  1984. def past(self):
  1985. return self.present() - 86400 * 32
  1986. def present(self):
  1987. return 1300000000
  1988. def test_known_values(self):
  1989. signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
  1990. version=1, clock=self.present)
  1991. self.assertEqual(
  1992. signed_v1,
  1993. b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
  1994. signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
  1995. version=2, clock=self.present)
  1996. self.assertEqual(
  1997. signed_v2,
  1998. b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  1999. b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
  2000. signed_default = create_signed_value(SignedValueTest.SECRET,
  2001. "key", "value", clock=self.present)
  2002. self.assertEqual(signed_default, signed_v2)
  2003. decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
  2004. signed_v1, min_version=1,
  2005. clock=self.present)
  2006. self.assertEqual(decoded_v1, b"value")
  2007. decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
  2008. signed_v2, min_version=2,
  2009. clock=self.present)
  2010. self.assertEqual(decoded_v2, b"value")
  2011. def test_name_swap(self):
  2012. signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
  2013. clock=self.present)
  2014. signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
  2015. clock=self.present)
  2016. # Try decoding each string with the other's "name"
  2017. decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
  2018. clock=self.present)
  2019. self.assertIs(decoded1, None)
  2020. decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
  2021. clock=self.present)
  2022. self.assertIs(decoded2, None)
  2023. def test_expired(self):
  2024. signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
  2025. clock=self.past)
  2026. decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
  2027. signed, clock=self.past)
  2028. self.assertEqual(decoded_past, b"value")
  2029. decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
  2030. signed, clock=self.present)
  2031. self.assertIs(decoded_present, None)
  2032. def test_payload_tampering(self):
  2033. # These cookies are variants of the one in test_known_values.
  2034. sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
  2035. def validate(prefix):
  2036. return (b'value' ==
  2037. decode_signed_value(SignedValueTest.SECRET, "key",
  2038. prefix + sig, clock=self.present))
  2039. self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
  2040. # Change key version
  2041. self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
  2042. # length mismatch (field too short)
  2043. self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
  2044. # length mismatch (field too long)
  2045. self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
  2046. def test_signature_tampering(self):
  2047. prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
  2048. def validate(sig):
  2049. return (b'value' ==
  2050. decode_signed_value(SignedValueTest.SECRET, "key",
  2051. prefix + sig, clock=self.present))
  2052. self.assertTrue(validate(
  2053. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
  2054. # All zeros
  2055. self.assertFalse(validate("0" * 32))
  2056. # Change one character
  2057. self.assertFalse(validate(
  2058. "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
  2059. # Change another character
  2060. self.assertFalse(validate(
  2061. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
  2062. # Truncate
  2063. self.assertFalse(validate(
  2064. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
  2065. # Lengthen
  2066. self.assertFalse(validate(
  2067. "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
  2068. def test_non_ascii(self):
  2069. value = b"\xe9"
  2070. signed = create_signed_value(SignedValueTest.SECRET, "key", value,
  2071. clock=self.present)
  2072. decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
  2073. clock=self.present)
  2074. self.assertEqual(value, decoded)
  2075. def test_key_versioning_read_write_default_key(self):
  2076. value = b"\xe9"
  2077. signed = create_signed_value(SignedValueTest.SECRET_DICT,
  2078. "key", value, clock=self.present,
  2079. key_version=0)
  2080. decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
  2081. "key", signed, clock=self.present)
  2082. self.assertEqual(value, decoded)
  2083. def test_key_versioning_read_write_non_default_key(self):
  2084. value = b"\xe9"
  2085. signed = create_signed_value(SignedValueTest.SECRET_DICT,
  2086. "key", value, clock=self.present,
  2087. key_version=1)
  2088. decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
  2089. "key", signed, clock=self.present)
  2090. self.assertEqual(value, decoded)
  2091. def test_key_versioning_invalid_key(self):
  2092. value = b"\xe9"
  2093. signed = create_signed_value(SignedValueTest.SECRET_DICT,
  2094. "key", value, clock=self.present,
  2095. key_version=0)
  2096. newkeys = SignedValueTest.SECRET_DICT.copy()
  2097. newkeys.pop(0)
  2098. decoded = decode_signed_value(newkeys,
  2099. "key", signed, clock=self.present)
  2100. self.assertEqual(None, decoded)
  2101. def test_key_version_retrieval(self):
  2102. value = b"\xe9"
  2103. signed = create_signed_value(SignedValueTest.SECRET_DICT,
  2104. "key", value, clock=self.present,
  2105. key_version=1)
  2106. key_version = get_signature_key_version(signed)
  2107. self.assertEqual(1, key_version)
  2108. @wsgi_safe
  2109. class XSRFTest(SimpleHandlerTestCase):
  2110. class Handler(RequestHandler):
  2111. def get(self):
  2112. version = int(self.get_argument("version", "2"))
  2113. # This would be a bad idea in a real app, but in this test
  2114. # it's fine.
  2115. self.settings["xsrf_cookie_version"] = version
  2116. self.write(self.xsrf_token)
  2117. def post(self):
  2118. self.write("ok")
  2119. def get_app_kwargs(self):
  2120. return dict(xsrf_cookies=True)
  2121. def setUp(self):
  2122. super(XSRFTest, self).setUp()
  2123. self.xsrf_token = self.get_token()
  2124. def get_token(self, old_token=None, version=None):
  2125. if old_token is not None:
  2126. headers = self.cookie_headers(old_token)
  2127. else:
  2128. headers = None
  2129. response = self.fetch(
  2130. "/" if version is None else ("/?version=%d" % version),
  2131. headers=headers)
  2132. response.rethrow()
  2133. return native_str(response.body)
  2134. def cookie_headers(self, token=None):
  2135. if token is None:
  2136. token = self.xsrf_token
  2137. return {"Cookie": "_xsrf=" + token}
  2138. def test_xsrf_fail_no_token(self):
  2139. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2140. response = self.fetch("/", method="POST", body=b"")
  2141. self.assertEqual(response.code, 403)
  2142. def test_xsrf_fail_body_no_cookie(self):
  2143. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2144. response = self.fetch(
  2145. "/", method="POST",
  2146. body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
  2147. self.assertEqual(response.code, 403)
  2148. def test_xsrf_fail_argument_invalid_format(self):
  2149. with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"):
  2150. response = self.fetch(
  2151. "/", method="POST",
  2152. headers=self.cookie_headers(),
  2153. body=urllib_parse.urlencode(dict(_xsrf='3|')))
  2154. self.assertEqual(response.code, 403)
  2155. def test_xsrf_fail_cookie_invalid_format(self):
  2156. with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
  2157. response = self.fetch(
  2158. "/", method="POST",
  2159. headers=self.cookie_headers(token='3|'),
  2160. body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
  2161. self.assertEqual(response.code, 403)
  2162. def test_xsrf_fail_cookie_no_body(self):
  2163. with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
  2164. response = self.fetch(
  2165. "/", method="POST", body=b"",
  2166. headers=self.cookie_headers())
  2167. self.assertEqual(response.code, 403)
  2168. def test_xsrf_success_short_token(self):
  2169. response = self.fetch(
  2170. "/", method="POST",
  2171. body=urllib_parse.urlencode(dict(_xsrf='deadbeef')),
  2172. headers=self.cookie_headers(token='deadbeef'))
  2173. self.assertEqual(response.code, 200)
  2174. def test_xsrf_success_non_hex_token(self):
  2175. response = self.fetch(
  2176. "/", method="POST",
  2177. body=urllib_parse.urlencode(dict(_xsrf='xoxo')),
  2178. headers=self.cookie_headers(token='xoxo'))
  2179. self.assertEqual(response.code, 200)
  2180. def test_xsrf_success_post_body(self):
  2181. response = self.fetch(
  2182. "/", method="POST",
  2183. body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2184. headers=self.cookie_headers())
  2185. self.assertEqual(response.code, 200)
  2186. def test_xsrf_success_query_string(self):
  2187. response = self.fetch(
  2188. "/?" + urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2189. method="POST", body=b"",
  2190. headers=self.cookie_headers())
  2191. self.assertEqual(response.code, 200)
  2192. def test_xsrf_success_header(self):
  2193. response = self.fetch("/", method="POST", body=b"",
  2194. headers=dict({"X-Xsrftoken": self.xsrf_token}, # type: ignore
  2195. **self.cookie_headers()))
  2196. self.assertEqual(response.code, 200)
  2197. def test_distinct_tokens(self):
  2198. # Every request gets a distinct token.
  2199. NUM_TOKENS = 10
  2200. tokens = set()
  2201. for i in range(NUM_TOKENS):
  2202. tokens.add(self.get_token())
  2203. self.assertEqual(len(tokens), NUM_TOKENS)
  2204. def test_cross_user(self):
  2205. token2 = self.get_token()
  2206. # Each token can be used to authenticate its own request.
  2207. for token in (self.xsrf_token, token2):
  2208. response = self.fetch(
  2209. "/", method="POST",
  2210. body=urllib_parse.urlencode(dict(_xsrf=token)),
  2211. headers=self.cookie_headers(token))
  2212. self.assertEqual(response.code, 200)
  2213. # Sending one in the cookie and the other in the body is not allowed.
  2214. for cookie_token, body_token in ((self.xsrf_token, token2),
  2215. (token2, self.xsrf_token)):
  2216. with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
  2217. response = self.fetch(
  2218. "/", method="POST",
  2219. body=urllib_parse.urlencode(dict(_xsrf=body_token)),
  2220. headers=self.cookie_headers(cookie_token))
  2221. self.assertEqual(response.code, 403)
  2222. def test_refresh_token(self):
  2223. token = self.xsrf_token
  2224. tokens_seen = set([token])
  2225. # A user's token is stable over time. Refreshing the page in one tab
  2226. # might update the cookie while an older tab still has the old cookie
  2227. # in its DOM. Simulate this scenario by passing a constant token
  2228. # in the body and re-querying for the token.
  2229. for i in range(5):
  2230. token = self.get_token(token)
  2231. # Tokens are encoded uniquely each time
  2232. tokens_seen.add(token)
  2233. response = self.fetch(
  2234. "/", method="POST",
  2235. body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
  2236. headers=self.cookie_headers(token))
  2237. self.assertEqual(response.code, 200)
  2238. self.assertEqual(len(tokens_seen), 6)
  2239. def test_versioning(self):
  2240. # Version 1 still produces distinct tokens per request.
  2241. self.assertNotEqual(self.get_token(version=1),
  2242. self.get_token(version=1))
  2243. # Refreshed v1 tokens are all identical.
  2244. v1_token = self.get_token(version=1)
  2245. for i in range(5):
  2246. self.assertEqual(self.get_token(v1_token, version=1), v1_token)
  2247. # Upgrade to a v2 version of the same token
  2248. v2_token = self.get_token(v1_token)
  2249. self.assertNotEqual(v1_token, v2_token)
  2250. # Each v1 token can map to many v2 tokens.
  2251. self.assertNotEqual(v2_token, self.get_token(v1_token))
  2252. # The tokens are cross-compatible.
  2253. for cookie_token, body_token in ((v1_token, v2_token),
  2254. (v2_token, v1_token)):
  2255. response = self.fetch(
  2256. "/", method="POST",
  2257. body=urllib_parse.urlencode(dict(_xsrf=body_token)),
  2258. headers=self.cookie_headers(cookie_token))
  2259. self.assertEqual(response.code, 200)
  2260. @wsgi_safe
  2261. class XSRFCookieKwargsTest(SimpleHandlerTestCase):
  2262. class Handler(RequestHandler):
  2263. def get(self):
  2264. self.write(self.xsrf_token)
  2265. def get_app_kwargs(self):
  2266. return dict(xsrf_cookies=True,
  2267. xsrf_cookie_kwargs=dict(httponly=True))
  2268. def test_xsrf_httponly(self):
  2269. response = self.fetch("/")
  2270. self.assertIn('httponly;', response.headers['Set-Cookie'].lower())
  2271. @wsgi_safe
  2272. class FinishExceptionTest(SimpleHandlerTestCase):
  2273. class Handler(RequestHandler):
  2274. def get(self):
  2275. self.set_status(401)
  2276. self.set_header('WWW-Authenticate', 'Basic realm="something"')
  2277. if self.get_argument('finish_value', ''):
  2278. raise Finish('authentication required')
  2279. else:
  2280. self.write('authentication required')
  2281. raise Finish()
  2282. def test_finish_exception(self):
  2283. for u in ['/', '/?finish_value=1']:
  2284. response = self.fetch(u)
  2285. self.assertEqual(response.code, 401)
  2286. self.assertEqual('Basic realm="something"',
  2287. response.headers.get('WWW-Authenticate'))
  2288. self.assertEqual(b'authentication required', response.body)
  2289. @wsgi_safe
  2290. class DecoratorTest(WebTestCase):
  2291. def get_handlers(self):
  2292. class RemoveSlashHandler(RequestHandler):
  2293. @removeslash
  2294. def get(self):
  2295. pass
  2296. class AddSlashHandler(RequestHandler):
  2297. @addslash
  2298. def get(self):
  2299. pass
  2300. return [("/removeslash/", RemoveSlashHandler),
  2301. ("/addslash", AddSlashHandler),
  2302. ]
  2303. def test_removeslash(self):
  2304. response = self.fetch("/removeslash/", follow_redirects=False)
  2305. self.assertEqual(response.code, 301)
  2306. self.assertEqual(response.headers['Location'], "/removeslash")
  2307. response = self.fetch("/removeslash/?foo=bar", follow_redirects=False)
  2308. self.assertEqual(response.code, 301)
  2309. self.assertEqual(response.headers['Location'], "/removeslash?foo=bar")
  2310. def test_addslash(self):
  2311. response = self.fetch("/addslash", follow_redirects=False)
  2312. self.assertEqual(response.code, 301)
  2313. self.assertEqual(response.headers['Location'], "/addslash/")
  2314. response = self.fetch("/addslash?foo=bar", follow_redirects=False)
  2315. self.assertEqual(response.code, 301)
  2316. self.assertEqual(response.headers['Location'], "/addslash/?foo=bar")
  2317. @wsgi_safe
  2318. class CacheTest(WebTestCase):
  2319. def get_handlers(self):
  2320. class EtagHandler(RequestHandler):
  2321. def get(self, computed_etag):
  2322. self.write(computed_etag)
  2323. def compute_etag(self):
  2324. return self._write_buffer[0]
  2325. return [
  2326. ('/etag/(.*)', EtagHandler)
  2327. ]
  2328. def test_wildcard_etag(self):
  2329. computed_etag = '"xyzzy"'
  2330. etags = '*'
  2331. self._test_etag(computed_etag, etags, 304)
  2332. def test_strong_etag_match(self):
  2333. computed_etag = '"xyzzy"'
  2334. etags = '"xyzzy"'
  2335. self._test_etag(computed_etag, etags, 304)
  2336. def test_multiple_strong_etag_match(self):
  2337. computed_etag = '"xyzzy1"'
  2338. etags = '"xyzzy1", "xyzzy2"'
  2339. self._test_etag(computed_etag, etags, 304)
  2340. def test_strong_etag_not_match(self):
  2341. computed_etag = '"xyzzy"'
  2342. etags = '"xyzzy1"'
  2343. self._test_etag(computed_etag, etags, 200)
  2344. def test_multiple_strong_etag_not_match(self):
  2345. computed_etag = '"xyzzy"'
  2346. etags = '"xyzzy1", "xyzzy2"'
  2347. self._test_etag(computed_etag, etags, 200)
  2348. def test_weak_etag_match(self):
  2349. computed_etag = '"xyzzy1"'
  2350. etags = 'W/"xyzzy1"'
  2351. self._test_etag(computed_etag, etags, 304)
  2352. def test_multiple_weak_etag_match(self):
  2353. computed_etag = '"xyzzy2"'
  2354. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2355. self._test_etag(computed_etag, etags, 304)
  2356. def test_weak_etag_not_match(self):
  2357. computed_etag = '"xyzzy2"'
  2358. etags = 'W/"xyzzy1"'
  2359. self._test_etag(computed_etag, etags, 200)
  2360. def test_multiple_weak_etag_not_match(self):
  2361. computed_etag = '"xyzzy3"'
  2362. etags = 'W/"xyzzy1", W/"xyzzy2"'
  2363. self._test_etag(computed_etag, etags, 200)
  2364. def _test_etag(self, computed_etag, etags, status_code):
  2365. response = self.fetch(
  2366. '/etag/' + computed_etag,
  2367. headers={'If-None-Match': etags}
  2368. )
  2369. self.assertEqual(response.code, status_code)
  2370. @wsgi_safe
  2371. class RequestSummaryTest(SimpleHandlerTestCase):
  2372. class Handler(RequestHandler):
  2373. def get(self):
  2374. # remote_ip is optional, although it's set by
  2375. # both HTTPServer and WSGIAdapter.
  2376. # Clobber it to make sure it doesn't break logging.
  2377. self.request.remote_ip = None
  2378. self.finish(self._request_summary())
  2379. def test_missing_remote_ip(self):
  2380. resp = self.fetch("/")
  2381. self.assertEqual(resp.body, b"GET / (None)")
  2382. class HTTPErrorTest(unittest.TestCase):
  2383. def test_copy(self):
  2384. e = HTTPError(403, reason="Go away")
  2385. e2 = copy.copy(e)
  2386. self.assertIsNot(e, e2)
  2387. self.assertEqual(e.status_code, e2.status_code)
  2388. self.assertEqual(e.reason, e2.reason)
  2389. class ApplicationTest(AsyncTestCase):
  2390. def test_listen(self):
  2391. app = Application([])
  2392. server = app.listen(0, address='127.0.0.1')
  2393. server.stop()
  2394. class URLSpecReverseTest(unittest.TestCase):
  2395. def test_reverse(self):
  2396. self.assertEqual('/favicon.ico', url(r'/favicon\.ico', None).reverse())
  2397. self.assertEqual('/favicon.ico', url(r'^/favicon\.ico$', None).reverse())
  2398. def test_non_reversible(self):
  2399. # URLSpecs are non-reversible if they include non-constant
  2400. # regex features outside capturing groups. Currently, this is
  2401. # only strictly enforced for backslash-escaped character
  2402. # classes.
  2403. paths = [
  2404. r'^/api/v\d+/foo/(\w+)$',
  2405. ]
  2406. for path in paths:
  2407. # A URLSpec can still be created even if it cannot be reversed.
  2408. url_spec = url(path, None)
  2409. try:
  2410. result = url_spec.reverse()
  2411. self.fail("did not get expected exception when reversing %s. "
  2412. "result: %s" % (path, result))
  2413. except ValueError:
  2414. pass
  2415. def test_reverse_arguments(self):
  2416. self.assertEqual('/api/v1/foo/bar',
  2417. url(r'^/api/v1/foo/(\w+)$', None).reverse('bar'))
  2418. class RedirectHandlerTest(WebTestCase):
  2419. def get_handlers(self):
  2420. return [
  2421. ('/src', WebRedirectHandler, {'url': '/dst'}),
  2422. ('/src2', WebRedirectHandler, {'url': '/dst2?foo=bar'}),
  2423. (r'/(.*?)/(.*?)/(.*)', WebRedirectHandler, {'url': '/{1}/{0}/{2}'})]
  2424. def test_basic_redirect(self):
  2425. response = self.fetch('/src', follow_redirects=False)
  2426. self.assertEqual(response.code, 301)
  2427. self.assertEqual(response.headers['Location'], '/dst')
  2428. def test_redirect_with_argument(self):
  2429. response = self.fetch('/src?foo=bar', follow_redirects=False)
  2430. self.assertEqual(response.code, 301)
  2431. self.assertEqual(response.headers['Location'], '/dst?foo=bar')
  2432. def test_redirect_with_appending_argument(self):
  2433. response = self.fetch('/src2?foo2=bar2', follow_redirects=False)
  2434. self.assertEqual(response.code, 301)
  2435. self.assertEqual(response.headers['Location'], '/dst2?foo=bar&foo2=bar2')
  2436. def test_redirect_pattern(self):
  2437. response = self.fetch('/a/b/c', follow_redirects=False)
  2438. self.assertEqual(response.code, 301)
  2439. self.assertEqual(response.headers['Location'], '/b/a/c')