| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.web.static}.
- """
- import errno
- import inspect
- import mimetypes
- import os
- import re
- from io import BytesIO as StringIO
- from zope.interface.verify import verifyObject
- from twisted.internet import abstract, interfaces
- from twisted.python.runtime import platform
- from twisted.python.filepath import FilePath
- from twisted.python import log
- from twisted.python.compat import intToBytes, networkString, _PY3
- from twisted.trial.unittest import TestCase
- from twisted.web import static, http, script, resource
- from twisted.web.server import UnsupportedMethod
- from twisted.web.test.requesthelper import DummyRequest
- from twisted.web.test._util import _render
- from twisted.web._responses import FOUND
- class StaticDataTests(TestCase):
- """
- Tests for L{Data}.
- """
- def test_headRequest(self):
- """
- L{Data.render} returns an empty response body for a I{HEAD} request.
- """
- data = static.Data(b"foo", "bar")
- request = DummyRequest([''])
- request.method = b'HEAD'
- d = _render(data, request)
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written), b"")
- d.addCallback(cbRendered)
- return d
- def test_invalidMethod(self):
- """
- L{Data.render} raises L{UnsupportedMethod} in response to a non-I{GET},
- non-I{HEAD} request.
- """
- data = static.Data(b"foo", b"bar")
- request = DummyRequest([b''])
- request.method = b'POST'
- self.assertRaises(UnsupportedMethod, data.render, request)
- class StaticFileTests(TestCase):
- """
- Tests for the basic behavior of L{File}.
- """
- def _render(self, resource, request):
- return _render(resource, request)
- def test_invalidMethod(self):
- """
- L{File.render} raises L{UnsupportedMethod} in response to a non-I{GET},
- non-I{HEAD} request.
- """
- request = DummyRequest([b''])
- request.method = b'POST'
- path = FilePath(self.mktemp())
- path.setContent(b"foo")
- file = static.File(path.path)
- self.assertRaises(UnsupportedMethod, file.render, request)
- def test_notFound(self):
- """
- If a request is made which encounters a L{File} before a final segment
- which does not correspond to any file in the path the L{File} was
- created with, a not found response is sent.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- file = static.File(base.path)
- request = DummyRequest([b'foobar'])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, 404)
- d.addCallback(cbRendered)
- return d
- def test_emptyChild(self):
- """
- The C{''} child of a L{File} which corresponds to a directory in the
- filesystem is a L{DirectoryLister}.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- file = static.File(base.path)
- request = DummyRequest([b''])
- child = resource.getChildForRequest(file, request)
- self.assertIsInstance(child, static.DirectoryLister)
- self.assertEqual(child.path, base.path)
- def test_securityViolationNotFound(self):
- """
- If a request is made which encounters a L{File} before a final segment
- which cannot be looked up in the filesystem due to security
- considerations, a not found response is sent.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- file = static.File(base.path)
- request = DummyRequest([b'..'])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, 404)
- d.addCallback(cbRendered)
- return d
- def test_forbiddenResource(self):
- """
- If the file in the filesystem which would satisfy a request cannot be
- read, L{File.render} sets the HTTP response code to I{FORBIDDEN}.
- """
- base = FilePath(self.mktemp())
- base.setContent(b'')
- # Make sure we can delete the file later.
- self.addCleanup(base.chmod, 0o700)
- # Get rid of our own read permission.
- base.chmod(0)
- file = static.File(base.path)
- request = DummyRequest([b''])
- d = self._render(file, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, 403)
- d.addCallback(cbRendered)
- return d
- if platform.isWindows():
- test_forbiddenResource.skip = "Cannot remove read permission on Windows"
- def test_forbiddenResource_default(self):
- """
- L{File.forbidden} defaults to L{resource.ForbiddenResource}.
- """
- self.assertIsInstance(
- static.File(b'.').forbidden, resource.ForbiddenResource)
- def test_forbiddenResource_customize(self):
- """
- The resource rendered for forbidden requests is stored as a class
- member so that users can customize it.
- """
- base = FilePath(self.mktemp())
- base.setContent(b'')
- markerResponse = b'custom-forbidden-response'
- def failingOpenForReading():
- raise IOError(errno.EACCES, "")
- class CustomForbiddenResource(resource.Resource):
- def render(self, request):
- return markerResponse
- class CustomStaticFile(static.File):
- forbidden = CustomForbiddenResource()
- fileResource = CustomStaticFile(base.path)
- fileResource.openForReading = failingOpenForReading
- request = DummyRequest([b''])
- result = fileResource.render(request)
- self.assertEqual(markerResponse, result)
- def test_indexNames(self):
- """
- If a request is made which encounters a L{File} before a final empty
- segment, a file in the L{File} instance's C{indexNames} list which
- exists in the path the L{File} was created with is served as the
- response to the request.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- base.child("foo.bar").setContent(b"baz")
- file = static.File(base.path)
- file.indexNames = [b'foo.bar']
- request = DummyRequest([b''])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written), b'baz')
- self.assertEqual(
- request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'3')
- d.addCallback(cbRendered)
- return d
- def test_staticFile(self):
- """
- If a request is made which encounters a L{File} before a final segment
- which names a file in the path the L{File} was created with, that file
- is served as the response to the request.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- base.child("foo.bar").setContent(b"baz")
- file = static.File(base.path)
- request = DummyRequest([b'foo.bar'])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written), b'baz')
- self.assertEqual(
- request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'3')
- d.addCallback(cbRendered)
- return d
- def test_staticFileDeletedGetChild(self):
- """
- A L{static.File} created for a directory which does not exist should
- return childNotFound from L{static.File.getChild}.
- """
- staticFile = static.File(self.mktemp())
- request = DummyRequest([b'foo.bar'])
- child = staticFile.getChild("foo.bar", request)
- self.assertEqual(child, staticFile.childNotFound)
- def test_staticFileDeletedRender(self):
- """
- A L{static.File} created for a file which does not exist should render
- its C{childNotFound} page.
- """
- staticFile = static.File(self.mktemp())
- request = DummyRequest([b'foo.bar'])
- request2 = DummyRequest([b'foo.bar'])
- d = self._render(staticFile, request)
- d2 = self._render(staticFile.childNotFound, request2)
- def cbRendered2(ignored):
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written),
- b''.join(request2.written))
- d.addCallback(cbRendered)
- return d
- d2.addCallback(cbRendered2)
- return d2
- def test_getChildChildNotFound_customize(self):
- """
- The resource rendered for child not found requests can be customize
- using a class member.
- """
- base = FilePath(self.mktemp())
- base.setContent(b'')
- markerResponse = b'custom-child-not-found-response'
- class CustomChildNotFoundResource(resource.Resource):
- def render(self, request):
- return markerResponse
- class CustomStaticFile(static.File):
- childNotFound = CustomChildNotFoundResource()
- fileResource = CustomStaticFile(base.path)
- request = DummyRequest([b'no-child.txt'])
- child = fileResource.getChild(b'no-child.txt', request)
- result = child.render(request)
- self.assertEqual(markerResponse, result)
- def test_headRequest(self):
- """
- L{static.File.render} returns an empty response body for I{HEAD}
- requests.
- """
- path = FilePath(self.mktemp())
- path.setContent(b"foo")
- file = static.File(path.path)
- request = DummyRequest([b''])
- request.method = b'HEAD'
- d = _render(file, request)
- def cbRendered(ignored):
- self.assertEqual(b"".join(request.written), b"")
- d.addCallback(cbRendered)
- return d
- def test_processors(self):
- """
- If a request is made which encounters a L{File} before a final segment
- which names a file with an extension which is in the L{File}'s
- C{processors} mapping, the processor associated with that extension is
- used to serve the response to the request.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- base.child("foo.bar").setContent(
- b"from twisted.web.static import Data\n"
- b"resource = Data(b'dynamic world', 'text/plain')\n")
- file = static.File(base.path)
- file.processors = {b'.bar': script.ResourceScript}
- request = DummyRequest([b"foo.bar"])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written), b'dynamic world')
- self.assertEqual(
- request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'13')
- d.addCallback(cbRendered)
- return d
- def test_ignoreExt(self):
- """
- The list of ignored extensions can be set by passing a value to
- L{File.__init__} or by calling L{File.ignoreExt} later.
- """
- file = static.File(b".")
- self.assertEqual(file.ignoredExts, [])
- file.ignoreExt(b".foo")
- file.ignoreExt(b".bar")
- self.assertEqual(file.ignoredExts, [b".foo", b".bar"])
- file = static.File(b".", ignoredExts=(b".bar", b".baz"))
- self.assertEqual(file.ignoredExts, [b".bar", b".baz"])
- def test_ignoredExtensionsIgnored(self):
- """
- A request for the I{base} child of a L{File} succeeds with a resource
- for the I{base<extension>} file in the path the L{File} was created
- with if such a file exists and the L{File} has been configured to
- ignore the I{<extension>} extension.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- base.child('foo.bar').setContent(b'baz')
- base.child('foo.quux').setContent(b'foobar')
- file = static.File(base.path, ignoredExts=(b".bar",))
- request = DummyRequest([b"foo"])
- child = resource.getChildForRequest(file, request)
- d = self._render(child, request)
- def cbRendered(ignored):
- self.assertEqual(b''.join(request.written), b'baz')
- d.addCallback(cbRendered)
- return d
- def test_directoryWithoutTrailingSlashRedirects(self):
- """
- A request for a path which is a directory but does not have a trailing
- slash will be redirected to a URL which does have a slash by L{File}.
- """
- base = FilePath(self.mktemp())
- base.makedirs()
- base.child('folder').makedirs()
- file = static.File(base.path)
- request = DummyRequest([b"folder"])
- request.uri = b"http://dummy/folder#baz?foo=bar"
- child = resource.getChildForRequest(file, request)
- self.successResultOf(self._render(child, request))
- self.assertEqual(request.responseCode, FOUND)
- self.assertEqual(request.responseHeaders.getRawHeaders(b"location"),
- [b"http://dummy/folder/#baz?foo=bar"])
- def _makeFilePathWithStringIO(self):
- """
- Create a L{File} that when opened for reading, returns a L{StringIO}.
- @return: 2-tuple of the opened "file" and the L{File}.
- @rtype: L{tuple}
- """
- fakeFile = StringIO()
- path = FilePath(self.mktemp())
- path.touch()
- file = static.File(path.path)
- # Open our file instead of a real one
- file.open = lambda: fakeFile
- return fakeFile, file
- def test_HEADClosesFile(self):
- """
- A HEAD request opens the file, gets the size, and then closes it after
- the request.
- """
- fakeFile, file = self._makeFilePathWithStringIO()
- request = DummyRequest([''])
- request.method = b'HEAD'
- self.successResultOf(_render(file, request))
- self.assertEqual(b''.join(request.written), b'')
- self.assertTrue(fakeFile.closed)
- def test_cachedRequestClosesFile(self):
- """
- A GET request that is cached closes the file after the request.
- """
- fakeFile, file = self._makeFilePathWithStringIO()
- request = DummyRequest([''])
- request.method = b'GET'
- # This request will always return saying that it is cached
- request.setLastModified = lambda _: http.CACHED
- self.successResultOf(_render(file, request))
- self.assertEqual(b''.join(request.written), b'')
- self.assertTrue(fakeFile.closed)
- class StaticMakeProducerTests(TestCase):
- """
- Tests for L{File.makeProducer}.
- """
- def makeResourceWithContent(self, content, type=None, encoding=None):
- """
- Make a L{static.File} resource that has C{content} for its content.
- @param content: The L{bytes} to use as the contents of the resource.
- @param type: Optional value for the content type of the resource.
- """
- fileName = FilePath(self.mktemp())
- fileName.setContent(content)
- resource = static.File(fileName._asBytesPath())
- resource.encoding = encoding
- resource.type = type
- return resource
- def contentHeaders(self, request):
- """
- Extract the content-* headers from the L{DummyRequest} C{request}.
- This returns the subset of C{request.outgoingHeaders} of headers that
- start with 'content-'.
- """
- contentHeaders = {}
- for k, v in request.responseHeaders.getAllRawHeaders():
- if k.lower().startswith(b'content-'):
- contentHeaders[k.lower()] = v[0]
- return contentHeaders
- def test_noRangeHeaderGivesNoRangeStaticProducer(self):
- """
- makeProducer when no Range header is set returns an instance of
- NoRangeStaticProducer.
- """
- resource = self.makeResourceWithContent(b'')
- request = DummyRequest([])
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- self.assertIsInstance(producer, static.NoRangeStaticProducer)
- def test_noRangeHeaderSets200OK(self):
- """
- makeProducer when no Range header is set sets the responseCode on the
- request to 'OK'.
- """
- resource = self.makeResourceWithContent(b'')
- request = DummyRequest([])
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(http.OK, request.responseCode)
- def test_noRangeHeaderSetsContentHeaders(self):
- """
- makeProducer when no Range header is set sets the Content-* headers
- for the response.
- """
- length = 123
- contentType = "text/plain"
- contentEncoding = 'gzip'
- resource = self.makeResourceWithContent(
- b'a'*length, type=contentType, encoding=contentEncoding)
- request = DummyRequest([])
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- {b'content-type': networkString(contentType), b'content-length': intToBytes(length),
- b'content-encoding': networkString(contentEncoding)},
- self.contentHeaders(request))
- def test_singleRangeGivesSingleRangeStaticProducer(self):
- """
- makeProducer when the Range header requests a single byte range
- returns an instance of SingleRangeStaticProducer.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3')
- resource = self.makeResourceWithContent(b'abcdef')
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- self.assertIsInstance(producer, static.SingleRangeStaticProducer)
- def test_singleRangeSets206PartialContent(self):
- """
- makeProducer when the Range header requests a single, satisfiable byte
- range sets the response code on the request to 'Partial Content'.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3')
- resource = self.makeResourceWithContent(b'abcdef')
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- http.PARTIAL_CONTENT, request.responseCode)
- def test_singleRangeSetsContentHeaders(self):
- """
- makeProducer when the Range header requests a single, satisfiable byte
- range sets the Content-* headers appropriately.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3')
- contentType = "text/plain"
- contentEncoding = 'gzip'
- resource = self.makeResourceWithContent(b'abcdef', type=contentType, encoding=contentEncoding)
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- {b'content-type': networkString(contentType),
- b'content-encoding': networkString(contentEncoding),
- b'content-range': b'bytes 1-3/6', b'content-length': b'3'},
- self.contentHeaders(request))
- def test_singleUnsatisfiableRangeReturnsSingleRangeStaticProducer(self):
- """
- makeProducer still returns an instance of L{SingleRangeStaticProducer}
- when the Range header requests a single unsatisfiable byte range.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=4-10')
- resource = self.makeResourceWithContent(b'abc')
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- self.assertIsInstance(producer, static.SingleRangeStaticProducer)
- def test_singleUnsatisfiableRangeSets416ReqestedRangeNotSatisfiable(self):
- """
- makeProducer sets the response code of the request to of 'Requested
- Range Not Satisfiable' when the Range header requests a single
- unsatisfiable byte range.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=4-10')
- resource = self.makeResourceWithContent(b'abc')
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
- def test_singleUnsatisfiableRangeSetsContentHeaders(self):
- """
- makeProducer when the Range header requests a single, unsatisfiable
- byte range sets the Content-* headers appropriately.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=4-10')
- contentType = "text/plain"
- resource = self.makeResourceWithContent(b'abc', type=contentType)
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- {b'content-type': b'text/plain', b'content-length': b'0',
- b'content-range': b'bytes */3'},
- self.contentHeaders(request))
- def test_singlePartiallyOverlappingRangeSetsContentHeaders(self):
- """
- makeProducer when the Range header requests a single byte range that
- partly overlaps the resource sets the Content-* headers appropriately.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=2-10')
- contentType = "text/plain"
- resource = self.makeResourceWithContent(b'abc', type=contentType)
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- {b'content-type': b'text/plain', b'content-length': b'1',
- b'content-range': b'bytes 2-2/3'},
- self.contentHeaders(request))
- def test_multipleRangeGivesMultipleRangeStaticProducer(self):
- """
- makeProducer when the Range header requests a single byte range
- returns an instance of MultipleRangeStaticProducer.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3,5-6')
- resource = self.makeResourceWithContent(b'abcdef')
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
- def test_multipleRangeSets206PartialContent(self):
- """
- makeProducer when the Range header requests a multiple satisfiable
- byte ranges sets the response code on the request to 'Partial
- Content'.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3,5-6')
- resource = self.makeResourceWithContent(b'abcdef')
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- http.PARTIAL_CONTENT, request.responseCode)
- def test_mutipleRangeSetsContentHeaders(self):
- """
- makeProducer when the Range header requests a single, satisfiable byte
- range sets the Content-* headers appropriately.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3,5-6')
- resource = self.makeResourceWithContent(
- b'abcdefghijkl', encoding='gzip')
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- contentHeaders = self.contentHeaders(request)
- # The only content-* headers set are content-type and content-length.
- self.assertEqual(
- set([b'content-length', b'content-type']),
- set(contentHeaders.keys()))
- # The content-length depends on the boundary used in the response.
- expectedLength = 5
- for boundary, offset, size in producer.rangeInfo:
- expectedLength += len(boundary)
- self.assertEqual(intToBytes(expectedLength), contentHeaders[b'content-length'])
- # Content-type should be set to a value indicating a multipart
- # response and the boundary used to separate the parts.
- self.assertIn(b'content-type', contentHeaders)
- contentType = contentHeaders[b'content-type']
- self.assertNotIdentical(
- None, re.match(
- b'multipart/byteranges; boundary="[^"]*"\Z', contentType))
- # Content-encoding is not set in the response to a multiple range
- # response, which is a bit wussy but works well enough with the way
- # static.File does content-encodings...
- self.assertNotIn(b'content-encoding', contentHeaders)
- def test_multipleUnsatisfiableRangesReturnsMultipleRangeStaticProducer(self):
- """
- makeProducer still returns an instance of L{SingleRangeStaticProducer}
- when the Range header requests multiple ranges, none of which are
- satisfiable.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=10-12,15-20')
- resource = self.makeResourceWithContent(b'abc')
- with resource.openForReading() as file:
- producer = resource.makeProducer(request, file)
- self.assertIsInstance(producer, static.MultipleRangeStaticProducer)
- def test_multipleUnsatisfiableRangesSets416ReqestedRangeNotSatisfiable(self):
- """
- makeProducer sets the response code of the request to of 'Requested
- Range Not Satisfiable' when the Range header requests multiple ranges,
- none of which are satisfiable.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=10-12,15-20')
- resource = self.makeResourceWithContent(b'abc')
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- http.REQUESTED_RANGE_NOT_SATISFIABLE, request.responseCode)
- def test_multipleUnsatisfiableRangeSetsContentHeaders(self):
- """
- makeProducer when the Range header requests multiple ranges, none of
- which are satisfiable, sets the Content-* headers appropriately.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=4-10')
- contentType = "text/plain"
- request.requestHeaders.addRawHeader(b'range', b'bytes=10-12,15-20')
- resource = self.makeResourceWithContent(b'abc', type=contentType)
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- {b'content-length': b'0',
- b'content-range': b'bytes */3',
- b'content-type': b'text/plain'},
- self.contentHeaders(request))
- def test_oneSatisfiableRangeIsEnough(self):
- """
- makeProducer when the Range header requests multiple ranges, at least
- one of which matches, sets the response code to 'Partial Content'.
- """
- request = DummyRequest([])
- request.requestHeaders.addRawHeader(b'range', b'bytes=1-3,100-200')
- resource = self.makeResourceWithContent(b'abcdef')
- with resource.openForReading() as file:
- resource.makeProducer(request, file)
- self.assertEqual(
- http.PARTIAL_CONTENT, request.responseCode)
- class StaticProducerTests(TestCase):
- """
- Tests for the abstract L{StaticProducer}.
- """
- def test_stopProducingClosesFile(self):
- """
- L{StaticProducer.stopProducing} closes the file object the producer is
- producing data from.
- """
- fileObject = StringIO()
- producer = static.StaticProducer(None, fileObject)
- producer.stopProducing()
- self.assertTrue(fileObject.closed)
- def test_stopProducingSetsRequestToNone(self):
- """
- L{StaticProducer.stopProducing} sets the request instance variable to
- None, which indicates to subclasses' resumeProducing methods that no
- more data should be produced.
- """
- fileObject = StringIO()
- producer = static.StaticProducer(DummyRequest([]), fileObject)
- producer.stopProducing()
- self.assertIdentical(None, producer.request)
- class NoRangeStaticProducerTests(TestCase):
- """
- Tests for L{NoRangeStaticProducer}.
- """
- def test_implementsIPullProducer(self):
- """
- L{NoRangeStaticProducer} implements L{IPullProducer}.
- """
- verifyObject(
- interfaces.IPullProducer,
- static.NoRangeStaticProducer(None, None))
- def test_resumeProducingProducesContent(self):
- """
- L{NoRangeStaticProducer.resumeProducing} writes content from the
- resource to the request.
- """
- request = DummyRequest([])
- content = b'abcdef'
- producer = static.NoRangeStaticProducer(
- request, StringIO(content))
- # start calls registerProducer on the DummyRequest, which pulls all
- # output from the producer and so we just need this one call.
- producer.start()
- self.assertEqual(content, b''.join(request.written))
- def test_resumeProducingBuffersOutput(self):
- """
- L{NoRangeStaticProducer.start} writes at most
- C{abstract.FileDescriptor.bufferSize} bytes of content from the
- resource to the request at once.
- """
- request = DummyRequest([])
- bufferSize = abstract.FileDescriptor.bufferSize
- content = b'a' * (2*bufferSize + 1)
- producer = static.NoRangeStaticProducer(
- request, StringIO(content))
- # start calls registerProducer on the DummyRequest, which pulls all
- # output from the producer and so we just need this one call.
- producer.start()
- expected = [
- content[0:bufferSize],
- content[bufferSize:2*bufferSize],
- content[2*bufferSize:]
- ]
- self.assertEqual(expected, request.written)
- def test_finishCalledWhenDone(self):
- """
- L{NoRangeStaticProducer.resumeProducing} calls finish() on the request
- after it is done producing content.
- """
- request = DummyRequest([])
- finishDeferred = request.notifyFinish()
- callbackList = []
- finishDeferred.addCallback(callbackList.append)
- producer = static.NoRangeStaticProducer(
- request, StringIO(b'abcdef'))
- # start calls registerProducer on the DummyRequest, which pulls all
- # output from the producer and so we just need this one call.
- producer.start()
- self.assertEqual([None], callbackList)
- class SingleRangeStaticProducerTests(TestCase):
- """
- Tests for L{SingleRangeStaticProducer}.
- """
- def test_implementsIPullProducer(self):
- """
- L{SingleRangeStaticProducer} implements L{IPullProducer}.
- """
- verifyObject(
- interfaces.IPullProducer,
- static.SingleRangeStaticProducer(None, None, None, None))
- def test_resumeProducingProducesContent(self):
- """
- L{SingleRangeStaticProducer.resumeProducing} writes the given amount
- of content, starting at the given offset, from the resource to the
- request.
- """
- request = DummyRequest([])
- content = b'abcdef'
- producer = static.SingleRangeStaticProducer(
- request, StringIO(content), 1, 3)
- # DummyRequest.registerProducer pulls all output from the producer, so
- # we just need to call start.
- producer.start()
- self.assertEqual(content[1:4], b''.join(request.written))
- def test_resumeProducingBuffersOutput(self):
- """
- L{SingleRangeStaticProducer.start} writes at most
- C{abstract.FileDescriptor.bufferSize} bytes of content from the
- resource to the request at once.
- """
- request = DummyRequest([])
- bufferSize = abstract.FileDescriptor.bufferSize
- content = b'abc' * bufferSize
- producer = static.SingleRangeStaticProducer(
- request, StringIO(content), 1, bufferSize+10)
- # DummyRequest.registerProducer pulls all output from the producer, so
- # we just need to call start.
- producer.start()
- expected = [
- content[1:bufferSize+1],
- content[bufferSize+1:bufferSize+11],
- ]
- self.assertEqual(expected, request.written)
- def test_finishCalledWhenDone(self):
- """
- L{SingleRangeStaticProducer.resumeProducing} calls finish() on the
- request after it is done producing content.
- """
- request = DummyRequest([])
- finishDeferred = request.notifyFinish()
- callbackList = []
- finishDeferred.addCallback(callbackList.append)
- producer = static.SingleRangeStaticProducer(
- request, StringIO(b'abcdef'), 1, 1)
- # start calls registerProducer on the DummyRequest, which pulls all
- # output from the producer and so we just need this one call.
- producer.start()
- self.assertEqual([None], callbackList)
- class MultipleRangeStaticProducerTests(TestCase):
- """
- Tests for L{MultipleRangeStaticProducer}.
- """
- def test_implementsIPullProducer(self):
- """
- L{MultipleRangeStaticProducer} implements L{IPullProducer}.
- """
- verifyObject(
- interfaces.IPullProducer,
- static.MultipleRangeStaticProducer(None, None, None))
- def test_resumeProducingProducesContent(self):
- """
- L{MultipleRangeStaticProducer.resumeProducing} writes the requested
- chunks of content from the resource to the request, with the supplied
- boundaries in between each chunk.
- """
- request = DummyRequest([])
- content = b'abcdef'
- producer = static.MultipleRangeStaticProducer(
- request, StringIO(content), [(b'1', 1, 3), (b'2', 5, 1)])
- # DummyRequest.registerProducer pulls all output from the producer, so
- # we just need to call start.
- producer.start()
- self.assertEqual(b'1bcd2f', b''.join(request.written))
- def test_resumeProducingBuffersOutput(self):
- """
- L{MultipleRangeStaticProducer.start} writes about
- C{abstract.FileDescriptor.bufferSize} bytes of content from the
- resource to the request at once.
- To be specific about the 'about' above: it can write slightly more,
- for example in the case where the first boundary plus the first chunk
- is less than C{bufferSize} but first boundary plus the first chunk
- plus the second boundary is more, but this is unimportant as in
- practice the boundaries are fairly small. On the other side, it is
- important for performance to bundle up several small chunks into one
- call to request.write.
- """
- request = DummyRequest([])
- content = b'0123456789' * 2
- producer = static.MultipleRangeStaticProducer(
- request, StringIO(content),
- [(b'a', 0, 2), (b'b', 5, 10), (b'c', 0, 0)])
- producer.bufferSize = 10
- # DummyRequest.registerProducer pulls all output from the producer, so
- # we just need to call start.
- producer.start()
- expected = [
- b'a' + content[0:2] + b'b' + content[5:11],
- content[11:15] + b'c',
- ]
- self.assertEqual(expected, request.written)
- def test_finishCalledWhenDone(self):
- """
- L{MultipleRangeStaticProducer.resumeProducing} calls finish() on the
- request after it is done producing content.
- """
- request = DummyRequest([])
- finishDeferred = request.notifyFinish()
- callbackList = []
- finishDeferred.addCallback(callbackList.append)
- producer = static.MultipleRangeStaticProducer(
- request, StringIO(b'abcdef'), [(b'', 1, 2)])
- # start calls registerProducer on the DummyRequest, which pulls all
- # output from the producer and so we just need this one call.
- producer.start()
- self.assertEqual([None], callbackList)
- class RangeTests(TestCase):
- """
- Tests for I{Range-Header} support in L{twisted.web.static.File}.
- @type file: L{file}
- @ivar file: Temporary (binary) file containing the content to be served.
- @type resource: L{static.File}
- @ivar resource: A leaf web resource using C{file} as content.
- @type request: L{DummyRequest}
- @ivar request: A fake request, requesting C{resource}.
- @type catcher: L{list}
- @ivar catcher: List which gathers all log information.
- """
- def setUp(self):
- """
- Create a temporary file with a fixed payload of 64 bytes. Create a
- resource for that file and create a request which will be for that
- resource. Each test can set a different range header to test different
- aspects of the implementation.
- """
- path = FilePath(self.mktemp())
- # This is just a jumble of random stuff. It's supposed to be a good
- # set of data for this test, particularly in order to avoid
- # accidentally seeing the right result by having a byte sequence
- # repeated at different locations or by having byte values which are
- # somehow correlated with their position in the string.
- self.payload = (b'\xf8u\xf3E\x8c7\xce\x00\x9e\xb6a0y0S\xf0\xef\xac\xb7'
- b'\xbe\xb5\x17M\x1e\x136k{\x1e\xbe\x0c\x07\x07\t\xd0'
- b'\xbckY\xf5I\x0b\xb8\x88oZ\x1d\x85b\x1a\xcdk\xf2\x1d'
- b'&\xfd%\xdd\x82q/A\x10Y\x8b')
- path.setContent(self.payload)
- self.file = path.open()
- self.resource = static.File(self.file.name)
- self.resource.isLeaf = 1
- self.request = DummyRequest([b''])
- self.request.uri = self.file.name
- self.catcher = []
- log.addObserver(self.catcher.append)
- def tearDown(self):
- """
- Clean up the resource file and the log observer.
- """
- self.file.close()
- log.removeObserver(self.catcher.append)
- def _assertLogged(self, expected):
- """
- Asserts that a given log message occurred with an expected message.
- """
- logItem = self.catcher.pop()
- self.assertEqual(logItem["message"][0], expected)
- self.assertEqual(
- self.catcher, [], "An additional log occurred: %r" % (logItem,))
- def test_invalidRanges(self):
- """
- L{File._parseRangeHeader} raises L{ValueError} when passed
- syntactically invalid byte ranges.
- """
- f = self.resource._parseRangeHeader
- # there's no =
- self.assertRaises(ValueError, f, b'bytes')
- # unknown isn't a valid Bytes-Unit
- self.assertRaises(ValueError, f, b'unknown=1-2')
- # there's no - in =stuff
- self.assertRaises(ValueError, f, b'bytes=3')
- # both start and end are empty
- self.assertRaises(ValueError, f, b'bytes=-')
- # start isn't an integer
- self.assertRaises(ValueError, f, b'bytes=foo-')
- # end isn't an integer
- self.assertRaises(ValueError, f, b'bytes=-foo')
- # end isn't equal to or greater than start
- self.assertRaises(ValueError, f, b'bytes=5-4')
- def test_rangeMissingStop(self):
- """
- A single bytes range without an explicit stop position is parsed into a
- two-tuple giving the start position and L{None}.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=0-'), [(0, None)])
- def test_rangeMissingStart(self):
- """
- A single bytes range without an explicit start position is parsed into
- a two-tuple of L{None} and the end position.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=-3'), [(None, 3)])
- def test_range(self):
- """
- A single bytes range with explicit start and stop positions is parsed
- into a two-tuple of those positions.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=2-5'), [(2, 5)])
- def test_rangeWithSpace(self):
- """
- A single bytes range with whitespace in allowed places is parsed in
- the same way as it would be without the whitespace.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b' bytes=1-2 '), [(1, 2)])
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes =1-2 '), [(1, 2)])
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes= 1-2'), [(1, 2)])
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=1 -2'), [(1, 2)])
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=1- 2'), [(1, 2)])
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=1-2 '), [(1, 2)])
- def test_nullRangeElements(self):
- """
- If there are multiple byte ranges but only one is non-null, the
- non-null range is parsed and its start and stop returned.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=1-2,\r\n, ,\t'), [(1, 2)])
- def test_multipleRanges(self):
- """
- If multiple byte ranges are specified their starts and stops are
- returned.
- """
- self.assertEqual(
- self.resource._parseRangeHeader(b'bytes=1-2,3-4'),
- [(1, 2), (3, 4)])
- def test_bodyLength(self):
- """
- A correct response to a range request is as long as the length of the
- requested range.
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=0-43')
- self.resource.render(self.request)
- self.assertEqual(len(b''.join(self.request.written)), 44)
- def test_invalidRangeRequest(self):
- """
- An incorrect range request (RFC 2616 defines a correct range request as
- a Bytes-Unit followed by a '=' character followed by a specific range.
- Only 'bytes' is defined) results in the range header value being logged
- and a normal 200 response being sent.
- """
- range = b'foobar=0-43'
- self.request.requestHeaders.addRawHeader(b'range', range)
- self.resource.render(self.request)
- expected = "Ignoring malformed Range header %r" % (range.decode(),)
- self._assertLogged(expected)
- self.assertEqual(b''.join(self.request.written), self.payload)
- self.assertEqual(self.request.responseCode, http.OK)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-length')[0],
- intToBytes(len(self.payload)))
- def parseMultipartBody(self, body, boundary):
- """
- Parse C{body} as a multipart MIME response separated by C{boundary}.
- Note that this with fail the calling test on certain syntactic
- problems.
- """
- sep = b"\r\n--" + boundary
- parts = body.split(sep)
- self.assertEqual(b'', parts[0])
- self.assertEqual(b'--\r\n', parts[-1])
- parsed_parts = []
- for part in parts[1:-1]:
- before, header1, header2, blank, partBody = part.split(b'\r\n', 4)
- headers = header1 + b'\n' + header2
- self.assertEqual(b'', before)
- self.assertEqual(b'', blank)
- partContentTypeValue = re.search(
- b'^content-type: (.*)$', headers, re.I|re.M).group(1)
- start, end, size = re.search(
- b'^content-range: bytes ([0-9]+)-([0-9]+)/([0-9]+)$',
- headers, re.I|re.M).groups()
- parsed_parts.append(
- {b'contentType': partContentTypeValue,
- b'contentRange': (start, end, size),
- b'body': partBody})
- return parsed_parts
- def test_multipleRangeRequest(self):
- """
- The response to a request for multiple bytes ranges is a MIME-ish
- multipart response.
- """
- startEnds = [(0, 2), (20, 30), (40, 50)]
- rangeHeaderValue = b','.join([networkString("%s-%s" % (s,e)) for (s, e) in startEnds])
- self.request.requestHeaders.addRawHeader(b'range',
- b'bytes=' + rangeHeaderValue)
- self.resource.render(self.request)
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- boundary = re.match(
- b'^multipart/byteranges; boundary="(.*)"$',
- self.request.responseHeaders.getRawHeaders(b'content-type')[0]).group(1)
- parts = self.parseMultipartBody(b''.join(self.request.written), boundary)
- self.assertEqual(len(startEnds), len(parts))
- for part, (s, e) in zip(parts, startEnds):
- self.assertEqual(networkString(self.resource.type),
- part[b'contentType'])
- start, end, size = part[b'contentRange']
- self.assertEqual(int(start), s)
- self.assertEqual(int(end), e)
- self.assertEqual(int(size), self.resource.getFileSize())
- self.assertEqual(self.payload[s:e+1], part[b'body'])
- def test_multipleRangeRequestWithRangeOverlappingEnd(self):
- """
- The response to a request for multiple bytes ranges is a MIME-ish
- multipart response, even when one of the ranged falls off the end of
- the resource.
- """
- startEnds = [(0, 2), (40, len(self.payload) + 10)]
- rangeHeaderValue = b','.join([networkString("%s-%s" % (s,e)) for (s, e) in startEnds])
- self.request.requestHeaders.addRawHeader(b'range',
- b'bytes=' + rangeHeaderValue)
- self.resource.render(self.request)
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- boundary = re.match(
- b'^multipart/byteranges; boundary="(.*)"$',
- self.request.responseHeaders.getRawHeaders(b'content-type')[0]).group(1)
- parts = self.parseMultipartBody(b''.join(self.request.written), boundary)
- self.assertEqual(len(startEnds), len(parts))
- for part, (s, e) in zip(parts, startEnds):
- self.assertEqual(networkString(self.resource.type),
- part[b'contentType'])
- start, end, size = part[b'contentRange']
- self.assertEqual(int(start), s)
- self.assertEqual(int(end), min(e, self.resource.getFileSize()-1))
- self.assertEqual(int(size), self.resource.getFileSize())
- self.assertEqual(self.payload[s:e+1], part[b'body'])
- def test_implicitEnd(self):
- """
- If the end byte position is omitted, then it is treated as if the
- length of the resource was specified by the end byte position.
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=23-')
- self.resource.render(self.request)
- self.assertEqual(b''.join(self.request.written), self.payload[23:])
- self.assertEqual(len(b''.join(self.request.written)), 41)
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-range')[0],
- b'bytes 23-63/64')
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'41')
- def test_implicitStart(self):
- """
- If the start byte position is omitted but the end byte position is
- supplied, then the range is treated as requesting the last -N bytes of
- the resource, where N is the end byte position.
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=-17')
- self.resource.render(self.request)
- self.assertEqual(b''.join(self.request.written), self.payload[-17:])
- self.assertEqual(len(b''.join(self.request.written)), 17)
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-range')[0],
- b'bytes 47-63/64')
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'17')
- def test_explicitRange(self):
- """
- A correct response to a bytes range header request from A to B starts
- with the A'th byte and ends with (including) the B'th byte. The first
- byte of a page is numbered with 0.
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=3-43')
- self.resource.render(self.request)
- written = b''.join(self.request.written)
- self.assertEqual(written, self.payload[3:44])
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-range')[0],
- b'bytes 3-43/64')
- self.assertEqual(
- intToBytes(len(written)),
- self.request.responseHeaders.getRawHeaders(b'content-length')[0])
- def test_explicitRangeOverlappingEnd(self):
- """
- A correct response to a bytes range header request from A to B when B
- is past the end of the resource starts with the A'th byte and ends
- with the last byte of the resource. The first byte of a page is
- numbered with 0.
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=40-100')
- self.resource.render(self.request)
- written = b''.join(self.request.written)
- self.assertEqual(written, self.payload[40:])
- self.assertEqual(self.request.responseCode, http.PARTIAL_CONTENT)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-range')[0],
- b'bytes 40-63/64')
- self.assertEqual(
- intToBytes(len(written)),
- self.request.responseHeaders.getRawHeaders(b'content-length')[0])
- def test_statusCodeRequestedRangeNotSatisfiable(self):
- """
- If a range is syntactically invalid due to the start being greater than
- the end, the range header is ignored (the request is responded to as if
- it were not present).
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=20-13')
- self.resource.render(self.request)
- self.assertEqual(self.request.responseCode, http.OK)
- self.assertEqual(b''.join(self.request.written), self.payload)
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-length')[0],
- intToBytes(len(self.payload)))
- def test_invalidStartBytePos(self):
- """
- If a range is unsatisfiable due to the start not being less than the
- length of the resource, the response is 416 (Requested range not
- satisfiable) and no data is written to the response body (RFC 2616,
- section 14.35.1).
- """
- self.request.requestHeaders.addRawHeader(b'range', b'bytes=67-108')
- self.resource.render(self.request)
- self.assertEqual(
- self.request.responseCode, http.REQUESTED_RANGE_NOT_SATISFIABLE)
- self.assertEqual(b''.join(self.request.written), b'')
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-length')[0],
- b'0')
- # Sections 10.4.17 and 14.16
- self.assertEqual(
- self.request.responseHeaders.getRawHeaders(b'content-range')[0],
- networkString('bytes */%d' % (len(self.payload),)))
- class DirectoryListerTests(TestCase):
- """
- Tests for L{static.DirectoryLister}.
- """
- def _request(self, uri):
- request = DummyRequest([b''])
- request.uri = uri
- return request
- def test_renderHeader(self):
- """
- L{static.DirectoryLister} prints the request uri as header of the
- rendered content.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- lister = static.DirectoryLister(path.path)
- data = lister.render(self._request(b'foo'))
- self.assertIn(b"<h1>Directory listing for foo</h1>", data)
- self.assertIn(b"<title>Directory listing for foo</title>", data)
- def test_renderUnquoteHeader(self):
- """
- L{static.DirectoryLister} unquote the request uri before printing it.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- lister = static.DirectoryLister(path.path)
- data = lister.render(self._request(b'foo%20bar'))
- self.assertIn(b"<h1>Directory listing for foo bar</h1>", data)
- self.assertIn(b"<title>Directory listing for foo bar</title>", data)
- def test_escapeHeader(self):
- """
- L{static.DirectoryLister} escape "&", "<" and ">" after unquoting the
- request uri.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- lister = static.DirectoryLister(path.path)
- data = lister.render(self._request(b'foo%26bar'))
- self.assertIn(b"<h1>Directory listing for foo&bar</h1>", data)
- self.assertIn(b"<title>Directory listing for foo&bar</title>", data)
- def test_renderFiles(self):
- """
- L{static.DirectoryLister} is able to list all the files inside a
- directory.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- path.child('file1').setContent(b"content1")
- path.child('file2').setContent(b"content2" * 1000)
- lister = static.DirectoryLister(path.path)
- data = lister.render(self._request(b'foo'))
- body = b"""<tr class="odd">
- <td><a href="file1">file1</a></td>
- <td>8B</td>
- <td>[text/html]</td>
- <td></td>
- </tr>
- <tr class="even">
- <td><a href="file2">file2</a></td>
- <td>7K</td>
- <td>[text/html]</td>
- <td></td>
- </tr>"""
- self.assertIn(body, data)
- def test_renderDirectories(self):
- """
- L{static.DirectoryLister} is able to list all the directories inside
- a directory.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- path.child('dir1').makedirs()
- path.child('dir2 & 3').makedirs()
- lister = static.DirectoryLister(path.path)
- data = lister.render(self._request(b'foo'))
- body = b"""<tr class="odd">
- <td><a href="dir1/">dir1/</a></td>
- <td></td>
- <td>[Directory]</td>
- <td></td>
- </tr>
- <tr class="even">
- <td><a href="dir2%20%26%203/">dir2 & 3/</a></td>
- <td></td>
- <td>[Directory]</td>
- <td></td>
- </tr>"""
- self.assertIn(body, data)
- def test_renderFiltered(self):
- """
- L{static.DirectoryLister} takes an optional C{dirs} argument that
- filter out the list of directories and files printed.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- path.child('dir1').makedirs()
- path.child('dir2').makedirs()
- path.child('dir3').makedirs()
- lister = static.DirectoryLister(path.path, dirs=["dir1", "dir3"])
- data = lister.render(self._request(b'foo'))
- body = b"""<tr class="odd">
- <td><a href="dir1/">dir1/</a></td>
- <td></td>
- <td>[Directory]</td>
- <td></td>
- </tr>
- <tr class="even">
- <td><a href="dir3/">dir3/</a></td>
- <td></td>
- <td>[Directory]</td>
- <td></td>
- </tr>"""
- self.assertIn(body, data)
- def test_oddAndEven(self):
- """
- L{static.DirectoryLister} gives an alternate class for each odd and
- even rows in the table.
- """
- lister = static.DirectoryLister(None)
- elements = [{"href": "", "text": "", "size": "", "type": "",
- "encoding": ""} for i in range(5)]
- content = lister._buildTableContent(elements)
- self.assertEqual(len(content), 5)
- self.assertTrue(content[0].startswith('<tr class="odd">'))
- self.assertTrue(content[1].startswith('<tr class="even">'))
- self.assertTrue(content[2].startswith('<tr class="odd">'))
- self.assertTrue(content[3].startswith('<tr class="even">'))
- self.assertTrue(content[4].startswith('<tr class="odd">'))
- def test_contentType(self):
- """
- L{static.DirectoryLister} produces a MIME-type that indicates that it is
- HTML, and includes its charset (UTF-8).
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- lister = static.DirectoryLister(path.path)
- req = self._request(b'')
- lister.render(req)
- self.assertEqual(req.responseHeaders.getRawHeaders(b'content-type')[0],
- b"text/html; charset=utf-8")
- def test_mimeTypeAndEncodings(self):
- """
- L{static.DirectoryLister} is able to detect mimetype and encoding of
- listed files.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- path.child('file1.txt').setContent(b"file1")
- path.child('file2.py').setContent(b"python")
- path.child('file3.conf.gz').setContent(b"conf compressed")
- path.child('file4.diff.bz2').setContent(b"diff compressed")
- directory = os.listdir(path.path)
- directory.sort()
- contentTypes = {
- ".txt": "text/plain",
- ".py": "text/python",
- ".conf": "text/configuration",
- ".diff": "text/diff"
- }
- lister = static.DirectoryLister(path.path, contentTypes=contentTypes)
- dirs, files = lister._getFilesAndDirectories(directory)
- self.assertEqual(dirs, [])
- self.assertEqual(files, [
- {'encoding': '',
- 'href': 'file1.txt',
- 'size': '5B',
- 'text': 'file1.txt',
- 'type': '[text/plain]'},
- {'encoding': '',
- 'href': 'file2.py',
- 'size': '6B',
- 'text': 'file2.py',
- 'type': '[text/python]'},
- {'encoding': '[gzip]',
- 'href': 'file3.conf.gz',
- 'size': '15B',
- 'text': 'file3.conf.gz',
- 'type': '[text/configuration]'},
- {'encoding': '[bzip2]',
- 'href': 'file4.diff.bz2',
- 'size': '15B',
- 'text': 'file4.diff.bz2',
- 'type': '[text/diff]'}])
- def test_brokenSymlink(self):
- """
- If on the file in the listing points to a broken symlink, it should not
- be returned by L{static.DirectoryLister._getFilesAndDirectories}.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- file1 = path.child('file1')
- file1.setContent(b"file1")
- file1.linkTo(path.child("file2"))
- file1.remove()
- lister = static.DirectoryLister(path.path)
- directory = os.listdir(path.path)
- directory.sort()
- dirs, files = lister._getFilesAndDirectories(directory)
- self.assertEqual(dirs, [])
- self.assertEqual(files, [])
- if not platform._supportsSymlinks():
- test_brokenSymlink.skip = "No symlink support"
- def test_childrenNotFound(self):
- """
- Any child resource of L{static.DirectoryLister} renders an HTTP
- I{NOT FOUND} response code.
- """
- path = FilePath(self.mktemp())
- path.makedirs()
- lister = static.DirectoryLister(path.path)
- request = self._request(b'')
- child = resource.getChildForRequest(lister, request)
- result = _render(child, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, http.NOT_FOUND)
- result.addCallback(cbRendered)
- return result
- def test_repr(self):
- """
- L{static.DirectoryLister.__repr__} gives the path of the lister.
- """
- path = FilePath(self.mktemp())
- lister = static.DirectoryLister(path.path)
- self.assertEqual(repr(lister),
- "<DirectoryLister of %r>" % (path.path,))
- self.assertEqual(str(lister),
- "<DirectoryLister of %r>" % (path.path,))
- def test_formatFileSize(self):
- """
- L{static.formatFileSize} format an amount of bytes into a more readable
- format.
- """
- self.assertEqual(static.formatFileSize(0), "0B")
- self.assertEqual(static.formatFileSize(123), "123B")
- self.assertEqual(static.formatFileSize(4567), "4K")
- self.assertEqual(static.formatFileSize(8900000), "8M")
- self.assertEqual(static.formatFileSize(1234000000), "1G")
- self.assertEqual(static.formatFileSize(1234567890000), "1149G")
- class LoadMimeTypesTests(TestCase):
- """
- Tests for the MIME type loading routine.
- @cvar UNSET: A sentinel to signify that C{self.paths} has not been set by
- the mock init.
- """
- UNSET = object()
- def setUp(self):
- self.paths = self.UNSET
- def _fakeInit(self, paths):
- """
- A mock L{mimetypes.init} that records the value of the passed C{paths}
- argument.
- @param paths: The paths that will be recorded.
- """
- self.paths = paths
- def test_defaultArgumentIsNone(self):
- """
- By default, L{None} is passed to C{mimetypes.init}.
- """
- static.loadMimeTypes(init=self._fakeInit)
- self.assertIdentical(self.paths, None)
- def test_extraLocationsWork(self):
- """
- Passed MIME type files are passed to C{mimetypes.init}.
- """
- paths = ["x", "y", "z"]
- static.loadMimeTypes(paths, init=self._fakeInit)
- self.assertIdentical(self.paths, paths)
- def test_usesGlobalInitFunction(self):
- """
- By default, C{mimetypes.init} is called.
- """
- # Checking mimetypes.inited doesn't always work, because
- # something, somewhere, calls mimetypes.init. Yay global
- # mutable state :)
- if _PY3:
- signature = inspect.signature(static.loadMimeTypes)
- self.assertIs(signature.parameters["init"].default,
- mimetypes.init)
- else:
- args, _, _, defaults = inspect.getargspec(static.loadMimeTypes)
- defaultInit = defaults[args.index("init")]
- self.assertIs(defaultInit, mimetypes.init)
- class StaticDeprecationTests(TestCase):
- def test_addSlashDeprecated(self):
- """
- L{twisted.web.static.addSlash} is deprecated.
- """
- from twisted.web.static import addSlash
- addSlash(DummyRequest([b'']))
- warnings = self.flushWarnings([self.test_addSlashDeprecated])
- self.assertEqual(len(warnings), 1)
- self.assertEqual(warnings[0]['message'],
- "twisted.web.static.addSlash was deprecated in Twisted 16.0.0")
|