12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.web.wsgi}.
- """
- __metaclass__ = type
- from sys import exc_info
- import tempfile
- import traceback
- import warnings
- from zope.interface.verify import verifyObject
- from twisted.python.compat import intToBytes, urlquote, _PY3
- from twisted.python.log import addObserver, removeObserver, err
- from twisted.python.failure import Failure
- from twisted.python.threadable import getThreadID
- from twisted.python.threadpool import ThreadPool
- from twisted.internet.defer import Deferred, gatherResults
- from twisted.internet import reactor
- from twisted.internet.error import ConnectionLost
- from twisted.trial.unittest import TestCase, SkipTest
- from twisted.web import http
- from twisted.web.resource import IResource, Resource
- from twisted.web.server import Request, Site, version
- from twisted.web.wsgi import WSGIResource
- from twisted.web.test.test_web import DummyChannel
- class SynchronousThreadPool:
- """
- A single-threaded implementation of part of the L{ThreadPool} interface.
- This implementation calls functions synchronously rather than running
- them in a thread pool. It is used to make the tests which are not
- directly for thread-related behavior deterministic.
- """
- def callInThread(self, f, *a, **kw):
- """
- Call C{f(*a, **kw)} in this thread rather than scheduling it to be
- called in a thread.
- """
- try:
- f(*a, **kw)
- except:
- # callInThread doesn't let exceptions propagate to the caller.
- # None is always returned and any exception raised gets logged
- # later on.
- err(None, "Callable passed to SynchronousThreadPool.callInThread failed")
- class SynchronousReactorThreads:
- """
- A single-threaded implementation of part of the L{IReactorThreads}
- interface. This implementation assumes that it will only be invoked
- from the reactor thread, so it calls functions synchronously rather than
- trying to schedule them to run in the reactor thread. It is used in
- conjunction with L{SynchronousThreadPool} to make the tests which are
- not directly for thread-related behavior deterministic.
- """
- def callFromThread(self, f, *a, **kw):
- """
- Call C{f(*a, **kw)} in this thread which should also be the reactor
- thread.
- """
- f(*a, **kw)
- class WSGIResourceTests(TestCase):
- def setUp(self):
- """
- Create a L{WSGIResource} with synchronous threading objects and a no-op
- application object. This is useful for testing certain things about
- the resource implementation which are unrelated to WSGI.
- """
- self.resource = WSGIResource(
- SynchronousReactorThreads(), SynchronousThreadPool(),
- lambda environ, startResponse: None)
- def test_interfaces(self):
- """
- L{WSGIResource} implements L{IResource} and stops resource traversal.
- """
- verifyObject(IResource, self.resource)
- self.assertTrue(self.resource.isLeaf)
- def test_unsupported(self):
- """
- A L{WSGIResource} cannot have L{IResource} children. Its
- C{getChildWithDefault} and C{putChild} methods raise L{RuntimeError}.
- """
- self.assertRaises(
- RuntimeError,
- self.resource.getChildWithDefault,
- b"foo", Request(DummyChannel(), False))
- self.assertRaises(
- RuntimeError,
- self.resource.putChild,
- b"foo", Resource())
- class WSGITestsMixin:
- """
- @ivar channelFactory: A no-argument callable which will be invoked to
- create a new HTTP channel to associate with request objects.
- """
- channelFactory = DummyChannel
- def setUp(self):
- self.threadpool = SynchronousThreadPool()
- self.reactor = SynchronousReactorThreads()
- def lowLevelRender(
- self, requestFactory, applicationFactory, channelFactory, method,
- version, resourceSegments, requestSegments, query=None, headers=[],
- body=None, safe=''):
- """
- @param method: A C{str} giving the request method to use.
- @param version: A C{str} like C{'1.1'} giving the request version.
- @param resourceSegments: A C{list} of unencoded path segments which
- specifies the location in the resource hierarchy at which the
- L{WSGIResource} will be placed, eg C{['']} for I{/}, C{['foo',
- 'bar', '']} for I{/foo/bar/}, etc.
- @param requestSegments: A C{list} of unencoded path segments giving the
- request URI.
- @param query: A C{list} of two-tuples of C{str} giving unencoded query
- argument keys and values.
- @param headers: A C{list} of two-tuples of C{str} giving request header
- names and corresponding values.
- @param safe: A C{str} giving the bytes which are to be considered
- I{safe} for inclusion in the request URI and not quoted.
- @return: A L{Deferred} which will be called back with a two-tuple of
- the arguments passed which would be passed to the WSGI application
- object for this configuration and request (ie, the environment and
- start_response callable).
- """
- def _toByteString(string):
- # Twisted's HTTP implementation prefers byte strings. As a
- # convenience for tests, string arguments are encoded to an
- # ISO-8859-1 byte string (if not already) before being passed on.
- if isinstance(string, bytes):
- return string
- else:
- return string.encode('iso-8859-1')
- root = WSGIResource(
- self.reactor, self.threadpool, applicationFactory())
- resourceSegments.reverse()
- for seg in resourceSegments:
- tmp = Resource()
- tmp.putChild(_toByteString(seg), root)
- root = tmp
- channel = channelFactory()
- channel.site = Site(root)
- request = requestFactory(channel, False)
- for k, v in headers:
- request.requestHeaders.addRawHeader(
- _toByteString(k), _toByteString(v))
- request.gotLength(0)
- if body:
- request.content.write(body)
- request.content.seek(0)
- uri = '/' + '/'.join([urlquote(seg, safe) for seg in requestSegments])
- if query is not None:
- uri += '?' + '&'.join(['='.join([urlquote(k, safe), urlquote(v, safe)])
- for (k, v) in query])
- request.requestReceived(
- _toByteString(method), _toByteString(uri),
- b'HTTP/' + _toByteString(version))
- return request
- def render(self, *a, **kw):
- result = Deferred()
- def applicationFactory():
- def application(*args):
- environ, startResponse = args
- result.callback(args)
- startResponse('200 OK', [])
- return iter(())
- return application
- self.lowLevelRender(
- Request, applicationFactory, self.channelFactory, *a, **kw)
- return result
- def requestFactoryFactory(self, requestClass=Request):
- d = Deferred()
- def requestFactory(*a, **kw):
- request = requestClass(*a, **kw)
- # If notifyFinish is called after lowLevelRender returns, it won't
- # do the right thing, because the request will have already
- # finished. One might argue that this is a bug in
- # Request.notifyFinish.
- request.notifyFinish().chainDeferred(d)
- return request
- return d, requestFactory
- def getContentFromResponse(self, response):
- return response.split(b'\r\n\r\n', 1)[1]
- def prepareRequest(self, application=None):
- """
- Prepare a L{Request} which, when a request is received, captures the
- C{environ} and C{start_response} callable passed to a WSGI app.
- @param application: An optional WSGI application callable that accepts
- the familiar C{environ} and C{start_response} args and returns an
- iterable of body content. If not supplied, C{start_response} will
- be called with a "200 OK" status and no headers, and no content
- will be yielded.
- @return: A two-tuple of (C{request}, C{deferred}). The former is a
- Twisted L{Request}. The latter is a L{Deferred} which will be
- called back with a two-tuple of the arguments passed to a WSGI
- application (i.e. the C{environ} and C{start_response} callable),
- or will errback with any error arising within the WSGI app.
- """
- result = Deferred()
- def outerApplication(environ, startResponse):
- try:
- if application is None:
- startResponse('200 OK', [])
- content = iter(()) # No content.
- else:
- content = application(environ, startResponse)
- except:
- result.errback()
- startResponse('500 Error', [])
- return iter(())
- else:
- result.callback((environ, startResponse))
- return content
- resource = WSGIResource(
- self.reactor, self.threadpool, outerApplication)
- root = Resource()
- root.putChild(b"res", resource)
- channel = self.channelFactory()
- channel.site = Site(root)
- class CannedRequest(Request):
- """
- Convenient L{Request} derivative which has canned values for all
- of C{requestReceived}'s arguments.
- """
- def requestReceived(
- self, command=b"GET", path=b"/res", version=b"1.1"):
- return Request.requestReceived(
- self, command=command, path=path, version=version)
- request = CannedRequest(channel, queued=False)
- request.gotLength(0) # Initialize buffer for request body.
- return request, result
- class EnvironTests(WSGITestsMixin, TestCase):
- """
- Tests for the values in the C{environ} C{dict} passed to the application
- object by L{twisted.web.wsgi.WSGIResource}.
- """
- def environKeyEqual(self, key, value):
- def assertEnvironKeyEqual(result):
- environ, startResponse = result
- self.assertEqual(environ[key], value)
- return value
- return assertEnvironKeyEqual
- def test_environIsDict(self):
- """
- L{WSGIResource} calls the application object with an C{environ}
- parameter which is exactly of type C{dict}.
- """
- d = self.render('GET', '1.1', [], [''])
- def cbRendered(result):
- environ, startResponse = result
- self.assertIdentical(type(environ), dict)
- # Environment keys are always native strings.
- for name in environ:
- self.assertIsInstance(name, str)
- d.addCallback(cbRendered)
- return d
- def test_requestMethod(self):
- """
- The C{'REQUEST_METHOD'} key of the C{environ} C{dict} passed to the
- application contains the HTTP method in the request (RFC 3875, section
- 4.1.12).
- """
- get = self.render('GET', '1.1', [], [''])
- get.addCallback(self.environKeyEqual('REQUEST_METHOD', 'GET'))
- # Also make sure a different request method shows up as a different
- # value in the environ dict.
- post = self.render('POST', '1.1', [], [''])
- post.addCallback(self.environKeyEqual('REQUEST_METHOD', 'POST'))
- return gatherResults([get, post])
- def test_requestMethodIsNativeString(self):
- """
- The C{'REQUEST_METHOD'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- for method in b"GET", u"GET":
- request, result = self.prepareRequest()
- request.requestReceived(method)
- result.addCallback(self.environKeyEqual('REQUEST_METHOD', 'GET'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_scriptName(self):
- """
- The C{'SCRIPT_NAME'} key of the C{environ} C{dict} passed to the
- application contains the I{abs_path} (RFC 2396, section 3) to this
- resource (RFC 3875, section 4.1.13).
- """
- root = self.render('GET', '1.1', [], [''])
- root.addCallback(self.environKeyEqual('SCRIPT_NAME', ''))
- emptyChild = self.render('GET', '1.1', [''], [''])
- emptyChild.addCallback(self.environKeyEqual('SCRIPT_NAME', '/'))
- leaf = self.render('GET', '1.1', ['foo'], ['foo'])
- leaf.addCallback(self.environKeyEqual('SCRIPT_NAME', '/foo'))
- container = self.render('GET', '1.1', ['foo', ''], ['foo', ''])
- container.addCallback(self.environKeyEqual('SCRIPT_NAME', '/foo/'))
- internal = self.render('GET', '1.1', ['foo'], ['foo', 'bar'])
- internal.addCallback(self.environKeyEqual('SCRIPT_NAME', '/foo'))
- unencoded = self.render(
- 'GET', '1.1', ['foo', '/', b'bar\xff'], ['foo', '/', b'bar\xff'])
- # The RFC says "(not URL-encoded)", even though that makes
- # interpretation of SCRIPT_NAME ambiguous.
- unencoded.addCallback(
- self.environKeyEqual('SCRIPT_NAME', '/foo///bar\xff'))
- return gatherResults([
- root, emptyChild, leaf, container, internal, unencoded])
- def test_scriptNameIsNativeString(self):
- """
- The C{'SCRIPT_NAME'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- request, result = self.prepareRequest()
- request.requestReceived(path=b"/res")
- result.addCallback(self.environKeyEqual('SCRIPT_NAME', '/res'))
- self.assertIsInstance(self.successResultOf(result), str)
- if _PY3:
- # Native strings are rejected by Request.requestReceived() before
- # t.w.wsgi has any say in the matter.
- request, result = self.prepareRequest()
- self.assertRaises(TypeError, request.requestReceived, path=u"/res")
- else:
- request, result = self.prepareRequest()
- request.requestReceived(path=u"/res")
- result.addCallback(self.environKeyEqual('SCRIPT_NAME', '/res'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_pathInfo(self):
- """
- The C{'PATH_INFO'} key of the C{environ} C{dict} passed to the
- application contains the suffix of the request URI path which is not
- included in the value for the C{'SCRIPT_NAME'} key (RFC 3875, section
- 4.1.5).
- """
- assertKeyEmpty = self.environKeyEqual('PATH_INFO', '')
- root = self.render('GET', '1.1', [], [''])
- root.addCallback(self.environKeyEqual('PATH_INFO', '/'))
- emptyChild = self.render('GET', '1.1', [''], [''])
- emptyChild.addCallback(assertKeyEmpty)
- leaf = self.render('GET', '1.1', ['foo'], ['foo'])
- leaf.addCallback(assertKeyEmpty)
- container = self.render('GET', '1.1', ['foo', ''], ['foo', ''])
- container.addCallback(assertKeyEmpty)
- internalLeaf = self.render('GET', '1.1', ['foo'], ['foo', 'bar'])
- internalLeaf.addCallback(self.environKeyEqual('PATH_INFO', '/bar'))
- internalContainer = self.render('GET', '1.1', ['foo'], ['foo', ''])
- internalContainer.addCallback(self.environKeyEqual('PATH_INFO', '/'))
- unencoded = self.render('GET', '1.1', [], ['foo', '/', b'bar\xff'])
- unencoded.addCallback(
- self.environKeyEqual('PATH_INFO', '/foo///bar\xff'))
- return gatherResults([
- root, leaf, container, internalLeaf,
- internalContainer, unencoded])
- def test_pathInfoIsNativeString(self):
- """
- The C{'PATH_INFO'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- request, result = self.prepareRequest()
- request.requestReceived(path=b"/res/foo/bar")
- result.addCallback(self.environKeyEqual('PATH_INFO', '/foo/bar'))
- self.assertIsInstance(self.successResultOf(result), str)
- if _PY3:
- # Native strings are rejected by Request.requestReceived() before
- # t.w.wsgi has any say in the matter.
- request, result = self.prepareRequest()
- self.assertRaises(
- TypeError, request.requestReceived, path=u"/res/foo/bar")
- else:
- request, result = self.prepareRequest()
- request.requestReceived(path=u"/res/foo/bar")
- result.addCallback(self.environKeyEqual('PATH_INFO', '/foo/bar'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_queryString(self):
- """
- The C{'QUERY_STRING'} key of the C{environ} C{dict} passed to the
- application contains the portion of the request URI after the first
- I{?} (RFC 3875, section 4.1.7).
- """
- missing = self.render('GET', '1.1', [], [''], None)
- missing.addCallback(self.environKeyEqual('QUERY_STRING', ''))
- empty = self.render('GET', '1.1', [], [''], [])
- empty.addCallback(self.environKeyEqual('QUERY_STRING', ''))
- present = self.render('GET', '1.1', [], [''], [('foo', 'bar')])
- present.addCallback(self.environKeyEqual('QUERY_STRING', 'foo=bar'))
- unencoded = self.render('GET', '1.1', [], [''], [('/', '/')])
- unencoded.addCallback(self.environKeyEqual('QUERY_STRING', '%2F=%2F'))
- # "?" is reserved in the <searchpart> portion of a URL. However, it
- # seems to be a common mistake of clients to forget to quote it. So,
- # make sure we handle that invalid case.
- doubleQuestion = self.render(
- 'GET', '1.1', [], [''], [('foo', '?bar')], safe='?')
- doubleQuestion.addCallback(
- self.environKeyEqual('QUERY_STRING', 'foo=?bar'))
- return gatherResults([
- missing, empty, present, unencoded, doubleQuestion])
- def test_queryStringIsNativeString(self):
- """
- The C{'QUERY_STRING'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- request, result = self.prepareRequest()
- request.requestReceived(path=b"/res?foo=bar")
- result.addCallback(self.environKeyEqual('QUERY_STRING', 'foo=bar'))
- self.assertIsInstance(self.successResultOf(result), str)
- if _PY3:
- # Native strings are rejected by Request.requestReceived() before
- # t.w.wsgi has any say in the matter.
- request, result = self.prepareRequest()
- self.assertRaises(
- TypeError, request.requestReceived, path=u"/res?foo=bar")
- else:
- request, result = self.prepareRequest()
- request.requestReceived(path=u"/res?foo=bar")
- result.addCallback(self.environKeyEqual('QUERY_STRING', 'foo=bar'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_contentType(self):
- """
- The C{'CONTENT_TYPE'} key of the C{environ} C{dict} passed to the
- application contains the value of the I{Content-Type} request header
- (RFC 3875, section 4.1.3).
- """
- missing = self.render('GET', '1.1', [], [''])
- missing.addCallback(self.environKeyEqual('CONTENT_TYPE', ''))
- present = self.render(
- 'GET', '1.1', [], [''], None, [('content-type', 'x-foo/bar')])
- present.addCallback(self.environKeyEqual('CONTENT_TYPE', 'x-foo/bar'))
- return gatherResults([missing, present])
- def test_contentTypeIsNativeString(self):
- """
- The C{'CONTENT_TYPE'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- for contentType in b"x-foo/bar", u"x-foo/bar":
- request, result = self.prepareRequest()
- request.requestHeaders.addRawHeader(b"Content-Type", contentType)
- request.requestReceived()
- result.addCallback(self.environKeyEqual('CONTENT_TYPE', 'x-foo/bar'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_contentLength(self):
- """
- The C{'CONTENT_LENGTH'} key of the C{environ} C{dict} passed to the
- application contains the value of the I{Content-Length} request header
- (RFC 3875, section 4.1.2).
- """
- missing = self.render('GET', '1.1', [], [''])
- missing.addCallback(self.environKeyEqual('CONTENT_LENGTH', ''))
- present = self.render(
- 'GET', '1.1', [], [''], None, [('content-length', '1234')])
- present.addCallback(self.environKeyEqual('CONTENT_LENGTH', '1234'))
- return gatherResults([missing, present])
- def test_contentLengthIsNativeString(self):
- """
- The C{'CONTENT_LENGTH'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- for contentLength in b"1234", u"1234":
- request, result = self.prepareRequest()
- request.requestHeaders.addRawHeader(b"Content-Length", contentLength)
- request.requestReceived()
- result.addCallback(self.environKeyEqual('CONTENT_LENGTH', '1234'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_serverName(self):
- """
- The C{'SERVER_NAME'} key of the C{environ} C{dict} passed to the
- application contains the best determination of the server hostname
- possible, using either the value of the I{Host} header in the request
- or the address the server is listening on if that header is not
- present (RFC 3875, section 4.1.14).
- """
- missing = self.render('GET', '1.1', [], [''])
- # 10.0.0.1 value comes from a bit far away -
- # twisted.test.test_web.DummyChannel.transport.getHost().host
- missing.addCallback(self.environKeyEqual('SERVER_NAME', '10.0.0.1'))
- present = self.render(
- 'GET', '1.1', [], [''], None, [('host', 'example.org')])
- present.addCallback(self.environKeyEqual('SERVER_NAME', 'example.org'))
- return gatherResults([missing, present])
- def test_serverNameIsNativeString(self):
- """
- The C{'SERVER_NAME'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- for serverName in b"host.example.com", u"host.example.com":
- request, result = self.prepareRequest()
- # This is kind of a cheat; getRequestHostname() breaks in Python 3
- # when the "Host" request header is set to a native string because
- # it tries to split around b":", so we patch the method.
- request.getRequestHostname = lambda: serverName
- request.requestReceived()
- result.addCallback(self.environKeyEqual('SERVER_NAME', 'host.example.com'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_serverPort(self):
- """
- The C{'SERVER_PORT'} key of the C{environ} C{dict} passed to the
- application contains the port number of the server which received the
- request (RFC 3875, section 4.1.15).
- """
- portNumber = 12354
- def makeChannel():
- channel = DummyChannel()
- channel.transport = DummyChannel.TCP()
- channel.transport.port = portNumber
- return channel
- self.channelFactory = makeChannel
- d = self.render('GET', '1.1', [], [''])
- d.addCallback(self.environKeyEqual('SERVER_PORT', str(portNumber)))
- return d
- def test_serverPortIsNativeString(self):
- """
- The C{'SERVER_PORT'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- request, result = self.prepareRequest()
- request.requestReceived()
- result.addCallback(self.environKeyEqual('SERVER_PORT', '80'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_serverProtocol(self):
- """
- The C{'SERVER_PROTOCOL'} key of the C{environ} C{dict} passed to the
- application contains the HTTP version number received in the request
- (RFC 3875, section 4.1.16).
- """
- old = self.render('GET', '1.0', [], [''])
- old.addCallback(self.environKeyEqual('SERVER_PROTOCOL', 'HTTP/1.0'))
- new = self.render('GET', '1.1', [], [''])
- new.addCallback(self.environKeyEqual('SERVER_PROTOCOL', 'HTTP/1.1'))
- return gatherResults([old, new])
- def test_serverProtocolIsNativeString(self):
- """
- The C{'SERVER_PROTOCOL'} key of the C{environ} C{dict} passed to the
- application is always a native string.
- """
- for serverProtocol in b"1.1", u"1.1":
- request, result = self.prepareRequest()
- # In Python 3, native strings can be rejected by Request.write()
- # which will cause a crash after the bit we're trying to test, so
- # we patch write() out here to do nothing.
- request.write = lambda data: None
- request.requestReceived(version=b"1.1")
- result.addCallback(self.environKeyEqual('SERVER_PROTOCOL', '1.1'))
- self.assertIsInstance(self.successResultOf(result), str)
- def test_remoteAddr(self):
- """
- The C{'REMOTE_ADDR'} key of the C{environ} C{dict} passed to the
- application contains the address of the client making the request.
- """
- d = self.render('GET', '1.1', [], [''])
- d.addCallback(self.environKeyEqual('REMOTE_ADDR', '192.168.1.1'))
- return d
- def test_headers(self):
- """
- HTTP request headers are copied into the C{environ} C{dict} passed to
- the application with a C{HTTP_} prefix added to their names.
- """
- singleValue = self.render(
- 'GET', '1.1', [], [''], None, [('foo', 'bar'), ('baz', 'quux')])
- def cbRendered(result):
- environ, startResponse = result
- self.assertEqual(environ['HTTP_FOO'], 'bar')
- self.assertEqual(environ['HTTP_BAZ'], 'quux')
- singleValue.addCallback(cbRendered)
- multiValue = self.render(
- 'GET', '1.1', [], [''], None, [('foo', 'bar'), ('foo', 'baz')])
- multiValue.addCallback(self.environKeyEqual('HTTP_FOO', 'bar,baz'))
- withHyphen = self.render(
- 'GET', '1.1', [], [''], None, [('foo-bar', 'baz')])
- withHyphen.addCallback(self.environKeyEqual('HTTP_FOO_BAR', 'baz'))
- multiLine = self.render(
- 'GET', '1.1', [], [''], None, [('foo', 'bar\n\tbaz')])
- multiLine.addCallback(self.environKeyEqual('HTTP_FOO', 'bar \tbaz'))
- return gatherResults([singleValue, multiValue, withHyphen, multiLine])
- def test_wsgiVersion(self):
- """
- The C{'wsgi.version'} key of the C{environ} C{dict} passed to the
- application has the value C{(1, 0)} indicating that this is a WSGI 1.0
- container.
- """
- versionDeferred = self.render('GET', '1.1', [], [''])
- versionDeferred.addCallback(self.environKeyEqual('wsgi.version', (1, 0)))
- return versionDeferred
- def test_wsgiRunOnce(self):
- """
- The C{'wsgi.run_once'} key of the C{environ} C{dict} passed to the
- application is set to C{False}.
- """
- once = self.render('GET', '1.1', [], [''])
- once.addCallback(self.environKeyEqual('wsgi.run_once', False))
- return once
- def test_wsgiMultithread(self):
- """
- The C{'wsgi.multithread'} key of the C{environ} C{dict} passed to the
- application is set to C{True}.
- """
- thread = self.render('GET', '1.1', [], [''])
- thread.addCallback(self.environKeyEqual('wsgi.multithread', True))
- return thread
- def test_wsgiMultiprocess(self):
- """
- The C{'wsgi.multiprocess'} key of the C{environ} C{dict} passed to the
- application is set to C{False}.
- """
- process = self.render('GET', '1.1', [], [''])
- process.addCallback(self.environKeyEqual('wsgi.multiprocess', False))
- return process
- def test_wsgiURLScheme(self):
- """
- The C{'wsgi.url_scheme'} key of the C{environ} C{dict} passed to the
- application has the request URL scheme.
- """
- # XXX Does this need to be different if the request is for an absolute
- # URL?
- def channelFactory():
- channel = DummyChannel()
- channel.transport = DummyChannel.SSL()
- return channel
- self.channelFactory = DummyChannel
- httpDeferred = self.render('GET', '1.1', [], [''])
- httpDeferred.addCallback(self.environKeyEqual('wsgi.url_scheme', 'http'))
- self.channelFactory = channelFactory
- httpsDeferred = self.render('GET', '1.1', [], [''])
- httpsDeferred.addCallback(self.environKeyEqual('wsgi.url_scheme', 'https'))
- return gatherResults([httpDeferred, httpsDeferred])
- def test_wsgiErrors(self):
- """
- The C{'wsgi.errors'} key of the C{environ} C{dict} passed to the
- application is a file-like object (as defined in the U{Input and Errors
- Streams<http://www.python.org/dev/peps/pep-0333/#input-and-error-streams>}
- section of PEP 333) which converts bytes written to it into events for
- the logging system.
- """
- events = []
- addObserver(events.append)
- self.addCleanup(removeObserver, events.append)
- errors = self.render('GET', '1.1', [], [''])
- def cbErrors(result):
- environ, startApplication = result
- errors = environ['wsgi.errors']
- errors.write('some message\n')
- errors.writelines(['another\nmessage\n'])
- errors.flush()
- self.assertEqual(events[0]['message'], ('some message\n',))
- self.assertEqual(events[0]['system'], 'wsgi')
- self.assertTrue(events[0]['isError'])
- self.assertEqual(events[1]['message'], ('another\nmessage\n',))
- self.assertEqual(events[1]['system'], 'wsgi')
- self.assertTrue(events[1]['isError'])
- self.assertEqual(len(events), 2)
- errors.addCallback(cbErrors)
- return errors
- def test_wsgiErrorsExpectsOnlyNativeStringsInPython2(self):
- """
- The C{'wsgi.errors'} file-like object from the C{environ} C{dict}
- expects writes of only native strings in Python 2. Some existing WSGI
- applications may write non-native (i.e. C{unicode}) strings so, for
- compatibility, these elicit only a warning in Python 2.
- """
- if _PY3:
- raise SkipTest("Not relevant in Python 3")
- request, result = self.prepareRequest()
- request.requestReceived()
- environ, _ = self.successResultOf(result)
- errors = environ["wsgi.errors"]
- with warnings.catch_warnings(record=True) as caught:
- errors.write(u"fred")
- self.assertEqual(1, len(caught))
- self.assertEqual(UnicodeWarning, caught[0].category)
- self.assertEqual(
- "write() argument should be str, not u'fred' (unicode)",
- str(caught[0].message))
- def test_wsgiErrorsAcceptsOnlyNativeStringsInPython3(self):
- """
- The C{'wsgi.errors'} file-like object from the C{environ} C{dict}
- permits writes of only native strings in Python 3, and raises
- C{TypeError} for writes of non-native strings.
- """
- if not _PY3:
- raise SkipTest("Relevant only in Python 3")
- request, result = self.prepareRequest()
- request.requestReceived()
- environ, _ = self.successResultOf(result)
- errors = environ["wsgi.errors"]
- error = self.assertRaises(TypeError, errors.write, b"fred")
- self.assertEqual(
- "write() argument must be str, not b'fred' (bytes)",
- str(error))
- class InputStreamTestMixin(WSGITestsMixin):
- """
- A mixin for L{TestCase} subclasses which defines a number of tests against
- L{_InputStream}. The subclass is expected to create a file-like object to
- be wrapped by an L{_InputStream} under test.
- """
- def getFileType(self):
- raise NotImplementedError(
- "%s.getFile must be implemented" % (self.__class__.__name__,))
- def _renderAndReturnReaderResult(self, reader, content):
- contentType = self.getFileType()
- class CustomizedRequest(Request):
- def gotLength(self, length):
- # Always allocate a file of the specified type, instead of
- # using the base behavior of selecting one depending on the
- # length.
- self.content = contentType()
- def appFactoryFactory(reader):
- result = Deferred()
- def applicationFactory():
- def application(*args):
- environ, startResponse = args
- result.callback(reader(environ['wsgi.input']))
- startResponse('200 OK', [])
- return iter(())
- return application
- return result, applicationFactory
- d, appFactory = appFactoryFactory(reader)
- self.lowLevelRender(
- CustomizedRequest, appFactory, DummyChannel,
- 'PUT', '1.1', [], [''], None, [],
- content)
- return d
- def test_readAll(self):
- """
- Calling L{_InputStream.read} with no arguments returns the entire input
- stream.
- """
- bytes = b"some bytes are here"
- d = self._renderAndReturnReaderResult(lambda input: input.read(), bytes)
- d.addCallback(self.assertEqual, bytes)
- return d
- def test_readSome(self):
- """
- Calling L{_InputStream.read} with an integer returns that many bytes
- from the input stream, as long as it is less than or equal to the total
- number of bytes available.
- """
- bytes = b"hello, world."
- d = self._renderAndReturnReaderResult(lambda input: input.read(3), bytes)
- d.addCallback(self.assertEqual, b"hel")
- return d
- def test_readMoreThan(self):
- """
- Calling L{_InputStream.read} with an integer that is greater than the
- total number of bytes in the input stream returns all bytes in the
- input stream.
- """
- bytes = b"some bytes are here"
- d = self._renderAndReturnReaderResult(
- lambda input: input.read(len(bytes) + 3), bytes)
- d.addCallback(self.assertEqual, bytes)
- return d
- def test_readTwice(self):
- """
- Calling L{_InputStream.read} a second time returns bytes starting from
- the position after the last byte returned by the previous read.
- """
- bytes = b"some bytes, hello"
- def read(input):
- input.read(3)
- return input.read()
- d = self._renderAndReturnReaderResult(read, bytes)
- d.addCallback(self.assertEqual, bytes[3:])
- return d
- def test_readNone(self):
- """
- Calling L{_InputStream.read} with L{None} as an argument returns all
- bytes in the input stream.
- """
- bytes = b"the entire stream"
- d = self._renderAndReturnReaderResult(
- lambda input: input.read(None), bytes)
- d.addCallback(self.assertEqual, bytes)
- return d
- def test_readNegative(self):
- """
- Calling L{_InputStream.read} with a negative integer as an argument
- returns all bytes in the input stream.
- """
- bytes = b"all of the input"
- d = self._renderAndReturnReaderResult(
- lambda input: input.read(-1), bytes)
- d.addCallback(self.assertEqual, bytes)
- return d
- def test_readline(self):
- """
- Calling L{_InputStream.readline} with no argument returns one line from
- the input stream.
- """
- bytes = b"hello\nworld"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readline(), bytes)
- d.addCallback(self.assertEqual, b"hello\n")
- return d
- def test_readlineSome(self):
- """
- Calling L{_InputStream.readline} with an integer returns at most that
- many bytes, even if it is not enough to make up a complete line.
- COMPATIBILITY NOTE: the size argument is excluded from the WSGI
- specification, but is provided here anyhow, because useful libraries
- such as python stdlib's cgi.py assume their input file-like-object
- supports readline with a size argument. If you use it, be aware your
- application may not be portable to other conformant WSGI servers.
- """
- bytes = b"goodbye\nworld"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readline(3), bytes)
- d.addCallback(self.assertEqual, b"goo")
- return d
- def test_readlineMoreThan(self):
- """
- Calling L{_InputStream.readline} with an integer which is greater than
- the number of bytes in the next line returns only the next line.
- """
- bytes = b"some lines\nof text"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readline(20), bytes)
- d.addCallback(self.assertEqual, b"some lines\n")
- return d
- def test_readlineTwice(self):
- """
- Calling L{_InputStream.readline} a second time returns the line
- following the line returned by the first call.
- """
- bytes = b"first line\nsecond line\nlast line"
- def readline(input):
- input.readline()
- return input.readline()
- d = self._renderAndReturnReaderResult(readline, bytes)
- d.addCallback(self.assertEqual, b"second line\n")
- return d
- def test_readlineNone(self):
- """
- Calling L{_InputStream.readline} with L{None} as an argument returns
- one line from the input stream.
- """
- bytes = b"this is one line\nthis is another line"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readline(None), bytes)
- d.addCallback(self.assertEqual, b"this is one line\n")
- return d
- def test_readlineNegative(self):
- """
- Calling L{_InputStream.readline} with a negative integer as an argument
- returns one line from the input stream.
- """
- bytes = b"input stream line one\nline two"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readline(-1), bytes)
- d.addCallback(self.assertEqual, b"input stream line one\n")
- return d
- def test_readlines(self):
- """
- Calling L{_InputStream.readlines} with no arguments returns a list of
- all lines from the input stream.
- """
- bytes = b"alice\nbob\ncarol"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readlines(), bytes)
- d.addCallback(self.assertEqual, [b"alice\n", b"bob\n", b"carol"])
- return d
- def test_readlinesSome(self):
- """
- Calling L{_InputStream.readlines} with an integer as an argument
- returns a list of lines from the input stream with the argument serving
- as an approximate bound on the total number of bytes to read.
- """
- bytes = b"123\n456\n789\n0"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readlines(5), bytes)
- def cbLines(lines):
- # Make sure we got enough lines to make 5 bytes. Anything beyond
- # that is fine too.
- self.assertEqual(lines[:2], [b"123\n", b"456\n"])
- d.addCallback(cbLines)
- return d
- def test_readlinesMoreThan(self):
- """
- Calling L{_InputStream.readlines} with an integer which is greater than
- the total number of bytes in the input stream returns a list of all
- lines from the input.
- """
- bytes = b"one potato\ntwo potato\nthree potato"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readlines(100), bytes)
- d.addCallback(
- self.assertEqual,
- [b"one potato\n", b"two potato\n", b"three potato"])
- return d
- def test_readlinesAfterRead(self):
- """
- Calling L{_InputStream.readlines} after a call to L{_InputStream.read}
- returns lines starting at the byte after the last byte returned by the
- C{read} call.
- """
- bytes = b"hello\nworld\nfoo"
- def readlines(input):
- input.read(7)
- return input.readlines()
- d = self._renderAndReturnReaderResult(readlines, bytes)
- d.addCallback(self.assertEqual, [b"orld\n", b"foo"])
- return d
- def test_readlinesNone(self):
- """
- Calling L{_InputStream.readlines} with L{None} as an argument returns
- all lines from the input.
- """
- bytes = b"one fish\ntwo fish\n"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readlines(None), bytes)
- d.addCallback(self.assertEqual, [b"one fish\n", b"two fish\n"])
- return d
- def test_readlinesNegative(self):
- """
- Calling L{_InputStream.readlines} with a negative integer as an
- argument returns a list of all lines from the input.
- """
- bytes = b"red fish\nblue fish\n"
- d = self._renderAndReturnReaderResult(
- lambda input: input.readlines(-1), bytes)
- d.addCallback(self.assertEqual, [b"red fish\n", b"blue fish\n"])
- return d
- def test_iterable(self):
- """
- Iterating over L{_InputStream} produces lines from the input stream.
- """
- bytes = b"green eggs\nand ham\n"
- d = self._renderAndReturnReaderResult(lambda input: list(input), bytes)
- d.addCallback(self.assertEqual, [b"green eggs\n", b"and ham\n"])
- return d
- def test_iterableAfterRead(self):
- """
- Iterating over L{_InputStream} after calling L{_InputStream.read}
- produces lines from the input stream starting from the first byte after
- the last byte returned by the C{read} call.
- """
- bytes = b"green eggs\nand ham\n"
- def iterate(input):
- input.read(3)
- return list(input)
- d = self._renderAndReturnReaderResult(iterate, bytes)
- d.addCallback(self.assertEqual, [b"en eggs\n", b"and ham\n"])
- return d
- class InputStreamStringIOTests(InputStreamTestMixin, TestCase):
- """
- Tests for L{_InputStream} when it is wrapped around a
- L{StringIO.StringIO}.
- This is only available in Python 2.
- """
- def getFileType(self):
- try:
- from StringIO import StringIO
- except ImportError:
- raise SkipTest("StringIO.StringIO is not available.")
- else:
- return StringIO
- class InputStreamCStringIOTests(InputStreamTestMixin, TestCase):
- """
- Tests for L{_InputStream} when it is wrapped around a
- L{cStringIO.StringIO}.
- This is only available in Python 2.
- """
- def getFileType(self):
- try:
- from cStringIO import StringIO
- except ImportError:
- raise SkipTest("cStringIO.StringIO is not available.")
- else:
- return StringIO
- class InputStreamBytesIOTests(InputStreamTestMixin, TestCase):
- """
- Tests for L{_InputStream} when it is wrapped around an L{io.BytesIO}.
- """
- def getFileType(self):
- from io import BytesIO
- return BytesIO
- class InputStreamTemporaryFileTests(InputStreamTestMixin, TestCase):
- """
- Tests for L{_InputStream} when it is wrapped around a L{tempfile.TemporaryFile}.
- """
- def getFileType(self):
- return tempfile.TemporaryFile
- class StartResponseTests(WSGITestsMixin, TestCase):
- """
- Tests for the I{start_response} parameter passed to the application object
- by L{WSGIResource}.
- """
- def test_status(self):
- """
- The response status passed to the I{start_response} callable is written
- as the status of the response to the request.
- """
- channel = DummyChannel()
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('107 Strange message', [])
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertTrue(
- channel.transport.written.getvalue().startswith(
- b'HTTP/1.1 107 Strange message'))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_statusMustBeNativeString(self):
- """
- The response status passed to the I{start_response} callable MUST be a
- native string in Python 2 and Python 3.
- """
- status = b"200 OK" if _PY3 else u"200 OK"
- def application(environ, startResponse):
- startResponse(status, [])
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- if _PY3:
- self.assertEqual(
- "status must be str, not b'200 OK' (bytes)", str(error))
- else:
- self.assertEqual(
- "status must be str, not u'200 OK' (unicode)", str(error))
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def _headersTest(self, appHeaders, expectedHeaders):
- """
- Verify that if the response headers given by C{appHeaders} are passed
- to the I{start_response} callable, then the response header lines given
- by C{expectedHeaders} plus I{Server} and I{Date} header lines are
- included in the response.
- """
- # Make the Date header value deterministic
- self.patch(http, 'datetimeToString', lambda: 'Tuesday')
- channel = DummyChannel()
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', appHeaders)
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- response = channel.transport.written.getvalue()
- headers, rest = response.split(b'\r\n\r\n', 1)
- headerLines = headers.split(b'\r\n')[1:]
- headerLines.sort()
- allExpectedHeaders = expectedHeaders + [
- b'Date: Tuesday',
- b'Server: ' + version,
- b'Transfer-Encoding: chunked']
- allExpectedHeaders.sort()
- self.assertEqual(headerLines, allExpectedHeaders)
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_headers(self):
- """
- The headers passed to the I{start_response} callable are included in
- the response as are the required I{Date} and I{Server} headers and the
- necessary connection (hop to hop) header I{Transfer-Encoding}.
- """
- return self._headersTest(
- [('foo', 'bar'), ('baz', 'quux')],
- [b'Baz: quux', b'Foo: bar'])
- def test_headersMustBeSequence(self):
- """
- The headers passed to the I{start_response} callable MUST be a
- sequence.
- """
- headers = [("key", "value")]
- def application(environ, startResponse):
- startResponse("200 OK", iter(headers))
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- self.assertRegex(
- str(error), "headers must be a list, not "
- r"<(list_?|sequence)iterator .+> [(]\1iterator[)]")
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def test_headersShouldBePlainList(self):
- """
- According to PEP-3333, the headers passed to the I{start_response}
- callable MUST be a plain list:
- The response_headers argument ... must be a Python list; i.e.
- type(response_headers) is ListType
- However, for bug-compatibility, any sequence is accepted. In both
- Python 2 and Python 3, only a warning is issued when a sequence other
- than a list is encountered.
- """
- def application(environ, startResponse):
- startResponse("200 OK", (("not", "list"),))
- return iter(())
- request, result = self.prepareRequest(application)
- with warnings.catch_warnings(record=True) as caught:
- request.requestReceived()
- result = self.successResultOf(result)
- self.assertEqual(1, len(caught))
- self.assertEqual(RuntimeWarning, caught[0].category)
- self.assertEqual(
- "headers should be a list, not (('not', 'list'),) (tuple)",
- str(caught[0].message))
- def test_headersMustEachBeSequence(self):
- """
- Each header passed to the I{start_response} callable MUST be a
- sequence.
- """
- header = ("key", "value")
- def application(environ, startResponse):
- startResponse("200 OK", [iter(header)])
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- self.assertRegex(
- str(error), "header must be a [(]str, str[)] tuple, not "
- r"<(tuple_?|sequence)iterator .+> [(]\1iterator[)]")
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def test_headersShouldEachBeTuple(self):
- """
- According to PEP-3333, each header passed to the I{start_response}
- callable should be a tuple:
- The response_headers argument is a list of (header_name,
- header_value) tuples
- However, for bug-compatibility, any 2 element sequence is also
- accepted. In both Python 2 and Python 3, only a warning is issued when
- a sequence other than a tuple is encountered.
- """
- def application(environ, startResponse):
- startResponse("200 OK", [["not", "tuple"]])
- return iter(())
- request, result = self.prepareRequest(application)
- with warnings.catch_warnings(record=True) as caught:
- request.requestReceived()
- result = self.successResultOf(result)
- self.assertEqual(1, len(caught))
- self.assertEqual(RuntimeWarning, caught[0].category)
- self.assertEqual(
- "header should be a (str, str) tuple, not ['not', 'tuple'] (list)",
- str(caught[0].message))
- def test_headersShouldEachHaveKeyAndValue(self):
- """
- Each header passed to the I{start_response} callable MUST hold a key
- and a value, and ONLY a key and a value.
- """
- def application(environ, startResponse):
- startResponse("200 OK", [("too", "many", "cooks")])
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- self.assertEqual(
- "header must be a (str, str) tuple, not "
- "('too', 'many', 'cooks')", str(error))
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def test_headerKeyMustBeNativeString(self):
- """
- Each header key passed to the I{start_response} callable MUST be at
- native string in Python 2 and Python 3.
- """
- key = b"key" if _PY3 else u"key"
- def application(environ, startResponse):
- startResponse("200 OK", [(key, "value")])
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- self.assertEqual(
- "header must be (str, str) tuple, not (%r, 'value')" % (key,),
- str(error))
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def test_headerValueMustBeNativeString(self):
- """
- Each header value passed to the I{start_response} callable MUST be at
- native string in Python 2 and Python 3.
- """
- value = b"value" if _PY3 else u"value"
- def application(environ, startResponse):
- startResponse("200 OK", [("key", value)])
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- self.assertEqual(
- "header must be (str, str) tuple, not ('key', %r)" % (value,),
- str(error))
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- def test_applicationProvidedContentType(self):
- """
- If I{Content-Type} is included in the headers passed to the
- I{start_response} callable, one I{Content-Type} header is included in
- the response.
- """
- return self._headersTest(
- [('content-type', 'monkeys are great')],
- [b'Content-Type: monkeys are great'])
- def test_applicationProvidedServerAndDate(self):
- """
- If either I{Server} or I{Date} is included in the headers passed to the
- I{start_response} callable, they are disregarded.
- """
- return self._headersTest(
- [('server', 'foo'), ('Server', 'foo'),
- ('date', 'bar'), ('dATE', 'bar')],
- [])
- def test_delayedUntilReturn(self):
- """
- Nothing is written in response to a request when the I{start_response}
- callable is invoked. If the iterator returned by the application
- object produces only empty strings, the response is written after the
- last element is produced.
- """
- channel = DummyChannel()
- intermediateValues = []
- def record():
- intermediateValues.append(channel.transport.written.getvalue())
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [('foo', 'bar'), ('baz', 'quux')])
- yield b''
- record()
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertEqual(intermediateValues, [b''])
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_delayedUntilContent(self):
- """
- Nothing is written in response to a request when the I{start_response}
- callable is invoked. Once a non-empty string has been produced by the
- iterator returned by the application object, the response status and
- headers are written.
- """
- channel = DummyChannel()
- intermediateValues = []
- def record():
- intermediateValues.append(channel.transport.written.getvalue())
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [('foo', 'bar')])
- yield b''
- record()
- yield b'foo'
- record()
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertFalse(intermediateValues[0])
- self.assertTrue(intermediateValues[1])
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_content(self):
- """
- Content produced by the iterator returned by the application object is
- written to the request as it is produced.
- """
- channel = DummyChannel()
- intermediateValues = []
- def record():
- intermediateValues.append(channel.transport.written.getvalue())
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [('content-length', '6')])
- yield b'foo'
- record()
- yield b'bar'
- record()
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertEqual(
- self.getContentFromResponse(intermediateValues[0]),
- b'foo')
- self.assertEqual(
- self.getContentFromResponse(intermediateValues[1]),
- b'foobar')
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_multipleStartResponse(self):
- """
- If the I{start_response} callable is invoked multiple times before a
- data for the response body is produced, the values from the last call
- are used.
- """
- channel = DummyChannel()
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('100 Foo', [])
- startResponse('200 Bar', [])
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertTrue(
- channel.transport.written.getvalue().startswith(
- b'HTTP/1.1 200 Bar\r\n'))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_startResponseWithException(self):
- """
- If the I{start_response} callable is invoked with a third positional
- argument before the status and headers have been written to the
- response, the status and headers become the newly supplied values.
- """
- channel = DummyChannel()
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('100 Foo', [], (Exception, Exception("foo"), None))
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertTrue(
- channel.transport.written.getvalue().startswith(
- b'HTTP/1.1 100 Foo\r\n'))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_startResponseWithExceptionTooLate(self):
- """
- If the I{start_response} callable is invoked with a third positional
- argument after the status and headers have been written to the
- response, the supplied I{exc_info} values are re-raised to the
- application.
- """
- channel = DummyChannel()
- class SomeException(Exception):
- pass
- try:
- raise SomeException()
- except:
- excInfo = exc_info()
- reraised = []
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [])
- yield b'foo'
- try:
- startResponse('500 ERR', [], excInfo)
- except:
- reraised.append(exc_info())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertTrue(
- channel.transport.written.getvalue().startswith(
- b'HTTP/1.1 200 OK\r\n'))
- self.assertEqual(reraised[0][0], excInfo[0])
- self.assertEqual(reraised[0][1], excInfo[1])
- # Show that the tracebacks end with the same stack frames.
- tb1 = reraised[0][2].tb_next
- tb2 = excInfo[2]
- self.assertEqual(
- # On Python 2 (str is bytes) we need to move back only one
- # stack frame to skip. On Python 3 we need to move two frames.
- traceback.extract_tb(tb1)[1 if str is bytes else 2],
- traceback.extract_tb(tb2)[0]
- )
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_write(self):
- """
- I{start_response} returns the I{write} callable which can be used to
- write bytes to the response body without buffering.
- """
- channel = DummyChannel()
- intermediateValues = []
- def record():
- intermediateValues.append(channel.transport.written.getvalue())
- def applicationFactory():
- def application(environ, startResponse):
- write = startResponse('100 Foo', [('content-length', '6')])
- write(b'foo')
- record()
- write(b'bar')
- record()
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertEqual(
- self.getContentFromResponse(intermediateValues[0]),
- b'foo')
- self.assertEqual(
- self.getContentFromResponse(intermediateValues[1]),
- b'foobar')
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_writeAcceptsOnlyByteStrings(self):
- """
- The C{write} callable returned from C{start_response} only accepts
- byte strings.
- """
- def application(environ, startResponse):
- write = startResponse("200 OK", [])
- write(u"bogus")
- return iter(())
- request, result = self.prepareRequest(application)
- request.requestReceived()
- def checkMessage(error):
- if _PY3:
- self.assertEqual(
- "Can only write bytes to a transport, not 'bogus'",
- str(error))
- else:
- self.assertEqual(
- "Can only write bytes to a transport, not u'bogus'",
- str(error))
- return self.assertFailure(result, TypeError).addCallback(checkMessage)
- class ApplicationTests(WSGITestsMixin, TestCase):
- """
- Tests for things which are done to the application object and the iterator
- it returns.
- """
- def enableThreads(self):
- self.reactor = reactor
- self.threadpool = ThreadPool()
- self.threadpool.start()
- self.addCleanup(self.threadpool.stop)
- def test_close(self):
- """
- If the application object returns an iterator which also has a I{close}
- method, that method is called after iteration is complete.
- """
- channel = DummyChannel()
- class Result:
- def __init__(self):
- self.open = True
- def __iter__(self):
- for i in range(3):
- if self.open:
- yield intToBytes(i)
- def close(self):
- self.open = False
- result = Result()
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [('content-length', '3')])
- return result
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertEqual(
- self.getContentFromResponse(
- channel.transport.written.getvalue()),
- b'012')
- self.assertFalse(result.open)
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''])
- return d
- def test_applicationCalledInThread(self):
- """
- The application object is invoked and iterated in a thread which is not
- the reactor thread.
- """
- self.enableThreads()
- invoked = []
- def applicationFactory():
- def application(environ, startResponse):
- def result():
- for i in range(3):
- invoked.append(getThreadID())
- yield intToBytes(i)
- invoked.append(getThreadID())
- startResponse('200 OK', [('content-length', '3')])
- return result()
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- self.assertNotIn(getThreadID(), invoked)
- self.assertEqual(len(set(invoked)), 1)
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- DummyChannel, 'GET', '1.1', [], [''])
- return d
- def test_writeCalledFromThread(self):
- """
- The I{write} callable returned by I{start_response} calls the request's
- C{write} method in the reactor thread.
- """
- self.enableThreads()
- invoked = []
- class ThreadVerifier(Request):
- def write(self, bytes):
- invoked.append(getThreadID())
- return Request.write(self, bytes)
- def applicationFactory():
- def application(environ, startResponse):
- write = startResponse('200 OK', [])
- write(b'foo')
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory(ThreadVerifier)
- def cbRendered(ignored):
- self.assertEqual(set(invoked), set([getThreadID()]))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory, DummyChannel,
- 'GET', '1.1', [], [''])
- return d
- def test_iteratedValuesWrittenFromThread(self):
- """
- Strings produced by the iterator returned by the application object are
- written to the request in the reactor thread.
- """
- self.enableThreads()
- invoked = []
- class ThreadVerifier(Request):
- def write(self, bytes):
- invoked.append(getThreadID())
- return Request.write(self, bytes)
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [])
- yield b'foo'
- return application
- d, requestFactory = self.requestFactoryFactory(ThreadVerifier)
- def cbRendered(ignored):
- self.assertEqual(set(invoked), set([getThreadID()]))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory, DummyChannel,
- 'GET', '1.1', [], [''])
- return d
- def test_statusWrittenFromThread(self):
- """
- The response status is set on the request object in the reactor thread.
- """
- self.enableThreads()
- invoked = []
- class ThreadVerifier(Request):
- def setResponseCode(self, code, message):
- invoked.append(getThreadID())
- return Request.setResponseCode(self, code, message)
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [])
- return iter(())
- return application
- d, requestFactory = self.requestFactoryFactory(ThreadVerifier)
- def cbRendered(ignored):
- self.assertEqual(set(invoked), set([getThreadID()]))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory, DummyChannel,
- 'GET', '1.1', [], [''])
- return d
- def test_connectionClosedDuringIteration(self):
- """
- If the request connection is lost while the application object is being
- iterated, iteration is stopped.
- """
- class UnreliableConnection(Request):
- """
- This is a request which pretends its connection is lost immediately
- after the first write is done to it.
- """
- def write(self, bytes):
- self.connectionLost(Failure(ConnectionLost("No more connection")))
- self.badIter = False
- def appIter():
- yield b"foo"
- self.badIter = True
- raise Exception("Should not have gotten here")
- def applicationFactory():
- def application(environ, startResponse):
- startResponse('200 OK', [])
- return appIter()
- return application
- d, requestFactory = self.requestFactoryFactory(UnreliableConnection)
- def cbRendered(ignored):
- self.assertFalse(self.badIter, "Should not have resumed iteration")
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory, DummyChannel,
- 'GET', '1.1', [], [''])
- return self.assertFailure(d, ConnectionLost)
- def _internalServerErrorTest(self, application):
- channel = DummyChannel()
- def applicationFactory():
- return application
- d, requestFactory = self.requestFactoryFactory()
- def cbRendered(ignored):
- errors = self.flushLoggedErrors(RuntimeError)
- self.assertEqual(len(errors), 1)
- self.assertTrue(
- channel.transport.written.getvalue().startswith(
- b'HTTP/1.1 500 Internal Server Error'))
- d.addCallback(cbRendered)
- self.lowLevelRender(
- requestFactory, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- return d
- def test_applicationExceptionBeforeStartResponse(self):
- """
- If the application raises an exception before calling I{start_response}
- then the response status is I{500} and the exception is logged.
- """
- def application(environ, startResponse):
- raise RuntimeError("This application had some error.")
- return self._internalServerErrorTest(application)
- def test_applicationExceptionAfterStartResponse(self):
- """
- If the application calls I{start_response} but then raises an exception
- before any data is written to the response then the response status is
- I{500} and the exception is logged.
- """
- def application(environ, startResponse):
- startResponse('200 OK', [])
- raise RuntimeError("This application had some error.")
- return self._internalServerErrorTest(application)
- def _connectionClosedTest(self, application, responseContent):
- channel = DummyChannel()
- def applicationFactory():
- return application
- d, requestFactory = self.requestFactoryFactory()
- # Capture the request so we can disconnect it later on.
- requests = []
- def requestFactoryWrapper(*a, **kw):
- requests.append(requestFactory(*a, **kw))
- return requests[-1]
- def ebRendered(ignored):
- errors = self.flushLoggedErrors(RuntimeError)
- self.assertEqual(len(errors), 1)
- response = channel.transport.written.getvalue()
- self.assertTrue(response.startswith(b'HTTP/1.1 200 OK'))
- # Chunked transfer-encoding makes this a little messy.
- self.assertIn(responseContent, response)
- d.addErrback(ebRendered)
- self.lowLevelRender(
- requestFactoryWrapper, applicationFactory,
- lambda: channel, 'GET', '1.1', [], [''], None, [])
- # By now the connection should be closed.
- self.assertTrue(channel.transport.disconnected)
- # Give it a little push to go the rest of the way.
- requests[0].connectionLost(Failure(ConnectionLost("All gone")))
- return d
- def test_applicationExceptionAfterWrite(self):
- """
- If the application raises an exception after the response status has
- already been sent then the connection is closed and the exception is
- logged.
- """
- responseContent = (
- b'Some bytes, triggering the server to start sending the response')
- def application(environ, startResponse):
- startResponse('200 OK', [])
- yield responseContent
- raise RuntimeError("This application had some error.")
- return self._connectionClosedTest(application, responseContent)
- def test_applicationCloseException(self):
- """
- If the application returns a closeable iterator and the C{close} method
- raises an exception when called then the connection is still closed and
- the exception is logged.
- """
- responseContent = b'foo'
- class Application(object):
- def __init__(self, environ, startResponse):
- startResponse('200 OK', [])
- def __iter__(self):
- yield responseContent
- def close(self):
- raise RuntimeError("This application had some error.")
- return self._connectionClosedTest(Application, responseContent)
|