test_cgi.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for L{twisted.web.twcgi}.
  5. """
  6. import sys
  7. import os
  8. import json
  9. from io import BytesIO
  10. from twisted.trial import unittest
  11. from twisted.internet import reactor, interfaces, error
  12. from twisted.python import util, failure, log
  13. from twisted.web.http import NOT_FOUND, INTERNAL_SERVER_ERROR
  14. from twisted.web import client, twcgi, server, resource, http_headers
  15. from twisted.web.test._util import _render
  16. from twisted.web.test.test_web import DummyRequest
  17. DUMMY_CGI = '''\
  18. print("Header: OK")
  19. print("")
  20. print("cgi output")
  21. '''
  22. DUAL_HEADER_CGI = '''\
  23. print("Header: spam")
  24. print("Header: eggs")
  25. print("")
  26. print("cgi output")
  27. '''
  28. BROKEN_HEADER_CGI = '''\
  29. print("XYZ")
  30. print("")
  31. print("cgi output")
  32. '''
  33. SPECIAL_HEADER_CGI = '''\
  34. print("Server: monkeys")
  35. print("Date: last year")
  36. print("")
  37. print("cgi output")
  38. '''
  39. READINPUT_CGI = '''\
  40. # This is an example of a correctly-written CGI script which reads a body
  41. # from stdin, which only reads env['CONTENT_LENGTH'] bytes.
  42. import os, sys
  43. body_length = int(os.environ.get('CONTENT_LENGTH',0))
  44. indata = sys.stdin.read(body_length)
  45. print("Header: OK")
  46. print("")
  47. print("readinput ok")
  48. '''
  49. READALLINPUT_CGI = '''\
  50. # This is an example of the typical (incorrect) CGI script which expects
  51. # the server to close stdin when the body of the request is complete.
  52. # A correct CGI should only read env['CONTENT_LENGTH'] bytes.
  53. import sys
  54. indata = sys.stdin.read()
  55. print("Header: OK")
  56. print("")
  57. print("readallinput ok")
  58. '''
  59. NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI = '''\
  60. print("content-type: text/cgi-duplicate-test")
  61. print("")
  62. print("cgi output")
  63. '''
  64. HEADER_OUTPUT_CGI = '''\
  65. import json
  66. import os
  67. print("")
  68. print("")
  69. vals = {x:y for x,y in os.environ.items() if x.startswith("HTTP_")}
  70. print(json.dumps(vals))
  71. '''
  72. class PythonScript(twcgi.FilteredScript):
  73. filter = sys.executable
  74. class CGITests(unittest.TestCase):
  75. """
  76. Tests for L{twcgi.FilteredScript}.
  77. """
  78. if not interfaces.IReactorProcess.providedBy(reactor):
  79. skip = "CGI tests require a functional reactor.spawnProcess()"
  80. def startServer(self, cgi):
  81. root = resource.Resource()
  82. cgipath = util.sibpath(__file__, cgi)
  83. root.putChild(b"cgi", PythonScript(cgipath))
  84. site = server.Site(root)
  85. self.p = reactor.listenTCP(0, site)
  86. return self.p.getHost().port
  87. def tearDown(self):
  88. if getattr(self, 'p', None):
  89. return self.p.stopListening()
  90. def writeCGI(self, source):
  91. cgiFilename = os.path.abspath(self.mktemp())
  92. with open(cgiFilename, 'wt') as cgiFile:
  93. cgiFile.write(source)
  94. return cgiFilename
  95. def test_CGI(self):
  96. cgiFilename = self.writeCGI(DUMMY_CGI)
  97. portnum = self.startServer(cgiFilename)
  98. url = 'http://localhost:%d/cgi' % (portnum,)
  99. url = url.encode("ascii")
  100. d = client.Agent(reactor).request(b"GET", url)
  101. d.addCallback(client.readBody)
  102. d.addCallback(self._testCGI_1)
  103. return d
  104. def _testCGI_1(self, res):
  105. self.assertEqual(res, b"cgi output" + os.linesep.encode("ascii"))
  106. def test_protectedServerAndDate(self):
  107. """
  108. If the CGI script emits a I{Server} or I{Date} header, these are
  109. ignored.
  110. """
  111. cgiFilename = self.writeCGI(SPECIAL_HEADER_CGI)
  112. portnum = self.startServer(cgiFilename)
  113. url = "http://localhost:%d/cgi" % (portnum,)
  114. url = url.encode("ascii")
  115. agent = client.Agent(reactor)
  116. d = agent.request(b"GET", url)
  117. d.addCallback(discardBody)
  118. def checkResponse(response):
  119. self.assertNotIn('monkeys',
  120. response.headers.getRawHeaders('server'))
  121. self.assertNotIn('last year',
  122. response.headers.getRawHeaders('date'))
  123. d.addCallback(checkResponse)
  124. return d
  125. def test_noDuplicateContentTypeHeaders(self):
  126. """
  127. If the CGI script emits a I{content-type} header, make sure that the
  128. server doesn't add an additional (duplicate) one, as per ticket 4786.
  129. """
  130. cgiFilename = self.writeCGI(NO_DUPLICATE_CONTENT_TYPE_HEADER_CGI)
  131. portnum = self.startServer(cgiFilename)
  132. url = "http://localhost:%d/cgi" % (portnum,)
  133. url = url.encode("ascii")
  134. agent = client.Agent(reactor)
  135. d = agent.request(b"GET", url)
  136. d.addCallback(discardBody)
  137. def checkResponse(response):
  138. self.assertEqual(
  139. response.headers.getRawHeaders('content-type'),
  140. ['text/cgi-duplicate-test'])
  141. return response
  142. d.addCallback(checkResponse)
  143. return d
  144. def test_noProxyPassthrough(self):
  145. """
  146. The CGI script is never called with the Proxy header passed through.
  147. """
  148. cgiFilename = self.writeCGI(HEADER_OUTPUT_CGI)
  149. portnum = self.startServer(cgiFilename)
  150. url = "http://localhost:%d/cgi" % (portnum,)
  151. url = url.encode("ascii")
  152. agent = client.Agent(reactor)
  153. headers = http_headers.Headers({b"Proxy": [b"foo"],
  154. b"X-Innocent-Header": [b"bar"]})
  155. d = agent.request(b"GET", url, headers=headers)
  156. def checkResponse(response):
  157. headers = json.loads(response.decode("ascii"))
  158. self.assertEqual(
  159. set(headers.keys()),
  160. {"HTTP_HOST", "HTTP_CONNECTION", "HTTP_X_INNOCENT_HEADER"})
  161. d.addCallback(client.readBody)
  162. d.addCallback(checkResponse)
  163. return d
  164. def test_duplicateHeaderCGI(self):
  165. """
  166. If a CGI script emits two instances of the same header, both are sent
  167. in the response.
  168. """
  169. cgiFilename = self.writeCGI(DUAL_HEADER_CGI)
  170. portnum = self.startServer(cgiFilename)
  171. url = "http://localhost:%d/cgi" % (portnum,)
  172. url = url.encode("ascii")
  173. agent = client.Agent(reactor)
  174. d = agent.request(b"GET", url)
  175. d.addCallback(discardBody)
  176. def checkResponse(response):
  177. self.assertEqual(
  178. response.headers.getRawHeaders('header'), ['spam', 'eggs'])
  179. d.addCallback(checkResponse)
  180. return d
  181. def test_malformedHeaderCGI(self):
  182. """
  183. Check for the error message in the duplicated header
  184. """
  185. cgiFilename = self.writeCGI(BROKEN_HEADER_CGI)
  186. portnum = self.startServer(cgiFilename)
  187. url = "http://localhost:%d/cgi" % (portnum,)
  188. url = url.encode("ascii")
  189. agent = client.Agent(reactor)
  190. d = agent.request(b"GET", url)
  191. d.addCallback(discardBody)
  192. loggedMessages = []
  193. def addMessage(eventDict):
  194. loggedMessages.append(log.textFromEventDict(eventDict))
  195. log.addObserver(addMessage)
  196. self.addCleanup(log.removeObserver, addMessage)
  197. def checkResponse(ignored):
  198. self.assertIn("ignoring malformed CGI header: " + repr(b'XYZ'),
  199. loggedMessages)
  200. d.addCallback(checkResponse)
  201. return d
  202. def test_ReadEmptyInput(self):
  203. cgiFilename = os.path.abspath(self.mktemp())
  204. with open(cgiFilename, 'wt') as cgiFile:
  205. cgiFile.write(READINPUT_CGI)
  206. portnum = self.startServer(cgiFilename)
  207. agent = client.Agent(reactor)
  208. url = "http://localhost:%d/cgi" % (portnum,)
  209. url = url.encode("ascii")
  210. d = agent.request(b"GET", url)
  211. d.addCallback(client.readBody)
  212. d.addCallback(self._test_ReadEmptyInput_1)
  213. return d
  214. test_ReadEmptyInput.timeout = 5
  215. def _test_ReadEmptyInput_1(self, res):
  216. expected = "readinput ok{}".format(os.linesep)
  217. expected = expected.encode("ascii")
  218. self.assertEqual(res, expected)
  219. def test_ReadInput(self):
  220. cgiFilename = os.path.abspath(self.mktemp())
  221. with open(cgiFilename, 'wt') as cgiFile:
  222. cgiFile.write(READINPUT_CGI)
  223. portnum = self.startServer(cgiFilename)
  224. agent = client.Agent(reactor)
  225. url = "http://localhost:%d/cgi" % (portnum,)
  226. url = url.encode("ascii")
  227. d = agent.request(
  228. uri=url,
  229. method=b"POST",
  230. bodyProducer=client.FileBodyProducer(
  231. BytesIO(b"Here is your stdin")),
  232. )
  233. d.addCallback(client.readBody)
  234. d.addCallback(self._test_ReadInput_1)
  235. return d
  236. test_ReadInput.timeout = 5
  237. def _test_ReadInput_1(self, res):
  238. expected = "readinput ok{}".format(os.linesep)
  239. expected = expected.encode("ascii")
  240. self.assertEqual(res, expected)
  241. def test_ReadAllInput(self):
  242. cgiFilename = os.path.abspath(self.mktemp())
  243. with open(cgiFilename, 'wt') as cgiFile:
  244. cgiFile.write(READALLINPUT_CGI)
  245. portnum = self.startServer(cgiFilename)
  246. url = "http://localhost:%d/cgi" % (portnum,)
  247. url = url.encode("ascii")
  248. d = client.Agent(reactor).request(
  249. uri=url,
  250. method=b"POST",
  251. bodyProducer=client.FileBodyProducer(
  252. BytesIO(b"Here is your stdin")),
  253. )
  254. d.addCallback(client.readBody)
  255. d.addCallback(self._test_ReadAllInput_1)
  256. return d
  257. test_ReadAllInput.timeout = 5
  258. def _test_ReadAllInput_1(self, res):
  259. expected = "readallinput ok{}".format(os.linesep)
  260. expected = expected.encode("ascii")
  261. self.assertEqual(res, expected)
  262. def test_useReactorArgument(self):
  263. """
  264. L{twcgi.FilteredScript.runProcess} uses the reactor passed as an
  265. argument to the constructor.
  266. """
  267. class FakeReactor:
  268. """
  269. A fake reactor recording whether spawnProcess is called.
  270. """
  271. called = False
  272. def spawnProcess(self, *args, **kwargs):
  273. """
  274. Set the C{called} flag to C{True} if C{spawnProcess} is called.
  275. @param args: Positional arguments.
  276. @param kwargs: Keyword arguments.
  277. """
  278. self.called = True
  279. fakeReactor = FakeReactor()
  280. request = DummyRequest(['a', 'b'])
  281. resource = twcgi.FilteredScript("dummy-file", reactor=fakeReactor)
  282. _render(resource, request)
  283. self.assertTrue(fakeReactor.called)
  284. class CGIScriptTests(unittest.TestCase):
  285. """
  286. Tests for L{twcgi.CGIScript}.
  287. """
  288. def test_pathInfo(self):
  289. """
  290. L{twcgi.CGIScript.render} sets the process environment
  291. I{PATH_INFO} from the request path.
  292. """
  293. class FakeReactor:
  294. """
  295. A fake reactor recording the environment passed to spawnProcess.
  296. """
  297. def spawnProcess(self, process, filename, args, env, wdir):
  298. """
  299. Store the C{env} L{dict} to an instance attribute.
  300. @param process: Ignored
  301. @param filename: Ignored
  302. @param args: Ignored
  303. @param env: The environment L{dict} which will be stored
  304. @param wdir: Ignored
  305. """
  306. self.process_env = env
  307. _reactor = FakeReactor()
  308. resource = twcgi.CGIScript(self.mktemp(), reactor=_reactor)
  309. request = DummyRequest(['a', 'b'])
  310. _render(resource, request)
  311. self.assertEqual(_reactor.process_env["PATH_INFO"],
  312. "/a/b")
  313. class CGIDirectoryTests(unittest.TestCase):
  314. """
  315. Tests for L{twcgi.CGIDirectory}.
  316. """
  317. def test_render(self):
  318. """
  319. L{twcgi.CGIDirectory.render} sets the HTTP response code to I{NOT
  320. FOUND}.
  321. """
  322. resource = twcgi.CGIDirectory(self.mktemp())
  323. request = DummyRequest([''])
  324. d = _render(resource, request)
  325. def cbRendered(ignored):
  326. self.assertEqual(request.responseCode, NOT_FOUND)
  327. d.addCallback(cbRendered)
  328. return d
  329. def test_notFoundChild(self):
  330. """
  331. L{twcgi.CGIDirectory.getChild} returns a resource which renders an
  332. response with the HTTP I{NOT FOUND} status code if the indicated child
  333. does not exist as an entry in the directory used to initialized the
  334. L{twcgi.CGIDirectory}.
  335. """
  336. path = self.mktemp()
  337. os.makedirs(path)
  338. resource = twcgi.CGIDirectory(path)
  339. request = DummyRequest(['foo'])
  340. child = resource.getChild("foo", request)
  341. d = _render(child, request)
  342. def cbRendered(ignored):
  343. self.assertEqual(request.responseCode, NOT_FOUND)
  344. d.addCallback(cbRendered)
  345. return d
  346. class CGIProcessProtocolTests(unittest.TestCase):
  347. """
  348. Tests for L{twcgi.CGIProcessProtocol}.
  349. """
  350. def test_prematureEndOfHeaders(self):
  351. """
  352. If the process communicating with L{CGIProcessProtocol} ends before
  353. finishing writing out headers, the response has I{INTERNAL SERVER
  354. ERROR} as its status code.
  355. """
  356. request = DummyRequest([''])
  357. protocol = twcgi.CGIProcessProtocol(request)
  358. protocol.processEnded(failure.Failure(error.ProcessTerminated()))
  359. self.assertEqual(request.responseCode, INTERNAL_SERVER_ERROR)
  360. def discardBody(response):
  361. """
  362. Discard the body of a HTTP response.
  363. @param response: The response.
  364. @return: The response.
  365. """
  366. return client.readBody(response).addCallback(lambda _: response)