123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.web.twcgi}.
- """
- import sys
- import os
- import json
- from io import BytesIO
- from twisted.trial import unittest
- from twisted.internet import reactor, interfaces, error
- from twisted.python import util, failure, log
- from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
- from twisted.web import client, twcgi, server, resource, http_headers
- from twisted.web.test._util import _render
- from twisted.web.test.test_web import DummyRequest
- DUMMY_CGI = '''\
- print("Header: OK")
- print("")
- print("cgi output")
- '''
- DUAL_HEADER_CGI = '''\
- print("Header: spam")
- print("Header: eggs")
- print("")
- print("cgi output")
- '''
- BROKEN_HEADER_CGI = '''\
- print("XYZ")
- print("")
- print("cgi output")
- '''
- SPECIAL_HEADER_CGI = '''\
- print("Server: monkeys")
- print("Date: last year")
- print("")
- print("cgi output")
- '''
- READINPUT_CGI = '''\
- # This is an example of a correctly-written CGI script which reads a body
- # from stdin, which only reads env['CONTENT_LENGTH'] bytes.
- import os, sys
- body_length = int(os.environ.get('CONTENT_LENGTH',0))
- indata = sys.stdin.read(body_length)
- print("Header: OK")
- print("")
- print("readinput ok")
- '''
- READALLINPUT_CGI = '''\
- # This is an example of the typical (incorrect) CGI script which expects
- # the server to close stdin when the body of the request is complete.
- # A correct CGI should only read env['CONTENT_LENGTH'] bytes.
- import sys
- indata = sys.stdin.read()
- print("Header: OK")
- print("")
- print("readallinput ok")
- '''
- NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI = '''\
- print("content-type: text/cgi-duplicate-test")
- print("")
- print("cgi output")
- '''
- HEADER_OUTPUT_CGI = '''\
- import json
- import os
- print("")
- print("")
- vals = {x:y for x,y in os.environ.items() if x.startswith("HTTP_")}
- print(json.dumps(vals))
- '''
- class PythonScript(twcgi.FilteredScript):
- filter = sys.executable
- class CGITests(unittest.TestCase):
- """
- Tests for L{twcgi.FilteredScript}.
- """
- if not interfaces.IReactorProcess.providedBy(reactor):
- skip = "CGI tests require a functional reactor.spawnProcess()"
- def startServer(self, cgi):
- root = resource.Resource()
- cgipath = util.sibpath(__file__, cgi)
- root.putChild(b"cgi", PythonScript(cgipath))
- site = server.Site(root)
- self.p = reactor.listenTCP(0, site)
- return self.p.getHost().port
- def tearDown(self):
- if getattr(self, 'p', None):
- return self.p.stopListening()
- def writeCGI(self, source):
- cgiFilename = os.path.abspath(self.mktemp())
- with open(cgiFilename, 'wt') as cgiFile:
- cgiFile.write(source)
- return cgiFilename
- def test_CGI(self):
- cgiFilename = self.writeCGI(DUMMY_CGI)
- portnum = self.startServer(cgiFilename)
- url = 'http://localhost:%d/cgi' % (portnum,)
- url = url.encode("ascii")
- d = client.Agent(reactor).request(b"GET", url)
- d.addCallback(client.readBody)
- d.addCallback(self._testCGI_1)
- return d
- def _testCGI_1(self, res):
- self.assertEqual(res, b"cgi output" + os.linesep.encode("ascii"))
- def test_protectedServerAndDate(self):
- """
- If the CGI script emits a I{Server} or I{Date} header, these are
- ignored.
- """
- cgiFilename = self.writeCGI(SPECIAL_HEADER_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- agent = client.Agent(reactor)
- d = agent.request(b"GET", url)
- d.addCallback(discardBody)
- def checkResponse(response):
- self.assertNotIn('monkeys',
- response.headers.getRawHeaders('server'))
- self.assertNotIn('last year',
- response.headers.getRawHeaders('date'))
- d.addCallback(checkResponse)
- return d
- def test_noDuplicateContentTypeHeaders(self):
- """
- If the CGI script emits a I{content-type} header, make sure that the
- server doesn't add an additional (duplicate) one, as per ticket 4786.
- """
- cgiFilename = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- agent = client.Agent(reactor)
- d = agent.request(b"GET", url)
- d.addCallback(discardBody)
- def checkResponse(response):
- self.assertEqual(
- response.headers.getRawHeaders('content-type'),
- ['text/cgi-duplicate-test'])
- return response
- d.addCallback(checkResponse)
- return d
- def test_noProxyPassthrough(self):
- """
- The CGI script is never called with the Proxy header passed through.
- """
- cgiFilename = self.writeCGI(HEADER_OUTPUT_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- agent = client.Agent(reactor)
- headers = http_headers.Headers({b"Proxy": [b"foo"],
- b"X-Innocent-Header": [b"bar"]})
- d = agent.request(b"GET", url, headers=headers)
- def checkResponse(response):
- headers = json.loads(response.decode("ascii"))
- self.assertEqual(
- set(headers.keys()),
- {"HTTP_HOST", "HTTP_CONNECTION", "HTTP_X_INNOCENT_HEADER"})
- d.addCallback(client.readBody)
- d.addCallback(checkResponse)
- return d
- def test_duplicateHeaderCGI(self):
- """
- If a CGI script emits two instances of the same header, both are sent
- in the response.
- """
- cgiFilename = self.writeCGI(DUAL_HEADER_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- agent = client.Agent(reactor)
- d = agent.request(b"GET", url)
- d.addCallback(discardBody)
- def checkResponse(response):
- self.assertEqual(
- response.headers.getRawHeaders('header'), ['spam', 'eggs'])
- d.addCallback(checkResponse)
- return d
- def test_malformedHeaderCGI(self):
- """
- Check for the error message in the duplicated header
- """
- cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- agent = client.Agent(reactor)
- d = agent.request(b"GET", url)
- d.addCallback(discardBody)
- loggedMessages = []
- def addMessage(eventDict):
- loggedMessages.append(log.textFromEventDict(eventDict))
- log.addObserver(addMessage)
- self.addCleanup(log.removeObserver, addMessage)
- def checkResponse(ignored):
- self.assertIn("ignoring malformed CGI header: " + repr(b'XYZ'),
- loggedMessages)
- d.addCallback(checkResponse)
- return d
- def test_ReadEmptyInput(self):
- cgiFilename = os.path.abspath(self.mktemp())
- with open(cgiFilename, 'wt') as cgiFile:
- cgiFile.write(READINPUT_CGI)
- portnum = self.startServer(cgiFilename)
- agent = client.Agent(reactor)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- d = agent.request(b"GET", url)
- d.addCallback(client.readBody)
- d.addCallback(self._test_ReadEmptyInput_1)
- return d
- test_ReadEmptyInput.timeout = 5
- def _test_ReadEmptyInput_1(self, res):
- expected = "readinput ok{}".format(os.linesep)
- expected = expected.encode("ascii")
- self.assertEqual(res, expected)
- def test_ReadInput(self):
- cgiFilename = os.path.abspath(self.mktemp())
- with open(cgiFilename, 'wt') as cgiFile:
- cgiFile.write(READINPUT_CGI)
- portnum = self.startServer(cgiFilename)
- agent = client.Agent(reactor)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- d = agent.request(
- uri=url,
- method=b"POST",
- bodyProducer=client.FileBodyProducer(
- BytesIO(b"Here is your stdin")),
- )
- d.addCallback(client.readBody)
- d.addCallback(self._test_ReadInput_1)
- return d
- test_ReadInput.timeout = 5
- def _test_ReadInput_1(self, res):
- expected = "readinput ok{}".format(os.linesep)
- expected = expected.encode("ascii")
- self.assertEqual(res, expected)
- def test_ReadAllInput(self):
- cgiFilename = os.path.abspath(self.mktemp())
- with open(cgiFilename, 'wt') as cgiFile:
- cgiFile.write(READALLINPUT_CGI)
- portnum = self.startServer(cgiFilename)
- url = "http://localhost:%d/cgi" % (portnum,)
- url = url.encode("ascii")
- d = client.Agent(reactor).request(
- uri=url,
- method=b"POST",
- bodyProducer=client.FileBodyProducer(
- BytesIO(b"Here is your stdin")),
- )
- d.addCallback(client.readBody)
- d.addCallback(self._test_ReadAllInput_1)
- return d
- test_ReadAllInput.timeout = 5
- def _test_ReadAllInput_1(self, res):
- expected = "readallinput ok{}".format(os.linesep)
- expected = expected.encode("ascii")
- self.assertEqual(res, expected)
- def test_useReactorArgument(self):
- """
- L{twcgi.FilteredScript.runProcess} uses the reactor passed as an
- argument to the constructor.
- """
- class FakeReactor:
- """
- A fake reactor recording whether spawnProcess is called.
- """
- called = False
- def spawnProcess(self, *args, **kwargs):
- """
- Set the C{called} flag to C{True} if C{spawnProcess} is called.
- @param args: Positional arguments.
- @param kwargs: Keyword arguments.
- """
- self.called = True
- fakeReactor = FakeReactor()
- request = DummyRequest(['a', 'b'])
- resource = twcgi.FilteredScript("dummy-file", reactor=fakeReactor)
- _render(resource, request)
- self.assertTrue(fakeReactor.called)
- class CGIScriptTests(unittest.TestCase):
- """
- Tests for L{twcgi.CGIScript}.
- """
- def test_pathInfo(self):
- """
- L{twcgi.CGIScript.render} sets the process environment
- I{PATH_INFO} from the request path.
- """
- class FakeReactor:
- """
- A fake reactor recording the environment passed to spawnProcess.
- """
- def spawnProcess(self, process, filename, args, env, wdir):
- """
- Store the C{env} L{dict} to an instance attribute.
- @param process: Ignored
- @param filename: Ignored
- @param args: Ignored
- @param env: The environment L{dict} which will be stored
- @param wdir: Ignored
- """
- self.process_env = env
- _reactor = FakeReactor()
- resource = twcgi.CGIScript(self.mktemp(), reactor=_reactor)
- request = DummyRequest(['a', 'b'])
- _render(resource, request)
- self.assertEqual(_reactor.process_env["PATH_INFO"],
- "/a/b")
- class CGIDirectoryTests(unittest.TestCase):
- """
- Tests for L{twcgi.CGIDirectory}.
- """
- def test_render(self):
- """
- L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
- FOUND}.
- """
- resource = twcgi.CGIDirectory(self.mktemp())
- request = DummyRequest([''])
- d = _render(resource, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, NOT_FOUND)
- d.addCallback(cbRendered)
- return d
- def test_notFoundChild(self):
- """
- L{twcgi.CGIDirectory.getChild} returns a resource which renders an
- response with the HTTP I{NOT FOUND} status code if the indicated child
- does not exist as an entry in the directory used to initialized the
- L{twcgi.CGIDirectory}.
- """
- path = self.mktemp()
- os.makedirs(path)
- resource = twcgi.CGIDirectory(path)
- request = DummyRequest(['foo'])
- child = resource.getChild("foo", request)
- d = _render(child, request)
- def cbRendered(ignored):
- self.assertEqual(request.responseCode, NOT_FOUND)
- d.addCallback(cbRendered)
- return d
- class CGIProcessProtocolTests(unittest.TestCase):
- """
- Tests for L{twcgi.CGIProcessProtocol}.
- """
- def test_prematureEndOfHeaders(self):
- """
- If the process communicating with L{CGIProcessProtocol} ends before
- finishing writing out headers, the response has I{INTERNAL SERVER
- ERROR} as its status code.
- """
- request = DummyRequest([''])
- protocol = twcgi.CGIProcessProtocol(request)
- protocol.processEnded(failure.Failure(error.ProcessTerminated()))
- self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)
- def discardBody(response):
- """
- Discard the body of a HTTP response.
- @param response: The response.
- @return: The response.
- """
- return client.readBody(response).addCallback(lambda _: response)
|