| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967 |
- from __future__ import absolute_import, division, print_function
- from tornado.concurrent import Future
- from tornado import gen
- from tornado.escape import json_decode, utf8, to_unicode, recursive_unicode, native_str, to_basestring # noqa: E501
- from tornado.httpclient import HTTPClientError
- from tornado.httputil import format_timestamp
- from tornado.ioloop import IOLoop
- from tornado.iostream import IOStream
- from tornado import locale
- from tornado.locks import Event
- from tornado.log import app_log, gen_log
- from tornado.simple_httpclient import SimpleAsyncHTTPClient
- from tornado.template import DictLoader
- from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
- from tornado.test.util import unittest, skipBefore35, exec_test, ignore_deprecation
- from tornado.util import ObjectDict, unicode_type, timedelta_to_seconds, PY3
- from tornado.web import (
- Application, RequestHandler, StaticFileHandler, RedirectHandler as WebRedirectHandler,
- HTTPError, MissingArgumentError, ErrorHandler, authenticated, asynchronous, url,
- _create_signature_v1, create_signed_value, decode_signed_value, get_signature_key_version,
- UIModule, Finish, stream_request_body, removeslash, addslash, GZipContentEncoding,
- )
- import binascii
- import contextlib
- import copy
- import datetime
- import email.utils
- import gzip
- from io import BytesIO
- import itertools
- import logging
- import os
- import re
- import socket
- if PY3:
- import urllib.parse as urllib_parse # py3
- else:
- import urllib as urllib_parse # py2
- wsgi_safe_tests = []
- def relpath(*a):
- return os.path.join(os.path.dirname(__file__), *a)
- def wsgi_safe(cls):
- wsgi_safe_tests.append(cls)
- return cls
- class WebTestCase(AsyncHTTPTestCase):
- """Base class for web tests that also supports WSGI mode.
- Override get_handlers and get_app_kwargs instead of get_app.
- Append to wsgi_safe to have it run in wsgi_test as well.
- """
- def get_app(self):
- self.app = Application(self.get_handlers(), **self.get_app_kwargs())
- return self.app
- def get_handlers(self):
- raise NotImplementedError()
- def get_app_kwargs(self):
- return {}
- class SimpleHandlerTestCase(WebTestCase):
- """Simplified base class for tests that work with a single handler class.
- To use, define a nested class named ``Handler``.
- """
- def get_handlers(self):
- return [('/', self.Handler)]
- class HelloHandler(RequestHandler):
- def get(self):
- self.write('hello')
- class CookieTestRequestHandler(RequestHandler):
- # stub out enough methods to make the secure_cookie functions work
- def __init__(self, cookie_secret='0123456789', key_version=None):
- # don't call super.__init__
- self._cookies = {}
- if key_version is None:
- self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret))
- else:
- self.application = ObjectDict(settings=dict(cookie_secret=cookie_secret,
- key_version=key_version))
- def get_cookie(self, name):
- return self._cookies.get(name)
- def set_cookie(self, name, value, expires_days=None):
- self._cookies[name] = value
- # See SignedValueTest below for more.
- class SecureCookieV1Test(unittest.TestCase):
- def test_round_trip(self):
- handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b'bar', version=1)
- self.assertEqual(handler.get_secure_cookie('foo', min_version=1),
- b'bar')
- def test_cookie_tampering_future_timestamp(self):
- handler = CookieTestRequestHandler()
- # this string base64-encodes to '12345678'
- handler.set_secure_cookie('foo', binascii.a2b_hex(b'd76df8e7aefc'),
- version=1)
- cookie = handler._cookies['foo']
- match = re.match(br'12345678\|([0-9]+)\|([0-9a-f]+)', cookie)
- self.assertTrue(match)
- timestamp = match.group(1)
- sig = match.group(2)
- self.assertEqual(
- _create_signature_v1(handler.application.settings["cookie_secret"],
- 'foo', '12345678', timestamp),
- sig)
- # shifting digits from payload to timestamp doesn't alter signature
- # (this is not desirable behavior, just confirming that that's how it
- # works)
- self.assertEqual(
- _create_signature_v1(handler.application.settings["cookie_secret"],
- 'foo', '1234', b'5678' + timestamp),
- sig)
- # tamper with the cookie
- handler._cookies['foo'] = utf8('1234|5678%s|%s' % (
- to_basestring(timestamp), to_basestring(sig)))
- # it gets rejected
- with ExpectLog(gen_log, "Cookie timestamp in future"):
- self.assertTrue(
- handler.get_secure_cookie('foo', min_version=1) is None)
- def test_arbitrary_bytes(self):
- # Secure cookies accept arbitrary data (which is base64 encoded).
- # Note that normal cookies accept only a subset of ascii.
- handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b'\xe9', version=1)
- self.assertEqual(handler.get_secure_cookie('foo', min_version=1), b'\xe9')
- # See SignedValueTest below for more.
- class SecureCookieV2Test(unittest.TestCase):
- KEY_VERSIONS = {
- 0: 'ajklasdf0ojaisdf',
- 1: 'aslkjasaolwkjsdf'
- }
- def test_round_trip(self):
- handler = CookieTestRequestHandler()
- handler.set_secure_cookie('foo', b'bar', version=2)
- self.assertEqual(handler.get_secure_cookie('foo', min_version=2), b'bar')
- def test_key_version_roundtrip(self):
- handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
- key_version=0)
- handler.set_secure_cookie('foo', b'bar')
- self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
- def test_key_version_roundtrip_differing_version(self):
- handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
- key_version=1)
- handler.set_secure_cookie('foo', b'bar')
- self.assertEqual(handler.get_secure_cookie('foo'), b'bar')
- def test_key_version_increment_version(self):
- handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
- key_version=0)
- handler.set_secure_cookie('foo', b'bar')
- new_handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
- key_version=1)
- new_handler._cookies = handler._cookies
- self.assertEqual(new_handler.get_secure_cookie('foo'), b'bar')
- def test_key_version_invalidate_version(self):
- handler = CookieTestRequestHandler(cookie_secret=self.KEY_VERSIONS,
- key_version=0)
- handler.set_secure_cookie('foo', b'bar')
- new_key_versions = self.KEY_VERSIONS.copy()
- new_key_versions.pop(0)
- new_handler = CookieTestRequestHandler(cookie_secret=new_key_versions,
- key_version=1)
- new_handler._cookies = handler._cookies
- self.assertEqual(new_handler.get_secure_cookie('foo'), None)
- class FinalReturnTest(WebTestCase):
- def get_handlers(self):
- test = self
- class FinishHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- test.final_return = self.finish()
- yield test.final_return
- class RenderHandler(RequestHandler):
- def create_template_loader(self, path):
- return DictLoader({'foo.html': 'hi'})
- @gen.coroutine
- def get(self):
- test.final_return = self.render('foo.html')
- return [("/finish", FinishHandler),
- ("/render", RenderHandler)]
- def get_app_kwargs(self):
- return dict(template_path='FinalReturnTest')
- def test_finish_method_return_future(self):
- response = self.fetch(self.get_url('/finish'))
- self.assertEqual(response.code, 200)
- self.assertIsInstance(self.final_return, Future)
- self.assertTrue(self.final_return.done())
- def test_render_method_return_future(self):
- response = self.fetch(self.get_url('/render'))
- self.assertEqual(response.code, 200)
- self.assertIsInstance(self.final_return, Future)
- class CookieTest(WebTestCase):
- def get_handlers(self):
- class SetCookieHandler(RequestHandler):
- def get(self):
- # Try setting cookies with different argument types
- # to ensure that everything gets encoded correctly
- self.set_cookie("str", "asdf")
- self.set_cookie("unicode", u"qwer")
- self.set_cookie("bytes", b"zxcv")
- class GetCookieHandler(RequestHandler):
- def get(self):
- self.write(self.get_cookie("foo", "default"))
- class SetCookieDomainHandler(RequestHandler):
- def get(self):
- # unicode domain and path arguments shouldn't break things
- # either (see bug #285)
- self.set_cookie("unicode_args", "blah", domain=u"foo.com",
- path=u"/foo")
- class SetCookieSpecialCharHandler(RequestHandler):
- def get(self):
- self.set_cookie("equals", "a=b")
- self.set_cookie("semicolon", "a;b")
- self.set_cookie("quote", 'a"b')
- class SetCookieOverwriteHandler(RequestHandler):
- def get(self):
- self.set_cookie("a", "b", domain="example.com")
- self.set_cookie("c", "d", domain="example.com")
- # A second call with the same name clobbers the first.
- # Attributes from the first call are not carried over.
- self.set_cookie("a", "e")
- class SetCookieMaxAgeHandler(RequestHandler):
- def get(self):
- self.set_cookie("foo", "bar", max_age=10)
- class SetCookieExpiresDaysHandler(RequestHandler):
- def get(self):
- self.set_cookie("foo", "bar", expires_days=10)
- class SetCookieFalsyFlags(RequestHandler):
- def get(self):
- self.set_cookie("a", "1", secure=True)
- self.set_cookie("b", "1", secure=False)
- self.set_cookie("c", "1", httponly=True)
- self.set_cookie("d", "1", httponly=False)
- return [("/set", SetCookieHandler),
- ("/get", GetCookieHandler),
- ("/set_domain", SetCookieDomainHandler),
- ("/special_char", SetCookieSpecialCharHandler),
- ("/set_overwrite", SetCookieOverwriteHandler),
- ("/set_max_age", SetCookieMaxAgeHandler),
- ("/set_expires_days", SetCookieExpiresDaysHandler),
- ("/set_falsy_flags", SetCookieFalsyFlags)
- ]
- def test_set_cookie(self):
- response = self.fetch("/set")
- self.assertEqual(sorted(response.headers.get_list("Set-Cookie")),
- ["bytes=zxcv; Path=/",
- "str=asdf; Path=/",
- "unicode=qwer; Path=/",
- ])
- def test_get_cookie(self):
- response = self.fetch("/get", headers={"Cookie": "foo=bar"})
- self.assertEqual(response.body, b"bar")
- response = self.fetch("/get", headers={"Cookie": 'foo="bar"'})
- self.assertEqual(response.body, b"bar")
- response = self.fetch("/get", headers={"Cookie": "/=exception;"})
- self.assertEqual(response.body, b"default")
- def test_set_cookie_domain(self):
- response = self.fetch("/set_domain")
- self.assertEqual(response.headers.get_list("Set-Cookie"),
- ["unicode_args=blah; Domain=foo.com; Path=/foo"])
- def test_cookie_special_char(self):
- response = self.fetch("/special_char")
- headers = sorted(response.headers.get_list("Set-Cookie"))
- self.assertEqual(len(headers), 3)
- self.assertEqual(headers[0], 'equals="a=b"; Path=/')
- self.assertEqual(headers[1], 'quote="a\\"b"; Path=/')
- # python 2.7 octal-escapes the semicolon; older versions leave it alone
- self.assertTrue(headers[2] in ('semicolon="a;b"; Path=/',
- 'semicolon="a\\073b"; Path=/'),
- headers[2])
- data = [('foo=a=b', 'a=b'),
- ('foo="a=b"', 'a=b'),
- ('foo="a;b"', '"a'), # even quoted, ";" is a delimiter
- ('foo=a\\073b', 'a\\073b'), # escapes only decoded in quotes
- ('foo="a\\073b"', 'a;b'),
- ('foo="a\\"b"', 'a"b'),
- ]
- for header, expected in data:
- logging.debug("trying %r", header)
- response = self.fetch("/get", headers={"Cookie": header})
- self.assertEqual(response.body, utf8(expected))
- def test_set_cookie_overwrite(self):
- response = self.fetch("/set_overwrite")
- headers = response.headers.get_list("Set-Cookie")
- self.assertEqual(sorted(headers),
- ["a=e; Path=/", "c=d; Domain=example.com; Path=/"])
- def test_set_cookie_max_age(self):
- response = self.fetch("/set_max_age")
- headers = response.headers.get_list("Set-Cookie")
- self.assertEqual(sorted(headers),
- ["foo=bar; Max-Age=10; Path=/"])
- def test_set_cookie_expires_days(self):
- response = self.fetch("/set_expires_days")
- header = response.headers.get("Set-Cookie")
- match = re.match("foo=bar; expires=(?P<expires>.+); Path=/", header)
- self.assertIsNotNone(match)
- expires = datetime.datetime.utcnow() + datetime.timedelta(days=10)
- header_expires = datetime.datetime(
- *email.utils.parsedate(match.groupdict()["expires"])[:6])
- self.assertTrue(abs(timedelta_to_seconds(expires - header_expires)) < 10)
- def test_set_cookie_false_flags(self):
- response = self.fetch("/set_falsy_flags")
- headers = sorted(response.headers.get_list("Set-Cookie"))
- # The secure and httponly headers are capitalized in py35 and
- # lowercase in older versions.
- self.assertEqual(headers[0].lower(), 'a=1; path=/; secure')
- self.assertEqual(headers[1].lower(), 'b=1; path=/')
- self.assertEqual(headers[2].lower(), 'c=1; httponly; path=/')
- self.assertEqual(headers[3].lower(), 'd=1; path=/')
- class AuthRedirectRequestHandler(RequestHandler):
- def initialize(self, login_url):
- self.login_url = login_url
- def get_login_url(self):
- return self.login_url
- @authenticated
- def get(self):
- # we'll never actually get here because the test doesn't follow redirects
- self.send_error(500)
- class AuthRedirectTest(WebTestCase):
- def get_handlers(self):
- return [('/relative', AuthRedirectRequestHandler,
- dict(login_url='/login')),
- ('/absolute', AuthRedirectRequestHandler,
- dict(login_url='http://example.com/login'))]
- def test_relative_auth_redirect(self):
- response = self.fetch(self.get_url('/relative'),
- follow_redirects=False)
- self.assertEqual(response.code, 302)
- self.assertEqual(response.headers['Location'], '/login?next=%2Frelative')
- def test_absolute_auth_redirect(self):
- response = self.fetch(self.get_url('/absolute'),
- follow_redirects=False)
- self.assertEqual(response.code, 302)
- self.assertTrue(re.match(
- 'http://example.com/login\?next=http%3A%2F%2F127.0.0.1%3A[0-9]+%2Fabsolute',
- response.headers['Location']), response.headers['Location'])
- class ConnectionCloseHandler(RequestHandler):
- def initialize(self, test):
- self.test = test
- @gen.coroutine
- def get(self):
- self.test.on_handler_waiting()
- never_finish = Event()
- yield never_finish.wait()
- def on_connection_close(self):
- self.test.on_connection_close()
- class ConnectionCloseTest(WebTestCase):
- def get_handlers(self):
- return [('/', ConnectionCloseHandler, dict(test=self))]
- def test_connection_close(self):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- s.connect(("127.0.0.1", self.get_http_port()))
- self.stream = IOStream(s)
- self.stream.write(b"GET / HTTP/1.0\r\n\r\n")
- self.wait()
- def on_handler_waiting(self):
- logging.debug('handler waiting')
- self.stream.close()
- def on_connection_close(self):
- logging.debug('connection closed')
- self.stop()
- class EchoHandler(RequestHandler):
- def get(self, *path_args):
- # Type checks: web.py interfaces convert argument values to
- # unicode strings (by default, but see also decode_argument).
- # In httpserver.py (i.e. self.request.arguments), they're left
- # as bytes. Keys are always native strings.
- for key in self.request.arguments:
- if type(key) != str:
- raise Exception("incorrect type for key: %r" % type(key))
- for value in self.request.arguments[key]:
- if type(value) != bytes:
- raise Exception("incorrect type for value: %r" %
- type(value))
- for value in self.get_arguments(key):
- if type(value) != unicode_type:
- raise Exception("incorrect type for value: %r" %
- type(value))
- for arg in path_args:
- if type(arg) != unicode_type:
- raise Exception("incorrect type for path arg: %r" % type(arg))
- self.write(dict(path=self.request.path,
- path_args=path_args,
- args=recursive_unicode(self.request.arguments)))
- class RequestEncodingTest(WebTestCase):
- def get_handlers(self):
- return [("/group/(.*)", EchoHandler),
- ("/slashes/([^/]*)/([^/]*)", EchoHandler),
- ]
- def fetch_json(self, path):
- return json_decode(self.fetch(path).body)
- def test_group_question_mark(self):
- # Ensure that url-encoded question marks are handled properly
- self.assertEqual(self.fetch_json('/group/%3F'),
- dict(path='/group/%3F', path_args=['?'], args={}))
- self.assertEqual(self.fetch_json('/group/%3F?%3F=%3F'),
- dict(path='/group/%3F', path_args=['?'], args={'?': ['?']}))
- def test_group_encoding(self):
- # Path components and query arguments should be decoded the same way
- self.assertEqual(self.fetch_json('/group/%C3%A9?arg=%C3%A9'),
- {u"path": u"/group/%C3%A9",
- u"path_args": [u"\u00e9"],
- u"args": {u"arg": [u"\u00e9"]}})
- def test_slashes(self):
- # Slashes may be escaped to appear as a single "directory" in the path,
- # but they are then unescaped when passed to the get() method.
- self.assertEqual(self.fetch_json('/slashes/foo/bar'),
- dict(path="/slashes/foo/bar",
- path_args=["foo", "bar"],
- args={}))
- self.assertEqual(self.fetch_json('/slashes/a%2Fb/c%2Fd'),
- dict(path="/slashes/a%2Fb/c%2Fd",
- path_args=["a/b", "c/d"],
- args={}))
- def test_error(self):
- # Percent signs (encoded as %25) should not mess up printf-style
- # messages in logs
- with ExpectLog(gen_log, ".*Invalid unicode"):
- self.fetch("/group/?arg=%25%e9")
- class TypeCheckHandler(RequestHandler):
- def prepare(self):
- self.errors = {}
- self.check_type('status', self.get_status(), int)
- # get_argument is an exception from the general rule of using
- # type str for non-body data mainly for historical reasons.
- self.check_type('argument', self.get_argument('foo'), unicode_type)
- self.check_type('cookie_key', list(self.cookies.keys())[0], str)
- self.check_type('cookie_value', list(self.cookies.values())[0].value, str)
- # Secure cookies return bytes because they can contain arbitrary
- # data, but regular cookies are native strings.
- if list(self.cookies.keys()) != ['asdf']:
- raise Exception("unexpected values for cookie keys: %r" %
- self.cookies.keys())
- self.check_type('get_secure_cookie', self.get_secure_cookie('asdf'), bytes)
- self.check_type('get_cookie', self.get_cookie('asdf'), str)
- self.check_type('xsrf_token', self.xsrf_token, bytes)
- self.check_type('xsrf_form_html', self.xsrf_form_html(), str)
- self.check_type('reverse_url', self.reverse_url('typecheck', 'foo'), str)
- self.check_type('request_summary', self._request_summary(), str)
- def get(self, path_component):
- # path_component uses type unicode instead of str for consistency
- # with get_argument()
- self.check_type('path_component', path_component, unicode_type)
- self.write(self.errors)
- def post(self, path_component):
- self.check_type('path_component', path_component, unicode_type)
- self.write(self.errors)
- def check_type(self, name, obj, expected_type):
- actual_type = type(obj)
- if expected_type != actual_type:
- self.errors[name] = "expected %s, got %s" % (expected_type,
- actual_type)
- class DecodeArgHandler(RequestHandler):
- def decode_argument(self, value, name=None):
- if type(value) != bytes:
- raise Exception("unexpected type for value: %r" % type(value))
- # use self.request.arguments directly to avoid recursion
- if 'encoding' in self.request.arguments:
- return value.decode(to_unicode(self.request.arguments['encoding'][0]))
- else:
- return value
- def get(self, arg):
- def describe(s):
- if type(s) == bytes:
- return ["bytes", native_str(binascii.b2a_hex(s))]
- elif type(s) == unicode_type:
- return ["unicode", s]
- raise Exception("unknown type")
- self.write({'path': describe(arg),
- 'query': describe(self.get_argument("foo")),
- })
- class LinkifyHandler(RequestHandler):
- def get(self):
- self.render("linkify.html", message="http://example.com")
- class UIModuleResourceHandler(RequestHandler):
- def get(self):
- self.render("page.html", entries=[1, 2])
- class OptionalPathHandler(RequestHandler):
- def get(self, path):
- self.write({"path": path})
- class FlowControlHandler(RequestHandler):
- # These writes are too small to demonstrate real flow control,
- # but at least it shows that the callbacks get run.
- with ignore_deprecation():
- @asynchronous
- def get(self):
- self.write("1")
- with ignore_deprecation():
- self.flush(callback=self.step2)
- def step2(self):
- self.write("2")
- with ignore_deprecation():
- self.flush(callback=self.step3)
- def step3(self):
- self.write("3")
- self.finish()
- class MultiHeaderHandler(RequestHandler):
- def get(self):
- self.set_header("x-overwrite", "1")
- self.set_header("X-Overwrite", 2)
- self.add_header("x-multi", 3)
- self.add_header("X-Multi", "4")
- class RedirectHandler(RequestHandler):
- def get(self):
- if self.get_argument('permanent', None) is not None:
- self.redirect('/', permanent=int(self.get_argument('permanent')))
- elif self.get_argument('status', None) is not None:
- self.redirect('/', status=int(self.get_argument('status')))
- else:
- raise Exception("didn't get permanent or status arguments")
- class EmptyFlushCallbackHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- # Ensure that the flush callback is run whether or not there
- # was any output. The gen.Task and direct yield forms are
- # equivalent.
- yield self.flush() # "empty" flush, but writes headers
- yield self.flush() # empty flush
- self.write("o")
- yield self.flush() # flushes the "o"
- yield self.flush() # empty flush
- self.finish("k")
- class HeaderInjectionHandler(RequestHandler):
- def get(self):
- try:
- self.set_header("X-Foo", "foo\r\nX-Bar: baz")
- raise Exception("Didn't get expected exception")
- except ValueError as e:
- if "Unsafe header value" in str(e):
- self.finish(b"ok")
- else:
- raise
- class GetArgumentHandler(RequestHandler):
- def prepare(self):
- if self.get_argument('source', None) == 'query':
- method = self.get_query_argument
- elif self.get_argument('source', None) == 'body':
- method = self.get_body_argument
- else:
- method = self.get_argument
- self.finish(method("foo", "default"))
- class GetArgumentsHandler(RequestHandler):
- def prepare(self):
- self.finish(dict(default=self.get_arguments("foo"),
- query=self.get_query_arguments("foo"),
- body=self.get_body_arguments("foo")))
- # This test is shared with wsgi_test.py
- @wsgi_safe
- class WSGISafeWebTest(WebTestCase):
- COOKIE_SECRET = "WebTest.COOKIE_SECRET"
- def get_app_kwargs(self):
- loader = DictLoader({
- "linkify.html": "{% module linkify(message) %}",
- "page.html": """\
- <html><head></head><body>
- {% for e in entries %}
- {% module Template("entry.html", entry=e) %}
- {% end %}
- </body></html>""",
- "entry.html": """\
- {{ set_resources(embedded_css=".entry { margin-bottom: 1em; }",
- embedded_javascript="js_embed()",
- css_files=["/base.css", "/foo.css"],
- javascript_files="/common.js",
- html_head="<meta>",
- html_body='<script src="/analytics.js"/>') }}
- <div class="entry">...</div>""",
- })
- return dict(template_loader=loader,
- autoescape="xhtml_escape",
- cookie_secret=self.COOKIE_SECRET)
- def tearDown(self):
- super(WSGISafeWebTest, self).tearDown()
- RequestHandler._template_loaders.clear()
- def get_handlers(self):
- urls = [
- url("/typecheck/(.*)", TypeCheckHandler, name='typecheck'),
- url("/decode_arg/(.*)", DecodeArgHandler, name='decode_arg'),
- url("/decode_arg_kw/(?P<arg>.*)", DecodeArgHandler),
- url("/linkify", LinkifyHandler),
- url("/uimodule_resources", UIModuleResourceHandler),
- url("/optional_path/(.+)?", OptionalPathHandler),
- url("/multi_header", MultiHeaderHandler),
- url("/redirect", RedirectHandler),
- url("/web_redirect_permanent", WebRedirectHandler, {"url": "/web_redirect_newpath"}),
- url("/web_redirect", WebRedirectHandler,
- {"url": "/web_redirect_newpath", "permanent": False}),
- url("//web_redirect_double_slash", WebRedirectHandler,
- {"url": '/web_redirect_newpath'}),
- url("/header_injection", HeaderInjectionHandler),
- url("/get_argument", GetArgumentHandler),
- url("/get_arguments", GetArgumentsHandler),
- ]
- return urls
- def fetch_json(self, *args, **kwargs):
- response = self.fetch(*args, **kwargs)
- response.rethrow()
- return json_decode(response.body)
- def test_types(self):
- cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
- "asdf", "qwer"))
- response = self.fetch("/typecheck/asdf?foo=bar",
- headers={"Cookie": "asdf=" + cookie_value})
- data = json_decode(response.body)
- self.assertEqual(data, {})
- response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
- headers={"Cookie": "asdf=" + cookie_value},
- body="foo=bar")
- def test_decode_argument(self):
- # These urls all decode to the same thing
- urls = ["/decode_arg/%C3%A9?foo=%C3%A9&encoding=utf-8",
- "/decode_arg/%E9?foo=%E9&encoding=latin1",
- "/decode_arg_kw/%E9?foo=%E9&encoding=latin1",
- ]
- for req_url in urls:
- response = self.fetch(req_url)
- response.rethrow()
- data = json_decode(response.body)
- self.assertEqual(data, {u'path': [u'unicode', u'\u00e9'],
- u'query': [u'unicode', u'\u00e9'],
- })
- response = self.fetch("/decode_arg/%C3%A9?foo=%C3%A9")
- response.rethrow()
- data = json_decode(response.body)
- self.assertEqual(data, {u'path': [u'bytes', u'c3a9'],
- u'query': [u'bytes', u'c3a9'],
- })
- def test_decode_argument_invalid_unicode(self):
- # test that invalid unicode in URLs causes 400, not 500
- with ExpectLog(gen_log, ".*Invalid unicode.*"):
- response = self.fetch("/typecheck/invalid%FF")
- self.assertEqual(response.code, 400)
- response = self.fetch("/typecheck/invalid?foo=%FF")
- self.assertEqual(response.code, 400)
- def test_decode_argument_plus(self):
- # These urls are all equivalent.
- urls = ["/decode_arg/1%20%2B%201?foo=1%20%2B%201&encoding=utf-8",
- "/decode_arg/1%20+%201?foo=1+%2B+1&encoding=utf-8"]
- for req_url in urls:
- response = self.fetch(req_url)
- response.rethrow()
- data = json_decode(response.body)
- self.assertEqual(data, {u'path': [u'unicode', u'1 + 1'],
- u'query': [u'unicode', u'1 + 1'],
- })
- def test_reverse_url(self):
- self.assertEqual(self.app.reverse_url('decode_arg', 'foo'),
- '/decode_arg/foo')
- self.assertEqual(self.app.reverse_url('decode_arg', 42),
- '/decode_arg/42')
- self.assertEqual(self.app.reverse_url('decode_arg', b'\xe9'),
- '/decode_arg/%E9')
- self.assertEqual(self.app.reverse_url('decode_arg', u'\u00e9'),
- '/decode_arg/%C3%A9')
- self.assertEqual(self.app.reverse_url('decode_arg', '1 + 1'),
- '/decode_arg/1%20%2B%201')
- def test_uimodule_unescaped(self):
- response = self.fetch("/linkify")
- self.assertEqual(response.body,
- b"<a href=\"http://example.com\">http://example.com</a>")
- def test_uimodule_resources(self):
- response = self.fetch("/uimodule_resources")
- self.assertEqual(response.body, b"""\
- <html><head><link href="/base.css" type="text/css" rel="stylesheet"/><link href="/foo.css" type="text/css" rel="stylesheet"/>
- <style type="text/css">
- .entry { margin-bottom: 1em; }
- </style>
- <meta>
- </head><body>
- <div class="entry">...</div>
- <div class="entry">...</div>
- <script src="/common.js" type="text/javascript"></script>
- <script type="text/javascript">
- //<![CDATA[
- js_embed()
- //]]>
- </script>
- <script src="/analytics.js"/>
- </body></html>""") # noqa: E501
- def test_optional_path(self):
- self.assertEqual(self.fetch_json("/optional_path/foo"),
- {u"path": u"foo"})
- self.assertEqual(self.fetch_json("/optional_path/"),
- {u"path": None})
- def test_multi_header(self):
- response = self.fetch("/multi_header")
- self.assertEqual(response.headers["x-overwrite"], "2")
- self.assertEqual(response.headers.get_list("x-multi"), ["3", "4"])
- def test_redirect(self):
- response = self.fetch("/redirect?permanent=1", follow_redirects=False)
- self.assertEqual(response.code, 301)
- response = self.fetch("/redirect?permanent=0", follow_redirects=False)
- self.assertEqual(response.code, 302)
- response = self.fetch("/redirect?status=307", follow_redirects=False)
- self.assertEqual(response.code, 307)
- def test_web_redirect(self):
- response = self.fetch("/web_redirect_permanent", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
- response = self.fetch("/web_redirect", follow_redirects=False)
- self.assertEqual(response.code, 302)
- self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
- def test_web_redirect_double_slash(self):
- response = self.fetch("//web_redirect_double_slash", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/web_redirect_newpath')
- def test_header_injection(self):
- response = self.fetch("/header_injection")
- self.assertEqual(response.body, b"ok")
- def test_get_argument(self):
- response = self.fetch("/get_argument?foo=bar")
- self.assertEqual(response.body, b"bar")
- response = self.fetch("/get_argument?foo=")
- self.assertEqual(response.body, b"")
- response = self.fetch("/get_argument")
- self.assertEqual(response.body, b"default")
- # Test merging of query and body arguments.
- # In singular form, body arguments take precedence over query arguments.
- body = urllib_parse.urlencode(dict(foo="hello"))
- response = self.fetch("/get_argument?foo=bar", method="POST", body=body)
- self.assertEqual(response.body, b"hello")
- # In plural methods they are merged.
- response = self.fetch("/get_arguments?foo=bar",
- method="POST", body=body)
- self.assertEqual(json_decode(response.body),
- dict(default=['bar', 'hello'],
- query=['bar'],
- body=['hello']))
- def test_get_query_arguments(self):
- # send as a post so we can ensure the separation between query
- # string and body arguments.
- body = urllib_parse.urlencode(dict(foo="hello"))
- response = self.fetch("/get_argument?source=query&foo=bar",
- method="POST", body=body)
- self.assertEqual(response.body, b"bar")
- response = self.fetch("/get_argument?source=query&foo=",
- method="POST", body=body)
- self.assertEqual(response.body, b"")
- response = self.fetch("/get_argument?source=query",
- method="POST", body=body)
- self.assertEqual(response.body, b"default")
- def test_get_body_arguments(self):
- body = urllib_parse.urlencode(dict(foo="bar"))
- response = self.fetch("/get_argument?source=body&foo=hello",
- method="POST", body=body)
- self.assertEqual(response.body, b"bar")
- body = urllib_parse.urlencode(dict(foo=""))
- response = self.fetch("/get_argument?source=body&foo=hello",
- method="POST", body=body)
- self.assertEqual(response.body, b"")
- body = urllib_parse.urlencode(dict())
- response = self.fetch("/get_argument?source=body&foo=hello",
- method="POST", body=body)
- self.assertEqual(response.body, b"default")
- def test_no_gzip(self):
- response = self.fetch('/get_argument')
- self.assertNotIn('Accept-Encoding', response.headers.get('Vary', ''))
- self.assertNotIn('gzip', response.headers.get('Content-Encoding', ''))
- class NonWSGIWebTests(WebTestCase):
- def get_handlers(self):
- return [("/flow_control", FlowControlHandler),
- ("/empty_flush", EmptyFlushCallbackHandler),
- ]
- def test_flow_control(self):
- self.assertEqual(self.fetch("/flow_control").body, b"123")
- def test_empty_flush(self):
- response = self.fetch("/empty_flush")
- self.assertEqual(response.body, b"ok")
- @wsgi_safe
- class ErrorResponseTest(WebTestCase):
- def get_handlers(self):
- class DefaultHandler(RequestHandler):
- def get(self):
- if self.get_argument("status", None):
- raise HTTPError(int(self.get_argument("status")))
- 1 / 0
- class WriteErrorHandler(RequestHandler):
- def get(self):
- if self.get_argument("status", None):
- self.send_error(int(self.get_argument("status")))
- else:
- 1 / 0
- def write_error(self, status_code, **kwargs):
- self.set_header("Content-Type", "text/plain")
- if "exc_info" in kwargs:
- self.write("Exception: %s" % kwargs["exc_info"][0].__name__)
- else:
- self.write("Status: %d" % status_code)
- class FailedWriteErrorHandler(RequestHandler):
- def get(self):
- 1 / 0
- def write_error(self, status_code, **kwargs):
- raise Exception("exception in write_error")
- return [url("/default", DefaultHandler),
- url("/write_error", WriteErrorHandler),
- url("/failed_write_error", FailedWriteErrorHandler),
- ]
- def test_default(self):
- with ExpectLog(app_log, "Uncaught exception"):
- response = self.fetch("/default")
- self.assertEqual(response.code, 500)
- self.assertTrue(b"500: Internal Server Error" in response.body)
- response = self.fetch("/default?status=503")
- self.assertEqual(response.code, 503)
- self.assertTrue(b"503: Service Unavailable" in response.body)
- response = self.fetch("/default?status=435")
- self.assertEqual(response.code, 435)
- self.assertTrue(b"435: Unknown" in response.body)
- def test_write_error(self):
- with ExpectLog(app_log, "Uncaught exception"):
- response = self.fetch("/write_error")
- self.assertEqual(response.code, 500)
- self.assertEqual(b"Exception: ZeroDivisionError", response.body)
- response = self.fetch("/write_error?status=503")
- self.assertEqual(response.code, 503)
- self.assertEqual(b"Status: 503", response.body)
- def test_failed_write_error(self):
- with ExpectLog(app_log, "Uncaught exception"):
- response = self.fetch("/failed_write_error")
- self.assertEqual(response.code, 500)
- self.assertEqual(b"", response.body)
- @wsgi_safe
- class StaticFileTest(WebTestCase):
- # The expected MD5 hash of robots.txt, used in tests that call
- # StaticFileHandler.get_version
- robots_txt_hash = b"f71d20196d4caf35b6a670db8c70b03d"
- static_dir = os.path.join(os.path.dirname(__file__), 'static')
- def get_handlers(self):
- class StaticUrlHandler(RequestHandler):
- def get(self, path):
- with_v = int(self.get_argument('include_version', 1))
- self.write(self.static_url(path, include_version=with_v))
- class AbsoluteStaticUrlHandler(StaticUrlHandler):
- include_host = True
- class OverrideStaticUrlHandler(RequestHandler):
- def get(self, path):
- do_include = bool(self.get_argument("include_host"))
- self.include_host = not do_include
- regular_url = self.static_url(path)
- override_url = self.static_url(path, include_host=do_include)
- if override_url == regular_url:
- return self.write(str(False))
- protocol = self.request.protocol + "://"
- protocol_length = len(protocol)
- check_regular = regular_url.find(protocol, 0, protocol_length)
- check_override = override_url.find(protocol, 0, protocol_length)
- if do_include:
- result = (check_override == 0 and check_regular == -1)
- else:
- result = (check_override == -1 and check_regular == 0)
- self.write(str(result))
- return [('/static_url/(.*)', StaticUrlHandler),
- ('/abs_static_url/(.*)', AbsoluteStaticUrlHandler),
- ('/override_static_url/(.*)', OverrideStaticUrlHandler),
- ('/root_static/(.*)', StaticFileHandler, dict(path='/'))]
- def get_app_kwargs(self):
- return dict(static_path=relpath('static'))
- def test_static_files(self):
- response = self.fetch('/robots.txt')
- self.assertTrue(b"Disallow: /" in response.body)
- response = self.fetch('/static/robots.txt')
- self.assertTrue(b"Disallow: /" in response.body)
- self.assertEqual(response.headers.get("Content-Type"), "text/plain")
- def test_static_compressed_files(self):
- response = self.fetch("/static/sample.xml.gz")
- self.assertEqual(response.headers.get("Content-Type"),
- "application/gzip")
- response = self.fetch("/static/sample.xml.bz2")
- self.assertEqual(response.headers.get("Content-Type"),
- "application/octet-stream")
- # make sure the uncompressed file still has the correct type
- response = self.fetch("/static/sample.xml")
- self.assertTrue(response.headers.get("Content-Type")
- in set(("text/xml", "application/xml")))
- def test_static_url(self):
- response = self.fetch("/static_url/robots.txt")
- self.assertEqual(response.body,
- b"/static/robots.txt?v=" + self.robots_txt_hash)
- def test_absolute_static_url(self):
- response = self.fetch("/abs_static_url/robots.txt")
- self.assertEqual(response.body, (
- utf8(self.get_url("/")) +
- b"static/robots.txt?v=" +
- self.robots_txt_hash
- ))
- def test_relative_version_exclusion(self):
- response = self.fetch("/static_url/robots.txt?include_version=0")
- self.assertEqual(response.body, b"/static/robots.txt")
- def test_absolute_version_exclusion(self):
- response = self.fetch("/abs_static_url/robots.txt?include_version=0")
- self.assertEqual(response.body,
- utf8(self.get_url("/") + "static/robots.txt"))
- def test_include_host_override(self):
- self._trigger_include_host_check(False)
- self._trigger_include_host_check(True)
- def _trigger_include_host_check(self, include_host):
- path = "/override_static_url/robots.txt?include_host=%s"
- response = self.fetch(path % int(include_host))
- self.assertEqual(response.body, utf8(str(True)))
- def get_and_head(self, *args, **kwargs):
- """Performs a GET and HEAD request and returns the GET response.
- Fails if any ``Content-*`` headers returned by the two requests
- differ.
- """
- head_response = self.fetch(*args, method="HEAD", **kwargs)
- get_response = self.fetch(*args, method="GET", **kwargs)
- content_headers = set()
- for h in itertools.chain(head_response.headers, get_response.headers):
- if h.startswith('Content-'):
- content_headers.add(h)
- for h in content_headers:
- self.assertEqual(head_response.headers.get(h),
- get_response.headers.get(h),
- "%s differs between GET (%s) and HEAD (%s)" %
- (h, head_response.headers.get(h),
- get_response.headers.get(h)))
- return get_response
- def test_static_304_if_modified_since(self):
- response1 = self.get_and_head("/static/robots.txt")
- response2 = self.get_and_head("/static/robots.txt", headers={
- 'If-Modified-Since': response1.headers['Last-Modified']})
- self.assertEqual(response2.code, 304)
- self.assertTrue('Content-Length' not in response2.headers)
- self.assertTrue('Last-Modified' not in response2.headers)
- def test_static_304_if_none_match(self):
- response1 = self.get_and_head("/static/robots.txt")
- response2 = self.get_and_head("/static/robots.txt", headers={
- 'If-None-Match': response1.headers['Etag']})
- self.assertEqual(response2.code, 304)
- def test_static_304_etag_modified_bug(self):
- response1 = self.get_and_head("/static/robots.txt")
- response2 = self.get_and_head("/static/robots.txt", headers={
- 'If-None-Match': '"MISMATCH"',
- 'If-Modified-Since': response1.headers['Last-Modified']})
- self.assertEqual(response2.code, 200)
- def test_static_if_modified_since_pre_epoch(self):
- # On windows, the functions that work with time_t do not accept
- # negative values, and at least one client (processing.js) seems
- # to use if-modified-since 1/1/1960 as a cache-busting technique.
- response = self.get_and_head("/static/robots.txt", headers={
- 'If-Modified-Since': 'Fri, 01 Jan 1960 00:00:00 GMT'})
- self.assertEqual(response.code, 200)
- def test_static_if_modified_since_time_zone(self):
- # Instead of the value from Last-Modified, make requests with times
- # chosen just before and after the known modification time
- # of the file to ensure that the right time zone is being used
- # when parsing If-Modified-Since.
- stat = os.stat(relpath('static/robots.txt'))
- response = self.get_and_head('/static/robots.txt', headers={
- 'If-Modified-Since': format_timestamp(stat.st_mtime - 1)})
- self.assertEqual(response.code, 200)
- response = self.get_and_head('/static/robots.txt', headers={
- 'If-Modified-Since': format_timestamp(stat.st_mtime + 1)})
- self.assertEqual(response.code, 304)
- def test_static_etag(self):
- response = self.get_and_head('/static/robots.txt')
- self.assertEqual(utf8(response.headers.get("Etag")),
- b'"' + self.robots_txt_hash + b'"')
- def test_static_with_range(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=0-9'})
- self.assertEqual(response.code, 206)
- self.assertEqual(response.body, b"User-agent")
- self.assertEqual(utf8(response.headers.get("Etag")),
- b'"' + self.robots_txt_hash + b'"')
- self.assertEqual(response.headers.get("Content-Length"), "10")
- self.assertEqual(response.headers.get("Content-Range"),
- "bytes 0-9/26")
- def test_static_with_range_full_file(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=0-'})
- # Note: Chrome refuses to play audio if it gets an HTTP 206 in response
- # to ``Range: bytes=0-`` :(
- self.assertEqual(response.code, 200)
- robots_file_path = os.path.join(self.static_dir, "robots.txt")
- with open(robots_file_path) as f:
- self.assertEqual(response.body, utf8(f.read()))
- self.assertEqual(response.headers.get("Content-Length"), "26")
- self.assertEqual(response.headers.get("Content-Range"), None)
- def test_static_with_range_full_past_end(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=0-10000000'})
- self.assertEqual(response.code, 200)
- robots_file_path = os.path.join(self.static_dir, "robots.txt")
- with open(robots_file_path) as f:
- self.assertEqual(response.body, utf8(f.read()))
- self.assertEqual(response.headers.get("Content-Length"), "26")
- self.assertEqual(response.headers.get("Content-Range"), None)
- def test_static_with_range_partial_past_end(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=1-10000000'})
- self.assertEqual(response.code, 206)
- robots_file_path = os.path.join(self.static_dir, "robots.txt")
- with open(robots_file_path) as f:
- self.assertEqual(response.body, utf8(f.read()[1:]))
- self.assertEqual(response.headers.get("Content-Length"), "25")
- self.assertEqual(response.headers.get("Content-Range"), "bytes 1-25/26")
- def test_static_with_range_end_edge(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=22-'})
- self.assertEqual(response.body, b": /\n")
- self.assertEqual(response.headers.get("Content-Length"), "4")
- self.assertEqual(response.headers.get("Content-Range"),
- "bytes 22-25/26")
- def test_static_with_range_neg_end(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=-4'})
- self.assertEqual(response.body, b": /\n")
- self.assertEqual(response.headers.get("Content-Length"), "4")
- self.assertEqual(response.headers.get("Content-Range"),
- "bytes 22-25/26")
- def test_static_invalid_range(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'asdf'})
- self.assertEqual(response.code, 200)
- def test_static_unsatisfiable_range_zero_suffix(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=-0'})
- self.assertEqual(response.headers.get("Content-Range"),
- "bytes */26")
- self.assertEqual(response.code, 416)
- def test_static_unsatisfiable_range_invalid_start(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=26'})
- self.assertEqual(response.code, 416)
- self.assertEqual(response.headers.get("Content-Range"),
- "bytes */26")
- def test_static_head(self):
- response = self.fetch('/static/robots.txt', method='HEAD')
- self.assertEqual(response.code, 200)
- # No body was returned, but we did get the right content length.
- self.assertEqual(response.body, b'')
- self.assertEqual(response.headers['Content-Length'], '26')
- self.assertEqual(utf8(response.headers['Etag']),
- b'"' + self.robots_txt_hash + b'"')
- def test_static_head_range(self):
- response = self.fetch('/static/robots.txt', method='HEAD',
- headers={'Range': 'bytes=1-4'})
- self.assertEqual(response.code, 206)
- self.assertEqual(response.body, b'')
- self.assertEqual(response.headers['Content-Length'], '4')
- self.assertEqual(utf8(response.headers['Etag']),
- b'"' + self.robots_txt_hash + b'"')
- def test_static_range_if_none_match(self):
- response = self.get_and_head('/static/robots.txt', headers={
- 'Range': 'bytes=1-4',
- 'If-None-Match': b'"' + self.robots_txt_hash + b'"'})
- self.assertEqual(response.code, 304)
- self.assertEqual(response.body, b'')
- self.assertTrue('Content-Length' not in response.headers)
- self.assertEqual(utf8(response.headers['Etag']),
- b'"' + self.robots_txt_hash + b'"')
- def test_static_404(self):
- response = self.get_and_head('/static/blarg')
- self.assertEqual(response.code, 404)
- def test_path_traversal_protection(self):
- # curl_httpclient processes ".." on the client side, so we
- # must test this with simple_httpclient.
- self.http_client.close()
- self.http_client = SimpleAsyncHTTPClient()
- with ExpectLog(gen_log, ".*not in root static directory"):
- response = self.get_and_head('/static/../static_foo.txt')
- # Attempted path traversal should result in 403, not 200
- # (which means the check failed and the file was served)
- # or 404 (which means that the file didn't exist and
- # is probably a packaging error).
- self.assertEqual(response.code, 403)
- @unittest.skipIf(os.name != 'posix', 'non-posix OS')
- def test_root_static_path(self):
- # Sometimes people set the StaticFileHandler's path to '/'
- # to disable Tornado's path validation (in conjunction with
- # their own validation in get_absolute_path). Make sure
- # that the stricter validation in 4.2.1 doesn't break them.
- path = os.path.join(os.path.dirname(os.path.abspath(__file__)),
- 'static/robots.txt')
- response = self.get_and_head('/root_static' + urllib_parse.quote(path))
- self.assertEqual(response.code, 200)
- @wsgi_safe
- class StaticDefaultFilenameTest(WebTestCase):
- def get_app_kwargs(self):
- return dict(static_path=relpath('static'),
- static_handler_args=dict(default_filename='index.html'))
- def get_handlers(self):
- return []
- def test_static_default_filename(self):
- response = self.fetch('/static/dir/', follow_redirects=False)
- self.assertEqual(response.code, 200)
- self.assertEqual(b'this is the index\n', response.body)
- def test_static_default_redirect(self):
- response = self.fetch('/static/dir', follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertTrue(response.headers['Location'].endswith('/static/dir/'))
- @wsgi_safe
- class StaticFileWithPathTest(WebTestCase):
- def get_app_kwargs(self):
- return dict(static_path=relpath('static'),
- static_handler_args=dict(default_filename='index.html'))
- def get_handlers(self):
- return [("/foo/(.*)", StaticFileHandler, {
- "path": relpath("templates/"),
- })]
- def test_serve(self):
- response = self.fetch("/foo/utf8.html")
- self.assertEqual(response.body, b"H\xc3\xa9llo\n")
- @wsgi_safe
- class CustomStaticFileTest(WebTestCase):
- def get_handlers(self):
- class MyStaticFileHandler(StaticFileHandler):
- @classmethod
- def make_static_url(cls, settings, path):
- version_hash = cls.get_version(settings, path)
- extension_index = path.rindex('.')
- before_version = path[:extension_index]
- after_version = path[(extension_index + 1):]
- return '/static/%s.%s.%s' % (before_version, version_hash,
- after_version)
- def parse_url_path(self, url_path):
- extension_index = url_path.rindex('.')
- version_index = url_path.rindex('.', 0, extension_index)
- return '%s%s' % (url_path[:version_index],
- url_path[extension_index:])
- @classmethod
- def get_absolute_path(cls, settings, path):
- return 'CustomStaticFileTest:' + path
- def validate_absolute_path(self, root, absolute_path):
- return absolute_path
- @classmethod
- def get_content(self, path, start=None, end=None):
- assert start is None and end is None
- if path == 'CustomStaticFileTest:foo.txt':
- return b'bar'
- raise Exception("unexpected path %r" % path)
- def get_content_size(self):
- if self.absolute_path == 'CustomStaticFileTest:foo.txt':
- return 3
- raise Exception("unexpected path %r" % self.absolute_path)
- def get_modified_time(self):
- return None
- @classmethod
- def get_version(cls, settings, path):
- return "42"
- class StaticUrlHandler(RequestHandler):
- def get(self, path):
- self.write(self.static_url(path))
- self.static_handler_class = MyStaticFileHandler
- return [("/static_url/(.*)", StaticUrlHandler)]
- def get_app_kwargs(self):
- return dict(static_path="dummy",
- static_handler_class=self.static_handler_class)
- def test_serve(self):
- response = self.fetch("/static/foo.42.txt")
- self.assertEqual(response.body, b"bar")
- def test_static_url(self):
- with ExpectLog(gen_log, "Could not open static file", required=False):
- response = self.fetch("/static_url/foo.txt")
- self.assertEqual(response.body, b"/static/foo.42.txt")
- @wsgi_safe
- class HostMatchingTest(WebTestCase):
- class Handler(RequestHandler):
- def initialize(self, reply):
- self.reply = reply
- def get(self):
- self.write(self.reply)
- def get_handlers(self):
- return [("/foo", HostMatchingTest.Handler, {"reply": "wildcard"})]
- def test_host_matching(self):
- self.app.add_handlers("www.example.com",
- [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
- self.app.add_handlers(r"www\.example\.com",
- [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
- self.app.add_handlers("www.example.com",
- [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
- self.app.add_handlers("www.e.*e.com",
- [("/baz", HostMatchingTest.Handler, {"reply": "[3]"})])
- response = self.fetch("/foo")
- self.assertEqual(response.body, b"wildcard")
- response = self.fetch("/bar")
- self.assertEqual(response.code, 404)
- response = self.fetch("/baz")
- self.assertEqual(response.code, 404)
- response = self.fetch("/foo", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b"[0]")
- response = self.fetch("/bar", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b"[1]")
- response = self.fetch("/baz", headers={'Host': 'www.example.com'})
- self.assertEqual(response.body, b"[2]")
- response = self.fetch("/baz", headers={'Host': 'www.exe.com'})
- self.assertEqual(response.body, b"[3]")
- @wsgi_safe
- class DefaultHostMatchingTest(WebTestCase):
- def get_handlers(self):
- return []
- def get_app_kwargs(self):
- return {'default_host': "www.example.com"}
- def test_default_host_matching(self):
- self.app.add_handlers("www.example.com",
- [("/foo", HostMatchingTest.Handler, {"reply": "[0]"})])
- self.app.add_handlers(r"www\.example\.com",
- [("/bar", HostMatchingTest.Handler, {"reply": "[1]"})])
- self.app.add_handlers("www.test.com",
- [("/baz", HostMatchingTest.Handler, {"reply": "[2]"})])
- response = self.fetch("/foo")
- self.assertEqual(response.body, b"[0]")
- response = self.fetch("/bar")
- self.assertEqual(response.body, b"[1]")
- response = self.fetch("/baz")
- self.assertEqual(response.code, 404)
- response = self.fetch("/foo", headers={"X-Real-Ip": "127.0.0.1"})
- self.assertEqual(response.code, 404)
- self.app.default_host = "www.test.com"
- response = self.fetch("/baz")
- self.assertEqual(response.body, b"[2]")
- @wsgi_safe
- class NamedURLSpecGroupsTest(WebTestCase):
- def get_handlers(self):
- class EchoHandler(RequestHandler):
- def get(self, path):
- self.write(path)
- return [("/str/(?P<path>.*)", EchoHandler),
- (u"/unicode/(?P<path>.*)", EchoHandler)]
- def test_named_urlspec_groups(self):
- response = self.fetch("/str/foo")
- self.assertEqual(response.body, b"foo")
- response = self.fetch("/unicode/bar")
- self.assertEqual(response.body, b"bar")
- @wsgi_safe
- class ClearHeaderTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.set_header("h1", "foo")
- self.set_header("h2", "bar")
- self.clear_header("h1")
- self.clear_header("nonexistent")
- def test_clear_header(self):
- response = self.fetch("/")
- self.assertTrue("h1" not in response.headers)
- self.assertEqual(response.headers["h2"], "bar")
- class Header204Test(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.set_status(204)
- self.finish()
- def test_204_headers(self):
- response = self.fetch('/')
- self.assertEqual(response.code, 204)
- self.assertNotIn("Content-Length", response.headers)
- self.assertNotIn("Transfer-Encoding", response.headers)
- @wsgi_safe
- class Header304Test(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.set_header("Content-Language", "en_US")
- self.write("hello")
- def test_304_headers(self):
- response1 = self.fetch('/')
- self.assertEqual(response1.headers["Content-Length"], "5")
- self.assertEqual(response1.headers["Content-Language"], "en_US")
- response2 = self.fetch('/', headers={
- 'If-None-Match': response1.headers["Etag"]})
- self.assertEqual(response2.code, 304)
- self.assertTrue("Content-Length" not in response2.headers)
- self.assertTrue("Content-Language" not in response2.headers)
- # Not an entity header, but should not be added to 304s by chunking
- self.assertTrue("Transfer-Encoding" not in response2.headers)
- @wsgi_safe
- class StatusReasonTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- reason = self.request.arguments.get('reason', [])
- self.set_status(int(self.get_argument('code')),
- reason=reason[0] if reason else None)
- def get_http_client(self):
- # simple_httpclient only: curl doesn't expose the reason string
- return SimpleAsyncHTTPClient()
- def test_status(self):
- response = self.fetch("/?code=304")
- self.assertEqual(response.code, 304)
- self.assertEqual(response.reason, "Not Modified")
- response = self.fetch("/?code=304&reason=Foo")
- self.assertEqual(response.code, 304)
- self.assertEqual(response.reason, "Foo")
- response = self.fetch("/?code=682&reason=Bar")
- self.assertEqual(response.code, 682)
- self.assertEqual(response.reason, "Bar")
- response = self.fetch("/?code=682")
- self.assertEqual(response.code, 682)
- self.assertEqual(response.reason, "Unknown")
- @wsgi_safe
- class DateHeaderTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.write("hello")
- def test_date_header(self):
- response = self.fetch('/')
- header_date = datetime.datetime(
- *email.utils.parsedate(response.headers['Date'])[:6])
- self.assertTrue(header_date - datetime.datetime.utcnow() <
- datetime.timedelta(seconds=2))
- @wsgi_safe
- class RaiseWithReasonTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- raise HTTPError(682, reason="Foo")
- def get_http_client(self):
- # simple_httpclient only: curl doesn't expose the reason string
- return SimpleAsyncHTTPClient()
- def test_raise_with_reason(self):
- response = self.fetch("/")
- self.assertEqual(response.code, 682)
- self.assertEqual(response.reason, "Foo")
- self.assertIn(b'682: Foo', response.body)
- def test_httperror_str(self):
- self.assertEqual(str(HTTPError(682, reason="Foo")), "HTTP 682: Foo")
- def test_httperror_str_from_httputil(self):
- self.assertEqual(str(HTTPError(682)), "HTTP 682: Unknown")
- @wsgi_safe
- class ErrorHandlerXSRFTest(WebTestCase):
- def get_handlers(self):
- # note that if the handlers list is empty we get the default_host
- # redirect fallback instead of a 404, so test with both an
- # explicitly defined error handler and an implicit 404.
- return [('/error', ErrorHandler, dict(status_code=417))]
- def get_app_kwargs(self):
- return dict(xsrf_cookies=True)
- def test_error_xsrf(self):
- response = self.fetch('/error', method='POST', body='')
- self.assertEqual(response.code, 417)
- def test_404_xsrf(self):
- response = self.fetch('/404', method='POST', body='')
- self.assertEqual(response.code, 404)
- @wsgi_safe
- class GzipTestCase(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- for v in self.get_arguments('vary'):
- self.add_header('Vary', v)
- # Must write at least MIN_LENGTH bytes to activate compression.
- self.write('hello world' + ('!' * GZipContentEncoding.MIN_LENGTH))
- def get_app_kwargs(self):
- return dict(
- gzip=True,
- static_path=os.path.join(os.path.dirname(__file__), 'static'))
- def assert_compressed(self, response):
- # simple_httpclient renames the content-encoding header;
- # curl_httpclient doesn't.
- self.assertEqual(
- response.headers.get(
- 'Content-Encoding',
- response.headers.get('X-Consumed-Content-Encoding')),
- 'gzip')
- def test_gzip(self):
- response = self.fetch('/')
- self.assert_compressed(response)
- self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
- def test_gzip_static(self):
- # The streaming responses in StaticFileHandler have subtle
- # interactions with the gzip output so test this case separately.
- response = self.fetch('/robots.txt')
- self.assert_compressed(response)
- self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
- def test_gzip_not_requested(self):
- response = self.fetch('/', use_gzip=False)
- self.assertNotIn('Content-Encoding', response.headers)
- self.assertEqual(response.headers['Vary'], 'Accept-Encoding')
- def test_vary_already_present(self):
- response = self.fetch('/?vary=Accept-Language')
- self.assert_compressed(response)
- self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
- ['Accept-Language', 'Accept-Encoding'])
- def test_vary_already_present_multiple(self):
- # Regression test for https://github.com/tornadoweb/tornado/issues/1670
- response = self.fetch('/?vary=Accept-Language&vary=Cookie')
- self.assert_compressed(response)
- self.assertEqual([s.strip() for s in response.headers['Vary'].split(',')],
- ['Accept-Language', 'Cookie', 'Accept-Encoding'])
- @wsgi_safe
- class PathArgsInPrepareTest(WebTestCase):
- class Handler(RequestHandler):
- def prepare(self):
- self.write(dict(args=self.path_args, kwargs=self.path_kwargs))
- def get(self, path):
- assert path == 'foo'
- self.finish()
- def get_handlers(self):
- return [('/pos/(.*)', self.Handler),
- ('/kw/(?P<path>.*)', self.Handler)]
- def test_pos(self):
- response = self.fetch('/pos/foo')
- response.rethrow()
- data = json_decode(response.body)
- self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})
- def test_kw(self):
- response = self.fetch('/kw/foo')
- response.rethrow()
- data = json_decode(response.body)
- self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
- @wsgi_safe
- class ClearAllCookiesTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.clear_all_cookies()
- self.write('ok')
- def test_clear_all_cookies(self):
- response = self.fetch('/', headers={'Cookie': 'foo=bar; baz=xyzzy'})
- set_cookies = sorted(response.headers.get_list('Set-Cookie'))
- # Python 3.5 sends 'baz="";'; older versions use 'baz=;'
- self.assertTrue(set_cookies[0].startswith('baz=;') or
- set_cookies[0].startswith('baz="";'))
- self.assertTrue(set_cookies[1].startswith('foo=;') or
- set_cookies[1].startswith('foo="";'))
- class PermissionError(Exception):
- pass
- @wsgi_safe
- class ExceptionHandlerTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- exc = self.get_argument('exc')
- if exc == 'http':
- raise HTTPError(410, "no longer here")
- elif exc == 'zero':
- 1 / 0
- elif exc == 'permission':
- raise PermissionError('not allowed')
- def write_error(self, status_code, **kwargs):
- if 'exc_info' in kwargs:
- typ, value, tb = kwargs['exc_info']
- if isinstance(value, PermissionError):
- self.set_status(403)
- self.write('PermissionError')
- return
- RequestHandler.write_error(self, status_code, **kwargs)
- def log_exception(self, typ, value, tb):
- if isinstance(value, PermissionError):
- app_log.warning('custom logging for PermissionError: %s',
- value.args[0])
- else:
- RequestHandler.log_exception(self, typ, value, tb)
- def test_http_error(self):
- # HTTPErrors are logged as warnings with no stack trace.
- # TODO: extend ExpectLog to test this more precisely
- with ExpectLog(gen_log, '.*no longer here'):
- response = self.fetch('/?exc=http')
- self.assertEqual(response.code, 410)
- def test_unknown_error(self):
- # Unknown errors are logged as errors with a stack trace.
- with ExpectLog(app_log, 'Uncaught exception'):
- response = self.fetch('/?exc=zero')
- self.assertEqual(response.code, 500)
- def test_known_error(self):
- # log_exception can override logging behavior, and write_error
- # can override the response.
- with ExpectLog(app_log,
- 'custom logging for PermissionError: not allowed'):
- response = self.fetch('/?exc=permission')
- self.assertEqual(response.code, 403)
- @wsgi_safe
- class BuggyLoggingTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- 1 / 0
- def log_exception(self, typ, value, tb):
- 1 / 0
- def test_buggy_log_exception(self):
- # Something gets logged even though the application's
- # logger is broken.
- with ExpectLog(app_log, '.*'):
- self.fetch('/')
- @wsgi_safe
- class UIMethodUIModuleTest(SimpleHandlerTestCase):
- """Test that UI methods and modules are created correctly and
- associated with the handler.
- """
- class Handler(RequestHandler):
- def get(self):
- self.render('foo.html')
- def value(self):
- return self.get_argument("value")
- def get_app_kwargs(self):
- def my_ui_method(handler, x):
- return "In my_ui_method(%s) with handler value %s." % (
- x, handler.value())
- class MyModule(UIModule):
- def render(self, x):
- return "In MyModule(%s) with handler value %s." % (
- x, self.handler.value())
- loader = DictLoader({
- 'foo.html': '{{ my_ui_method(42) }} {% module MyModule(123) %}',
- })
- return dict(template_loader=loader,
- ui_methods={'my_ui_method': my_ui_method},
- ui_modules={'MyModule': MyModule})
- def tearDown(self):
- super(UIMethodUIModuleTest, self).tearDown()
- # TODO: fix template loader caching so this isn't necessary.
- RequestHandler._template_loaders.clear()
- def test_ui_method(self):
- response = self.fetch('/?value=asdf')
- self.assertEqual(response.body,
- b'In my_ui_method(42) with handler value asdf. '
- b'In MyModule(123) with handler value asdf.')
- @wsgi_safe
- class GetArgumentErrorTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- try:
- self.get_argument('foo')
- self.write({})
- except MissingArgumentError as e:
- self.write({'arg_name': e.arg_name,
- 'log_message': e.log_message})
- def test_catch_error(self):
- response = self.fetch('/')
- self.assertEqual(json_decode(response.body),
- {'arg_name': 'foo',
- 'log_message': 'Missing argument foo'})
- class MultipleExceptionTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- exc_count = 0
- with ignore_deprecation():
- @asynchronous
- def get(self):
- IOLoop.current().add_callback(lambda: 1 / 0)
- IOLoop.current().add_callback(lambda: 1 / 0)
- def log_exception(self, typ, value, tb):
- MultipleExceptionTest.Handler.exc_count += 1
- def test_multi_exception(self):
- with ignore_deprecation():
- # This test verifies that multiple exceptions raised into the same
- # ExceptionStackContext do not generate extraneous log entries
- # due to "Cannot send error response after headers written".
- # log_exception is called, but it does not proceed to send_error.
- response = self.fetch('/')
- self.assertEqual(response.code, 500)
- response = self.fetch('/')
- self.assertEqual(response.code, 500)
- # Each of our two requests generated two exceptions, we should have
- # seen at least three of them by now (the fourth may still be
- # in the queue).
- self.assertGreater(MultipleExceptionTest.Handler.exc_count, 2)
- @wsgi_safe
- class SetLazyPropertiesTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def prepare(self):
- self.current_user = 'Ben'
- self.locale = locale.get('en_US')
- def get_user_locale(self):
- raise NotImplementedError()
- def get_current_user(self):
- raise NotImplementedError()
- def get(self):
- self.write('Hello %s (%s)' % (self.current_user, self.locale.code))
- def test_set_properties(self):
- # Ensure that current_user can be assigned to normally for apps
- # that want to forgo the lazy get_current_user property
- response = self.fetch('/')
- self.assertEqual(response.body, b'Hello Ben (en_US)')
- @wsgi_safe
- class GetCurrentUserTest(WebTestCase):
- def get_app_kwargs(self):
- class WithoutUserModule(UIModule):
- def render(self):
- return ''
- class WithUserModule(UIModule):
- def render(self):
- return str(self.current_user)
- loader = DictLoader({
- 'without_user.html': '',
- 'with_user.html': '{{ current_user }}',
- 'without_user_module.html': '{% module WithoutUserModule() %}',
- 'with_user_module.html': '{% module WithUserModule() %}',
- })
- return dict(template_loader=loader,
- ui_modules={'WithUserModule': WithUserModule,
- 'WithoutUserModule': WithoutUserModule})
- def tearDown(self):
- super(GetCurrentUserTest, self).tearDown()
- RequestHandler._template_loaders.clear()
- def get_handlers(self):
- class CurrentUserHandler(RequestHandler):
- def prepare(self):
- self.has_loaded_current_user = False
- def get_current_user(self):
- self.has_loaded_current_user = True
- return ''
- class WithoutUserHandler(CurrentUserHandler):
- def get(self):
- self.render_string('without_user.html')
- self.finish(str(self.has_loaded_current_user))
- class WithUserHandler(CurrentUserHandler):
- def get(self):
- self.render_string('with_user.html')
- self.finish(str(self.has_loaded_current_user))
- class CurrentUserModuleHandler(CurrentUserHandler):
- def get_template_namespace(self):
- # If RequestHandler.get_template_namespace is called, then
- # get_current_user is evaluated. Until #820 is fixed, this
- # is a small hack to circumvent the issue.
- return self.ui
- class WithoutUserModuleHandler(CurrentUserModuleHandler):
- def get(self):
- self.render_string('without_user_module.html')
- self.finish(str(self.has_loaded_current_user))
- class WithUserModuleHandler(CurrentUserModuleHandler):
- def get(self):
- self.render_string('with_user_module.html')
- self.finish(str(self.has_loaded_current_user))
- return [('/without_user', WithoutUserHandler),
- ('/with_user', WithUserHandler),
- ('/without_user_module', WithoutUserModuleHandler),
- ('/with_user_module', WithUserModuleHandler)]
- @unittest.skip('needs fix')
- def test_get_current_user_is_lazy(self):
- # TODO: Make this test pass. See #820.
- response = self.fetch('/without_user')
- self.assertEqual(response.body, b'False')
- def test_get_current_user_works(self):
- response = self.fetch('/with_user')
- self.assertEqual(response.body, b'True')
- def test_get_current_user_from_ui_module_is_lazy(self):
- response = self.fetch('/without_user_module')
- self.assertEqual(response.body, b'False')
- def test_get_current_user_from_ui_module_works(self):
- response = self.fetch('/with_user_module')
- self.assertEqual(response.body, b'True')
- @wsgi_safe
- class UnimplementedHTTPMethodsTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- pass
- def test_unimplemented_standard_methods(self):
- for method in ['HEAD', 'GET', 'DELETE', 'OPTIONS']:
- response = self.fetch('/', method=method)
- self.assertEqual(response.code, 405)
- for method in ['POST', 'PUT']:
- response = self.fetch('/', method=method, body=b'')
- self.assertEqual(response.code, 405)
- class UnimplementedNonStandardMethodsTest(SimpleHandlerTestCase):
- # wsgiref.validate complains about unknown methods in a way that makes
- # this test not wsgi_safe.
- class Handler(RequestHandler):
- def other(self):
- # Even though this method exists, it won't get called automatically
- # because it is not in SUPPORTED_METHODS.
- self.write('other')
- def test_unimplemented_patch(self):
- # PATCH is recently standardized; Tornado supports it by default
- # but wsgiref.validate doesn't like it.
- response = self.fetch('/', method='PATCH', body=b'')
- self.assertEqual(response.code, 405)
- def test_unimplemented_other(self):
- response = self.fetch('/', method='OTHER',
- allow_nonstandard_methods=True)
- self.assertEqual(response.code, 405)
- @wsgi_safe
- class AllHTTPMethodsTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def method(self):
- self.write(self.request.method)
- get = delete = options = post = put = method
- def test_standard_methods(self):
- response = self.fetch('/', method='HEAD')
- self.assertEqual(response.body, b'')
- for method in ['GET', 'DELETE', 'OPTIONS']:
- response = self.fetch('/', method=method)
- self.assertEqual(response.body, utf8(method))
- for method in ['POST', 'PUT']:
- response = self.fetch('/', method=method, body=b'')
- self.assertEqual(response.body, utf8(method))
- class PatchMethodTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- SUPPORTED_METHODS = RequestHandler.SUPPORTED_METHODS + ('OTHER',)
- def patch(self):
- self.write('patch')
- def other(self):
- self.write('other')
- def test_patch(self):
- response = self.fetch('/', method='PATCH', body=b'')
- self.assertEqual(response.body, b'patch')
- def test_other(self):
- response = self.fetch('/', method='OTHER',
- allow_nonstandard_methods=True)
- self.assertEqual(response.body, b'other')
- @wsgi_safe
- class FinishInPrepareTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def prepare(self):
- self.finish('done')
- def get(self):
- # It's difficult to assert for certain that a method did not
- # or will not be called in an asynchronous context, but this
- # will be logged noisily if it is reached.
- raise Exception('should not reach this method')
- def test_finish_in_prepare(self):
- response = self.fetch('/')
- self.assertEqual(response.body, b'done')
- @wsgi_safe
- class Default404Test(WebTestCase):
- def get_handlers(self):
- # If there are no handlers at all a default redirect handler gets added.
- return [('/foo', RequestHandler)]
- def test_404(self):
- response = self.fetch('/')
- self.assertEqual(response.code, 404)
- self.assertEqual(response.body,
- b'<html><title>404: Not Found</title>'
- b'<body>404: Not Found</body></html>')
- @wsgi_safe
- class Custom404Test(WebTestCase):
- def get_handlers(self):
- return [('/foo', RequestHandler)]
- def get_app_kwargs(self):
- class Custom404Handler(RequestHandler):
- def get(self):
- self.set_status(404)
- self.write('custom 404 response')
- return dict(default_handler_class=Custom404Handler)
- def test_404(self):
- response = self.fetch('/')
- self.assertEqual(response.code, 404)
- self.assertEqual(response.body, b'custom 404 response')
- @wsgi_safe
- class DefaultHandlerArgumentsTest(WebTestCase):
- def get_handlers(self):
- return [('/foo', RequestHandler)]
- def get_app_kwargs(self):
- return dict(default_handler_class=ErrorHandler,
- default_handler_args=dict(status_code=403))
- def test_403(self):
- response = self.fetch('/')
- self.assertEqual(response.code, 403)
- @wsgi_safe
- class HandlerByNameTest(WebTestCase):
- def get_handlers(self):
- # All three are equivalent.
- return [('/hello1', HelloHandler),
- ('/hello2', 'tornado.test.web_test.HelloHandler'),
- url('/hello3', 'tornado.test.web_test.HelloHandler'),
- ]
- def test_handler_by_name(self):
- resp = self.fetch('/hello1')
- self.assertEqual(resp.body, b'hello')
- resp = self.fetch('/hello2')
- self.assertEqual(resp.body, b'hello')
- resp = self.fetch('/hello3')
- self.assertEqual(resp.body, b'hello')
- class StreamingRequestBodyTest(WebTestCase):
- def get_handlers(self):
- @stream_request_body
- class StreamingBodyHandler(RequestHandler):
- def initialize(self, test):
- self.test = test
- def prepare(self):
- self.test.prepared.set_result(None)
- def data_received(self, data):
- self.test.data.set_result(data)
- def get(self):
- self.test.finished.set_result(None)
- self.write({})
- @stream_request_body
- class EarlyReturnHandler(RequestHandler):
- def prepare(self):
- # If we finish the response in prepare, it won't continue to
- # the (non-existent) data_received.
- raise HTTPError(401)
- @stream_request_body
- class CloseDetectionHandler(RequestHandler):
- def initialize(self, test):
- self.test = test
- def on_connection_close(self):
- super(CloseDetectionHandler, self).on_connection_close()
- self.test.close_future.set_result(None)
- return [('/stream_body', StreamingBodyHandler, dict(test=self)),
- ('/early_return', EarlyReturnHandler),
- ('/close_detection', CloseDetectionHandler, dict(test=self))]
- def connect(self, url, connection_close):
- # Use a raw connection so we can control the sending of data.
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
- s.connect(("127.0.0.1", self.get_http_port()))
- stream = IOStream(s)
- stream.write(b"GET " + url + b" HTTP/1.1\r\n")
- if connection_close:
- stream.write(b"Connection: close\r\n")
- stream.write(b"Transfer-Encoding: chunked\r\n\r\n")
- return stream
- @gen_test
- def test_streaming_body(self):
- self.prepared = Future()
- self.data = Future()
- self.finished = Future()
- stream = self.connect(b"/stream_body", connection_close=True)
- yield self.prepared
- stream.write(b"4\r\nasdf\r\n")
- # Ensure the first chunk is received before we send the second.
- data = yield self.data
- self.assertEqual(data, b"asdf")
- self.data = Future()
- stream.write(b"4\r\nqwer\r\n")
- data = yield self.data
- self.assertEquals(data, b"qwer")
- stream.write(b"0\r\n\r\n")
- yield self.finished
- data = yield stream.read_until_close()
- # This would ideally use an HTTP1Connection to read the response.
- self.assertTrue(data.endswith(b"{}"))
- stream.close()
- @gen_test
- def test_early_return(self):
- stream = self.connect(b"/early_return", connection_close=False)
- data = yield stream.read_until_close()
- self.assertTrue(data.startswith(b"HTTP/1.1 401"))
- @gen_test
- def test_early_return_with_data(self):
- stream = self.connect(b"/early_return", connection_close=False)
- stream.write(b"4\r\nasdf\r\n")
- data = yield stream.read_until_close()
- self.assertTrue(data.startswith(b"HTTP/1.1 401"))
- @gen_test
- def test_close_during_upload(self):
- self.close_future = Future()
- stream = self.connect(b"/close_detection", connection_close=False)
- stream.close()
- yield self.close_future
- # Each method in this handler returns a yieldable object and yields to the
- # IOLoop so the future is not immediately ready. Ensure that the
- # yieldables are respected and no method is called before the previous
- # one has completed.
- @stream_request_body
- class BaseFlowControlHandler(RequestHandler):
- def initialize(self, test):
- self.test = test
- self.method = None
- self.methods = []
- @contextlib.contextmanager
- def in_method(self, method):
- if self.method is not None:
- self.test.fail("entered method %s while in %s" %
- (method, self.method))
- self.method = method
- self.methods.append(method)
- try:
- yield
- finally:
- self.method = None
- @gen.coroutine
- def prepare(self):
- # Note that asynchronous prepare() does not block data_received,
- # so we don't use in_method here.
- self.methods.append('prepare')
- yield gen.moment
- @gen.coroutine
- def post(self):
- with self.in_method('post'):
- yield gen.moment
- self.write(dict(methods=self.methods))
- class BaseStreamingRequestFlowControlTest(object):
- def get_httpserver_options(self):
- # Use a small chunk size so flow control is relevant even though
- # all the data arrives at once.
- return dict(chunk_size=10, decompress_request=True)
- def get_http_client(self):
- # simple_httpclient only: curl doesn't support body_producer.
- return SimpleAsyncHTTPClient()
- # Test all the slightly different code paths for fixed, chunked, etc bodies.
- def test_flow_control_fixed_body(self):
- response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
- method='POST')
- response.rethrow()
- self.assertEqual(json_decode(response.body),
- dict(methods=['prepare', 'data_received',
- 'data_received', 'data_received',
- 'post']))
- def test_flow_control_chunked_body(self):
- chunks = [b'abcd', b'efgh', b'ijkl']
- @gen.coroutine
- def body_producer(write):
- for i in chunks:
- yield write(i)
- response = self.fetch('/', body_producer=body_producer, method='POST')
- response.rethrow()
- self.assertEqual(json_decode(response.body),
- dict(methods=['prepare', 'data_received',
- 'data_received', 'data_received',
- 'post']))
- def test_flow_control_compressed_body(self):
- bytesio = BytesIO()
- gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio)
- gzip_file.write(b'abcdefghijklmnopqrstuvwxyz')
- gzip_file.close()
- compressed_body = bytesio.getvalue()
- response = self.fetch('/', body=compressed_body, method='POST',
- headers={'Content-Encoding': 'gzip'})
- response.rethrow()
- self.assertEqual(json_decode(response.body),
- dict(methods=['prepare', 'data_received',
- 'data_received', 'data_received',
- 'post']))
- class DecoratedStreamingRequestFlowControlTest(
- BaseStreamingRequestFlowControlTest,
- WebTestCase):
- def get_handlers(self):
- class DecoratedFlowControlHandler(BaseFlowControlHandler):
- @gen.coroutine
- def data_received(self, data):
- with self.in_method('data_received'):
- yield gen.moment
- return [('/', DecoratedFlowControlHandler, dict(test=self))]
- @skipBefore35
- class NativeStreamingRequestFlowControlTest(
- BaseStreamingRequestFlowControlTest,
- WebTestCase):
- def get_handlers(self):
- class NativeFlowControlHandler(BaseFlowControlHandler):
- data_received = exec_test(globals(), locals(), """
- async def data_received(self, data):
- with self.in_method('data_received'):
- import asyncio
- await asyncio.sleep(0)
- """)["data_received"]
- return [('/', NativeFlowControlHandler, dict(test=self))]
- @wsgi_safe
- class IncorrectContentLengthTest(SimpleHandlerTestCase):
- def get_handlers(self):
- test = self
- self.server_error = None
- # Manually set a content-length that doesn't match the actual content.
- class TooHigh(RequestHandler):
- def get(self):
- self.set_header("Content-Length", "42")
- try:
- self.finish("ok")
- except Exception as e:
- test.server_error = e
- raise
- class TooLow(RequestHandler):
- def get(self):
- self.set_header("Content-Length", "2")
- try:
- self.finish("hello")
- except Exception as e:
- test.server_error = e
- raise
- return [('/high', TooHigh),
- ('/low', TooLow)]
- def test_content_length_too_high(self):
- # When the content-length is too high, the connection is simply
- # closed without completing the response. An error is logged on
- # the server.
- with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
- with ExpectLog(gen_log,
- "(Cannot send error response after headers written"
- "|Failed to flush partial response)"):
- with self.assertRaises(HTTPClientError):
- self.fetch("/high", raise_error=True)
- self.assertEqual(str(self.server_error),
- "Tried to write 40 bytes less than Content-Length")
- def test_content_length_too_low(self):
- # When the content-length is too low, the connection is closed
- # without writing the last chunk, so the client never sees the request
- # complete (which would be a framing error).
- with ExpectLog(app_log, "(Uncaught exception|Exception in callback)"):
- with ExpectLog(gen_log,
- "(Cannot send error response after headers written"
- "|Failed to flush partial response)"):
- with self.assertRaises(HTTPClientError):
- self.fetch("/low", raise_error=True)
- self.assertEqual(str(self.server_error),
- "Tried to write more data than Content-Length")
- class ClientCloseTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- if self.request.version.startswith('HTTP/1'):
- # Simulate a connection closed by the client during
- # request processing. The client will see an error, but the
- # server should respond gracefully (without logging errors
- # because we were unable to write out as many bytes as
- # Content-Length said we would)
- self.request.connection.stream.close()
- self.write('hello')
- else:
- # TODO: add a HTTP2-compatible version of this test.
- self.write('requires HTTP/1.x')
- def test_client_close(self):
- with self.assertRaises((HTTPClientError, unittest.SkipTest)):
- response = self.fetch('/', raise_error=True)
- if response.body == b'requires HTTP/1.x':
- self.skipTest('requires HTTP/1.x')
- self.assertEqual(response.code, 599)
- class SignedValueTest(unittest.TestCase):
- SECRET = "It's a secret to everybody"
- SECRET_DICT = {0: "asdfbasdf", 1: "12312312", 2: "2342342"}
- def past(self):
- return self.present() - 86400 * 32
- def present(self):
- return 1300000000
- def test_known_values(self):
- signed_v1 = create_signed_value(SignedValueTest.SECRET, "key", "value",
- version=1, clock=self.present)
- self.assertEqual(
- signed_v1,
- b"dmFsdWU=|1300000000|31c934969f53e48164c50768b40cbd7e2daaaa4f")
- signed_v2 = create_signed_value(SignedValueTest.SECRET, "key", "value",
- version=2, clock=self.present)
- self.assertEqual(
- signed_v2,
- b"2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
- b"3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152")
- signed_default = create_signed_value(SignedValueTest.SECRET,
- "key", "value", clock=self.present)
- self.assertEqual(signed_default, signed_v2)
- decoded_v1 = decode_signed_value(SignedValueTest.SECRET, "key",
- signed_v1, min_version=1,
- clock=self.present)
- self.assertEqual(decoded_v1, b"value")
- decoded_v2 = decode_signed_value(SignedValueTest.SECRET, "key",
- signed_v2, min_version=2,
- clock=self.present)
- self.assertEqual(decoded_v2, b"value")
- def test_name_swap(self):
- signed1 = create_signed_value(SignedValueTest.SECRET, "key1", "value",
- clock=self.present)
- signed2 = create_signed_value(SignedValueTest.SECRET, "key2", "value",
- clock=self.present)
- # Try decoding each string with the other's "name"
- decoded1 = decode_signed_value(SignedValueTest.SECRET, "key2", signed1,
- clock=self.present)
- self.assertIs(decoded1, None)
- decoded2 = decode_signed_value(SignedValueTest.SECRET, "key1", signed2,
- clock=self.present)
- self.assertIs(decoded2, None)
- def test_expired(self):
- signed = create_signed_value(SignedValueTest.SECRET, "key1", "value",
- clock=self.past)
- decoded_past = decode_signed_value(SignedValueTest.SECRET, "key1",
- signed, clock=self.past)
- self.assertEqual(decoded_past, b"value")
- decoded_present = decode_signed_value(SignedValueTest.SECRET, "key1",
- signed, clock=self.present)
- self.assertIs(decoded_present, None)
- def test_payload_tampering(self):
- # These cookies are variants of the one in test_known_values.
- sig = "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"
- def validate(prefix):
- return (b'value' ==
- decode_signed_value(SignedValueTest.SECRET, "key",
- prefix + sig, clock=self.present))
- self.assertTrue(validate("2|1:0|10:1300000000|3:key|8:dmFsdWU=|"))
- # Change key version
- self.assertFalse(validate("2|1:1|10:1300000000|3:key|8:dmFsdWU=|"))
- # length mismatch (field too short)
- self.assertFalse(validate("2|1:0|10:130000000|3:key|8:dmFsdWU=|"))
- # length mismatch (field too long)
- self.assertFalse(validate("2|1:0|10:1300000000|3:keey|8:dmFsdWU=|"))
- def test_signature_tampering(self):
- prefix = "2|1:0|10:1300000000|3:key|8:dmFsdWU=|"
- def validate(sig):
- return (b'value' ==
- decode_signed_value(SignedValueTest.SECRET, "key",
- prefix + sig, clock=self.present))
- self.assertTrue(validate(
- "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
- # All zeros
- self.assertFalse(validate("0" * 32))
- # Change one character
- self.assertFalse(validate(
- "4d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e152"))
- # Change another character
- self.assertFalse(validate(
- "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e153"))
- # Truncate
- self.assertFalse(validate(
- "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e15"))
- # Lengthen
- self.assertFalse(validate(
- "3d4e60b996ff9c5d5788e333a0cba6f238a22c6c0f94788870e1a9ecd482e1538"))
- def test_non_ascii(self):
- value = b"\xe9"
- signed = create_signed_value(SignedValueTest.SECRET, "key", value,
- clock=self.present)
- decoded = decode_signed_value(SignedValueTest.SECRET, "key", signed,
- clock=self.present)
- self.assertEqual(value, decoded)
- def test_key_versioning_read_write_default_key(self):
- value = b"\xe9"
- signed = create_signed_value(SignedValueTest.SECRET_DICT,
- "key", value, clock=self.present,
- key_version=0)
- decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
- "key", signed, clock=self.present)
- self.assertEqual(value, decoded)
- def test_key_versioning_read_write_non_default_key(self):
- value = b"\xe9"
- signed = create_signed_value(SignedValueTest.SECRET_DICT,
- "key", value, clock=self.present,
- key_version=1)
- decoded = decode_signed_value(SignedValueTest.SECRET_DICT,
- "key", signed, clock=self.present)
- self.assertEqual(value, decoded)
- def test_key_versioning_invalid_key(self):
- value = b"\xe9"
- signed = create_signed_value(SignedValueTest.SECRET_DICT,
- "key", value, clock=self.present,
- key_version=0)
- newkeys = SignedValueTest.SECRET_DICT.copy()
- newkeys.pop(0)
- decoded = decode_signed_value(newkeys,
- "key", signed, clock=self.present)
- self.assertEqual(None, decoded)
- def test_key_version_retrieval(self):
- value = b"\xe9"
- signed = create_signed_value(SignedValueTest.SECRET_DICT,
- "key", value, clock=self.present,
- key_version=1)
- key_version = get_signature_key_version(signed)
- self.assertEqual(1, key_version)
- @wsgi_safe
- class XSRFTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- version = int(self.get_argument("version", "2"))
- # This would be a bad idea in a real app, but in this test
- # it's fine.
- self.settings["xsrf_cookie_version"] = version
- self.write(self.xsrf_token)
- def post(self):
- self.write("ok")
- def get_app_kwargs(self):
- return dict(xsrf_cookies=True)
- def setUp(self):
- super(XSRFTest, self).setUp()
- self.xsrf_token = self.get_token()
- def get_token(self, old_token=None, version=None):
- if old_token is not None:
- headers = self.cookie_headers(old_token)
- else:
- headers = None
- response = self.fetch(
- "/" if version is None else ("/?version=%d" % version),
- headers=headers)
- response.rethrow()
- return native_str(response.body)
- def cookie_headers(self, token=None):
- if token is None:
- token = self.xsrf_token
- return {"Cookie": "_xsrf=" + token}
- def test_xsrf_fail_no_token(self):
- with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
- response = self.fetch("/", method="POST", body=b"")
- self.assertEqual(response.code, 403)
- def test_xsrf_fail_body_no_cookie(self):
- with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
- self.assertEqual(response.code, 403)
- def test_xsrf_fail_argument_invalid_format(self):
- with ExpectLog(gen_log, ".*'_xsrf' argument has invalid format"):
- response = self.fetch(
- "/", method="POST",
- headers=self.cookie_headers(),
- body=urllib_parse.urlencode(dict(_xsrf='3|')))
- self.assertEqual(response.code, 403)
- def test_xsrf_fail_cookie_invalid_format(self):
- with ExpectLog(gen_log, ".*XSRF cookie does not match POST"):
- response = self.fetch(
- "/", method="POST",
- headers=self.cookie_headers(token='3|'),
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)))
- self.assertEqual(response.code, 403)
- def test_xsrf_fail_cookie_no_body(self):
- with ExpectLog(gen_log, ".*'_xsrf' argument missing"):
- response = self.fetch(
- "/", method="POST", body=b"",
- headers=self.cookie_headers())
- self.assertEqual(response.code, 403)
- def test_xsrf_success_short_token(self):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf='deadbeef')),
- headers=self.cookie_headers(token='deadbeef'))
- self.assertEqual(response.code, 200)
- def test_xsrf_success_non_hex_token(self):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf='xoxo')),
- headers=self.cookie_headers(token='xoxo'))
- self.assertEqual(response.code, 200)
- def test_xsrf_success_post_body(self):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
- headers=self.cookie_headers())
- self.assertEqual(response.code, 200)
- def test_xsrf_success_query_string(self):
- response = self.fetch(
- "/?" + urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
- method="POST", body=b"",
- headers=self.cookie_headers())
- self.assertEqual(response.code, 200)
- def test_xsrf_success_header(self):
- response = self.fetch("/", method="POST", body=b"",
- headers=dict({"X-Xsrftoken": self.xsrf_token}, # type: ignore
- **self.cookie_headers()))
- self.assertEqual(response.code, 200)
- def test_distinct_tokens(self):
- # Every request gets a distinct token.
- NUM_TOKENS = 10
- tokens = set()
- for i in range(NUM_TOKENS):
- tokens.add(self.get_token())
- self.assertEqual(len(tokens), NUM_TOKENS)
- def test_cross_user(self):
- token2 = self.get_token()
- # Each token can be used to authenticate its own request.
- for token in (self.xsrf_token, token2):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=token)),
- headers=self.cookie_headers(token))
- self.assertEqual(response.code, 200)
- # Sending one in the cookie and the other in the body is not allowed.
- for cookie_token, body_token in ((self.xsrf_token, token2),
- (token2, self.xsrf_token)):
- with ExpectLog(gen_log, '.*XSRF cookie does not match POST'):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=body_token)),
- headers=self.cookie_headers(cookie_token))
- self.assertEqual(response.code, 403)
- def test_refresh_token(self):
- token = self.xsrf_token
- tokens_seen = set([token])
- # A user's token is stable over time. Refreshing the page in one tab
- # might update the cookie while an older tab still has the old cookie
- # in its DOM. Simulate this scenario by passing a constant token
- # in the body and re-querying for the token.
- for i in range(5):
- token = self.get_token(token)
- # Tokens are encoded uniquely each time
- tokens_seen.add(token)
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=self.xsrf_token)),
- headers=self.cookie_headers(token))
- self.assertEqual(response.code, 200)
- self.assertEqual(len(tokens_seen), 6)
- def test_versioning(self):
- # Version 1 still produces distinct tokens per request.
- self.assertNotEqual(self.get_token(version=1),
- self.get_token(version=1))
- # Refreshed v1 tokens are all identical.
- v1_token = self.get_token(version=1)
- for i in range(5):
- self.assertEqual(self.get_token(v1_token, version=1), v1_token)
- # Upgrade to a v2 version of the same token
- v2_token = self.get_token(v1_token)
- self.assertNotEqual(v1_token, v2_token)
- # Each v1 token can map to many v2 tokens.
- self.assertNotEqual(v2_token, self.get_token(v1_token))
- # The tokens are cross-compatible.
- for cookie_token, body_token in ((v1_token, v2_token),
- (v2_token, v1_token)):
- response = self.fetch(
- "/", method="POST",
- body=urllib_parse.urlencode(dict(_xsrf=body_token)),
- headers=self.cookie_headers(cookie_token))
- self.assertEqual(response.code, 200)
- @wsgi_safe
- class XSRFCookieKwargsTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.write(self.xsrf_token)
- def get_app_kwargs(self):
- return dict(xsrf_cookies=True,
- xsrf_cookie_kwargs=dict(httponly=True))
- def test_xsrf_httponly(self):
- response = self.fetch("/")
- self.assertIn('httponly;', response.headers['Set-Cookie'].lower())
- @wsgi_safe
- class FinishExceptionTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- self.set_status(401)
- self.set_header('WWW-Authenticate', 'Basic realm="something"')
- if self.get_argument('finish_value', ''):
- raise Finish('authentication required')
- else:
- self.write('authentication required')
- raise Finish()
- def test_finish_exception(self):
- for u in ['/', '/?finish_value=1']:
- response = self.fetch(u)
- self.assertEqual(response.code, 401)
- self.assertEqual('Basic realm="something"',
- response.headers.get('WWW-Authenticate'))
- self.assertEqual(b'authentication required', response.body)
- @wsgi_safe
- class DecoratorTest(WebTestCase):
- def get_handlers(self):
- class RemoveSlashHandler(RequestHandler):
- @removeslash
- def get(self):
- pass
- class AddSlashHandler(RequestHandler):
- @addslash
- def get(self):
- pass
- return [("/removeslash/", RemoveSlashHandler),
- ("/addslash", AddSlashHandler),
- ]
- def test_removeslash(self):
- response = self.fetch("/removeslash/", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], "/removeslash")
- response = self.fetch("/removeslash/?foo=bar", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], "/removeslash?foo=bar")
- def test_addslash(self):
- response = self.fetch("/addslash", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], "/addslash/")
- response = self.fetch("/addslash?foo=bar", follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], "/addslash/?foo=bar")
- @wsgi_safe
- class CacheTest(WebTestCase):
- def get_handlers(self):
- class EtagHandler(RequestHandler):
- def get(self, computed_etag):
- self.write(computed_etag)
- def compute_etag(self):
- return self._write_buffer[0]
- return [
- ('/etag/(.*)', EtagHandler)
- ]
- def test_wildcard_etag(self):
- computed_etag = '"xyzzy"'
- etags = '*'
- self._test_etag(computed_etag, etags, 304)
- def test_strong_etag_match(self):
- computed_etag = '"xyzzy"'
- etags = '"xyzzy"'
- self._test_etag(computed_etag, etags, 304)
- def test_multiple_strong_etag_match(self):
- computed_etag = '"xyzzy1"'
- etags = '"xyzzy1", "xyzzy2"'
- self._test_etag(computed_etag, etags, 304)
- def test_strong_etag_not_match(self):
- computed_etag = '"xyzzy"'
- etags = '"xyzzy1"'
- self._test_etag(computed_etag, etags, 200)
- def test_multiple_strong_etag_not_match(self):
- computed_etag = '"xyzzy"'
- etags = '"xyzzy1", "xyzzy2"'
- self._test_etag(computed_etag, etags, 200)
- def test_weak_etag_match(self):
- computed_etag = '"xyzzy1"'
- etags = 'W/"xyzzy1"'
- self._test_etag(computed_etag, etags, 304)
- def test_multiple_weak_etag_match(self):
- computed_etag = '"xyzzy2"'
- etags = 'W/"xyzzy1", W/"xyzzy2"'
- self._test_etag(computed_etag, etags, 304)
- def test_weak_etag_not_match(self):
- computed_etag = '"xyzzy2"'
- etags = 'W/"xyzzy1"'
- self._test_etag(computed_etag, etags, 200)
- def test_multiple_weak_etag_not_match(self):
- computed_etag = '"xyzzy3"'
- etags = 'W/"xyzzy1", W/"xyzzy2"'
- self._test_etag(computed_etag, etags, 200)
- def _test_etag(self, computed_etag, etags, status_code):
- response = self.fetch(
- '/etag/' + computed_etag,
- headers={'If-None-Match': etags}
- )
- self.assertEqual(response.code, status_code)
- @wsgi_safe
- class RequestSummaryTest(SimpleHandlerTestCase):
- class Handler(RequestHandler):
- def get(self):
- # remote_ip is optional, although it's set by
- # both HTTPServer and WSGIAdapter.
- # Clobber it to make sure it doesn't break logging.
- self.request.remote_ip = None
- self.finish(self._request_summary())
- def test_missing_remote_ip(self):
- resp = self.fetch("/")
- self.assertEqual(resp.body, b"GET / (None)")
- class HTTPErrorTest(unittest.TestCase):
- def test_copy(self):
- e = HTTPError(403, reason="Go away")
- e2 = copy.copy(e)
- self.assertIsNot(e, e2)
- self.assertEqual(e.status_code, e2.status_code)
- self.assertEqual(e.reason, e2.reason)
- class ApplicationTest(AsyncTestCase):
- def test_listen(self):
- app = Application([])
- server = app.listen(0, address='127.0.0.1')
- server.stop()
- class URLSpecReverseTest(unittest.TestCase):
- def test_reverse(self):
- self.assertEqual('/favicon.ico', url(r'/favicon\.ico', None).reverse())
- self.assertEqual('/favicon.ico', url(r'^/favicon\.ico$', None).reverse())
- def test_non_reversible(self):
- # URLSpecs are non-reversible if they include non-constant
- # regex features outside capturing groups. Currently, this is
- # only strictly enforced for backslash-escaped character
- # classes.
- paths = [
- r'^/api/v\d+/foo/(\w+)$',
- ]
- for path in paths:
- # A URLSpec can still be created even if it cannot be reversed.
- url_spec = url(path, None)
- try:
- result = url_spec.reverse()
- self.fail("did not get expected exception when reversing %s. "
- "result: %s" % (path, result))
- except ValueError:
- pass
- def test_reverse_arguments(self):
- self.assertEqual('/api/v1/foo/bar',
- url(r'^/api/v1/foo/(\w+)$', None).reverse('bar'))
- class RedirectHandlerTest(WebTestCase):
- def get_handlers(self):
- return [
- ('/src', WebRedirectHandler, {'url': '/dst'}),
- ('/src2', WebRedirectHandler, {'url': '/dst2?foo=bar'}),
- (r'/(.*?)/(.*?)/(.*)', WebRedirectHandler, {'url': '/{1}/{0}/{2}'})]
- def test_basic_redirect(self):
- response = self.fetch('/src', follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/dst')
- def test_redirect_with_argument(self):
- response = self.fetch('/src?foo=bar', follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/dst?foo=bar')
- def test_redirect_with_appending_argument(self):
- response = self.fetch('/src2?foo2=bar2', follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/dst2?foo=bar&foo2=bar2')
- def test_redirect_pattern(self):
- response = self.fetch('/a/b/c', follow_redirects=False)
- self.assertEqual(response.code, 301)
- self.assertEqual(response.headers['Location'], '/b/a/c')
|