test_http2.py 86 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test HTTP/2 support.
  5. """
  6. from __future__ import absolute_import, division
  7. import itertools
  8. from twisted.internet import defer, reactor, task
  9. from twisted.python.compat import iterbytes
  10. from twisted.test.proto_helpers import StringTransport
  11. from twisted.test.test_internet import DummyProducer
  12. from twisted.trial import unittest
  13. from twisted.web import http
  14. from twisted.web.test.test_http import (
  15. DummyHTTPHandler, DummyHTTPHandlerProxy,
  16. DelayedHTTPHandlerProxy,
  17. DummyPullProducerHandlerProxy,
  18. _makeRequestProxyFactory,
  19. )
  20. from twisted.internet.address import IPv4Address
  21. skipH2 = None
  22. try:
  23. from twisted.web._http2 import H2Connection
  24. # These third-party imports are guaranteed to be present if HTTP/2 support
  25. # is compiled in. We do not use them in the main code: only in the tests.
  26. import h2
  27. import h2.errors
  28. import hyperframe
  29. import priority
  30. from hpack.hpack import Encoder, Decoder
  31. except ImportError:
  32. skipH2 = "HTTP/2 support not enabled"
  33. # Define some helpers for the rest of these tests.
  34. class FrameFactory(object):
  35. """
  36. A class containing lots of helper methods and state to build frames. This
  37. allows test cases to easily build correct HTTP/2 frames to feed to
  38. hyper-h2.
  39. """
  40. def __init__(self):
  41. self.encoder = Encoder()
  42. def refreshEncoder(self):
  43. self.encoder = Encoder()
  44. def clientConnectionPreface(self):
  45. return b'PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n'
  46. def buildHeadersFrame(self,
  47. headers,
  48. flags=[],
  49. streamID=1,
  50. **priorityKwargs):
  51. """
  52. Builds a single valid headers frame out of the contained headers.
  53. """
  54. f = hyperframe.frame.HeadersFrame(streamID)
  55. f.data = self.encoder.encode(headers)
  56. f.flags.add('END_HEADERS')
  57. for flag in flags:
  58. f.flags.add(flag)
  59. for k, v in priorityKwargs.items():
  60. setattr(f, k, v)
  61. return f
  62. def buildDataFrame(self, data, flags=None, streamID=1):
  63. """
  64. Builds a single data frame out of a chunk of data.
  65. """
  66. flags = set(flags) if flags is not None else set()
  67. f = hyperframe.frame.DataFrame(streamID)
  68. f.data = data
  69. f.flags = flags
  70. return f
  71. def buildSettingsFrame(self, settings, ack=False):
  72. """
  73. Builds a single settings frame.
  74. """
  75. f = hyperframe.frame.SettingsFrame(0)
  76. if ack:
  77. f.flags.add('ACK')
  78. f.settings = settings
  79. return f
  80. def buildWindowUpdateFrame(self, streamID, increment):
  81. """
  82. Builds a single WindowUpdate frame.
  83. """
  84. f = hyperframe.frame.WindowUpdateFrame(streamID)
  85. f.window_increment = increment
  86. return f
  87. def buildGoAwayFrame(self, lastStreamID, errorCode=0, additionalData=b''):
  88. """
  89. Builds a single GOAWAY frame.
  90. """
  91. f = hyperframe.frame.GoAwayFrame(0)
  92. f.error_code = errorCode
  93. f.last_stream_id = lastStreamID
  94. f.additional_data = additionalData
  95. return f
  96. def buildRstStreamFrame(self, streamID, errorCode=0):
  97. """
  98. Builds a single RST_STREAM frame.
  99. """
  100. f = hyperframe.frame.RstStreamFrame(streamID)
  101. f.error_code = errorCode
  102. return f
  103. def buildPriorityFrame(self,
  104. streamID,
  105. weight,
  106. dependsOn=0,
  107. exclusive=False):
  108. """
  109. Builds a single priority frame.
  110. """
  111. f = hyperframe.frame.PriorityFrame(streamID)
  112. f.depends_on = dependsOn
  113. f.stream_weight = weight
  114. f.exclusive = exclusive
  115. return f
  116. def buildPushPromiseFrame(self,
  117. streamID,
  118. promisedStreamID,
  119. headers,
  120. flags=[]):
  121. """
  122. Builds a single Push Promise frame.
  123. """
  124. f = hyperframe.frame.PushPromiseFrame(streamID)
  125. f.promised_stream_id = promisedStreamID
  126. f.data = self.encoder.encode(headers)
  127. f.flags = set(flags)
  128. f.flags.add('END_HEADERS')
  129. return f
  130. class FrameBuffer(object):
  131. """
  132. A test object that converts data received from Twisted's HTTP/2 stack and
  133. turns it into a sequence of hyperframe frame objects.
  134. This is primarily used to make it easier to write and debug tests: rather
  135. than have to serialize the expected frames and then do byte-level
  136. comparison (which can be unclear in debugging output), this object makes it
  137. possible to work with the frames directly.
  138. It also ensures that headers are properly decompressed.
  139. """
  140. def __init__(self):
  141. self.decoder = Decoder()
  142. self._data = b''
  143. def receiveData(self, data):
  144. self._data += data
  145. def __iter__(self):
  146. return self
  147. def next(self):
  148. if len(self._data) < 9:
  149. raise StopIteration()
  150. frame, length = hyperframe.frame.Frame.parse_frame_header(
  151. self._data[:9]
  152. )
  153. if len(self._data) < length + 9:
  154. raise StopIteration()
  155. frame.parse_body(memoryview(self._data[9:9+length]))
  156. self._data = self._data[9+length:]
  157. if isinstance(frame, hyperframe.frame.HeadersFrame):
  158. frame.data = self.decoder.decode(frame.data, raw=True)
  159. return frame
  160. __next__ = next
  161. def buildRequestFrames(headers, data, frameFactory=None, streamID=1):
  162. """
  163. Provides a sequence of HTTP/2 frames that encode a single HTTP request.
  164. This should be used when you want to control the serialization yourself,
  165. e.g. because you want to interleave other frames with these. If that's not
  166. necessary, prefer L{buildRequestBytes}.
  167. @param headers: The HTTP/2 headers to send.
  168. @type headers: L{list} of L{tuple} of L{bytes}
  169. @param data: The HTTP data to send. Each list entry will be sent in its own
  170. frame.
  171. @type data: L{list} of L{bytes}
  172. @param frameFactory: The L{FrameFactory} that will be used to construct the
  173. frames.
  174. @type frameFactory: L{FrameFactory}
  175. @param streamID: The ID of the stream on which to send the request.
  176. @type streamID: L{int}
  177. """
  178. if frameFactory is None:
  179. frameFactory = FrameFactory()
  180. frames = []
  181. frames.append(
  182. frameFactory.buildHeadersFrame(headers=headers, streamID=streamID)
  183. )
  184. frames.extend(
  185. frameFactory.buildDataFrame(chunk, streamID=streamID) for chunk in data
  186. )
  187. frames[-1].flags.add('END_STREAM')
  188. return frames
  189. def buildRequestBytes(headers, data, frameFactory=None, streamID=1):
  190. """
  191. Provides the byte sequence for a collection of HTTP/2 frames representing
  192. the provided request.
  193. @param headers: The HTTP/2 headers to send.
  194. @type headers: L{list} of L{tuple} of L{bytes}
  195. @param data: The HTTP data to send. Each list entry will be sent in its own
  196. frame.
  197. @type data: L{list} of L{bytes}
  198. @param frameFactory: The L{FrameFactory} that will be used to construct the
  199. frames.
  200. @type frameFactory: L{FrameFactory}
  201. @param streamID: The ID of the stream on which to send the request.
  202. @type streamID: L{int}
  203. """
  204. frames = buildRequestFrames(headers, data, frameFactory, streamID)
  205. return b''.join(f.serialize() for f in frames)
  206. def framesFromBytes(data):
  207. """
  208. Given a sequence of bytes, decodes them into frames.
  209. Note that this method should almost always be called only once, before
  210. making some assertions. This is because decoding HTTP/2 frames is extremely
  211. stateful, and this function doesn't preserve any of that state between
  212. calls.
  213. @param data: The serialized HTTP/2 frames.
  214. @type data: L{bytes}
  215. @returns: A list of HTTP/2 frames.
  216. @rtype: L{list} of L{hyperframe.frame.Frame} subclasses.
  217. """
  218. buffer = FrameBuffer()
  219. buffer.receiveData(data)
  220. return list(buffer)
  221. class ChunkedHTTPHandler(http.Request):
  222. """
  223. A HTTP request object that writes chunks of data back to the network based
  224. on the URL.
  225. Must be called with a path /chunked/<num_chunks>
  226. """
  227. chunkData = b'hello world!'
  228. def process(self):
  229. chunks = int(self.uri.split(b'/')[-1])
  230. self.setResponseCode(200)
  231. for _ in range(chunks):
  232. self.write(self.chunkData)
  233. self.finish()
  234. ChunkedHTTPHandlerProxy = _makeRequestProxyFactory(ChunkedHTTPHandler)
  235. class ConsumerDummyHandler(http.Request):
  236. """
  237. This is a HTTP request handler that works with the C{IPushProducer}
  238. implementation in the L{H2Stream} object. No current IRequest object does
  239. that, but in principle future implementations could: that codepath should
  240. therefore be tested.
  241. """
  242. def __init__(self, *args, **kwargs):
  243. http.Request.__init__(self, *args, **kwargs)
  244. # Production starts paused.
  245. self.channel.pauseProducing()
  246. self._requestReceived = False
  247. self._data = None
  248. def acceptData(self):
  249. """
  250. Start the data pipe.
  251. """
  252. self.channel.resumeProducing()
  253. def requestReceived(self, *args, **kwargs):
  254. self._requestReceived = True
  255. return http.Request.requestReceived(self, *args, **kwargs)
  256. def process(self):
  257. self.setResponseCode(200)
  258. self._data = self.content.read()
  259. returnData = b'this is a response from a consumer dummy handler'
  260. self.write(returnData)
  261. self.finish()
  262. ConsumerDummyHandlerProxy = _makeRequestProxyFactory(ConsumerDummyHandler)
  263. class AbortingConsumerDummyHandler(ConsumerDummyHandler):
  264. """
  265. This is a HTTP request handler that works with the C{IPushProducer}
  266. implementation in the L{H2Stream} object. The difference between this and
  267. the ConsumerDummyHandler is that after resuming production it immediately
  268. aborts it again.
  269. """
  270. def acceptData(self):
  271. """
  272. Start and then immediately stop the data pipe.
  273. """
  274. self.channel.resumeProducing()
  275. self.channel.stopProducing()
  276. AbortingConsumerDummyHandlerProxy = _makeRequestProxyFactory(
  277. AbortingConsumerDummyHandler)
  278. class DummyProducerHandler(http.Request):
  279. """
  280. An HTTP request handler that registers a dummy producer to serve the body.
  281. The owner must call C{finish} to complete the response.
  282. """
  283. def process(self):
  284. self.setResponseCode(200)
  285. self.registerProducer(DummyProducer(), True)
  286. DummyProducerHandlerProxy = _makeRequestProxyFactory(DummyProducerHandler)
  287. class HTTP2TestHelpers(object):
  288. """
  289. A superclass that contains no tests but provides test helpers for HTTP/2
  290. tests.
  291. """
  292. if skipH2:
  293. skip = skipH2
  294. def assertAllStreamsBlocked(self, connection):
  295. """
  296. Confirm that all streams are blocked: that is, the priority tree
  297. believes that none of the streams have data ready to send.
  298. """
  299. self.assertRaises(priority.DeadlockError, next, connection.priority)
  300. class HTTP2ServerTests(unittest.TestCase, HTTP2TestHelpers):
  301. getRequestHeaders = [
  302. (b':method', b'GET'),
  303. (b':authority', b'localhost'),
  304. (b':path', b'/'),
  305. (b':scheme', b'https'),
  306. (b'user-agent', b'twisted-test-code'),
  307. (b'custom-header', b'1'),
  308. (b'custom-header', b'2'),
  309. ]
  310. postRequestHeaders = [
  311. (b':method', b'POST'),
  312. (b':authority', b'localhost'),
  313. (b':path', b'/post_endpoint'),
  314. (b':scheme', b'https'),
  315. (b'user-agent', b'twisted-test-code'),
  316. (b'content-length', b'25'),
  317. ]
  318. postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"]
  319. getResponseHeaders = [
  320. (b':status', b'200'),
  321. (b'request', b'/'),
  322. (b'command', b'GET'),
  323. (b'version', b'HTTP/2'),
  324. (b'content-length', b'13'),
  325. ]
  326. getResponseData = b"'''\nNone\n'''\n"
  327. postResponseHeaders = [
  328. (b':status', b'200'),
  329. (b'request', b'/post_endpoint'),
  330. (b'command', b'POST'),
  331. (b'version', b'HTTP/2'),
  332. (b'content-length', b'36'),
  333. ]
  334. postResponseData = b"'''\n25\nhello world, it's http/2!'''\n"
  335. def connectAndReceive(self, connection, headers, body):
  336. """
  337. Takes a single L{H2Connection} object and connects it to a
  338. L{StringTransport} using a brand new L{FrameFactory}.
  339. @param connection: The L{H2Connection} object to connect.
  340. @type connection: L{H2Connection}
  341. @param headers: The headers to send on the first request.
  342. @type headers: L{Iterable} of L{tuple} of C{(bytes, bytes)}
  343. @param body: Chunks of body to send, if any.
  344. @type body: L{Iterable} of L{bytes}
  345. @return: A tuple of L{FrameFactory}, L{StringTransport}
  346. """
  347. frameFactory = FrameFactory()
  348. transport = StringTransport()
  349. requestBytes = frameFactory.clientConnectionPreface()
  350. requestBytes += buildRequestBytes(headers, body, frameFactory)
  351. connection.makeConnection(transport)
  352. # One byte at a time, to stress the implementation.
  353. for byte in iterbytes(requestBytes):
  354. connection.dataReceived(byte)
  355. return frameFactory, transport
  356. def test_basicRequest(self):
  357. """
  358. Send request over a TCP connection and confirm that we get back the
  359. expected data in the order and style we expect.
  360. """
  361. # This test is complex because it validates the data very closely: it
  362. # specifically checks frame ordering and type.
  363. connection = H2Connection()
  364. connection.requestFactory = DummyHTTPHandlerProxy
  365. _, transport = self.connectAndReceive(
  366. connection, self.getRequestHeaders, []
  367. )
  368. def validate(streamID):
  369. frames = framesFromBytes(transport.value())
  370. self.assertEqual(len(frames), 4)
  371. self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
  372. self.assertTrue(
  373. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  374. )
  375. self.assertTrue(isinstance(frames[2], hyperframe.frame.DataFrame))
  376. self.assertTrue(isinstance(frames[3], hyperframe.frame.DataFrame))
  377. self.assertEqual(
  378. dict(frames[1].data), dict(self.getResponseHeaders)
  379. )
  380. self.assertEqual(frames[2].data, self.getResponseData)
  381. self.assertEqual(frames[3].data, b'')
  382. self.assertTrue('END_STREAM' in frames[3].flags)
  383. return connection._streamCleanupCallbacks[1].addCallback(validate)
  384. def test_postRequest(self):
  385. """
  386. Send a POST request and confirm that the data is safely transferred.
  387. """
  388. connection = H2Connection()
  389. connection.requestFactory = DummyHTTPHandlerProxy
  390. _, transport = self.connectAndReceive(
  391. connection, self.postRequestHeaders, self.postRequestData
  392. )
  393. def validate(streamID):
  394. frames = framesFromBytes(transport.value())
  395. # One Settings frame, one Headers frame and two Data frames.
  396. self.assertEqual(len(frames), 4)
  397. self.assertTrue(all(f.stream_id == 1 for f in frames[-3:]))
  398. self.assertTrue(
  399. isinstance(frames[-3], hyperframe.frame.HeadersFrame)
  400. )
  401. self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame))
  402. self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame))
  403. self.assertEqual(
  404. dict(frames[-3].data), dict(self.postResponseHeaders)
  405. )
  406. self.assertEqual(frames[-2].data, self.postResponseData)
  407. self.assertEqual(frames[-1].data, b'')
  408. self.assertTrue('END_STREAM' in frames[-1].flags)
  409. return connection._streamCleanupCallbacks[1].addCallback(validate)
  410. def test_postRequestNoLength(self):
  411. """
  412. Send a POST request without length and confirm that the data is safely
  413. transferred.
  414. """
  415. postResponseHeaders = [
  416. (b':status', b'200'),
  417. (b'request', b'/post_endpoint'),
  418. (b'command', b'POST'),
  419. (b'version', b'HTTP/2'),
  420. (b'content-length', b'38'),
  421. ]
  422. postResponseData = b"'''\nNone\nhello world, it's http/2!'''\n"
  423. # Strip the content-length header.
  424. postRequestHeaders = [
  425. (x, y) for x, y in self.postRequestHeaders
  426. if x != b'content-length'
  427. ]
  428. connection = H2Connection()
  429. connection.requestFactory = DummyHTTPHandlerProxy
  430. _, transport = self.connectAndReceive(
  431. connection, postRequestHeaders, self.postRequestData
  432. )
  433. def validate(streamID):
  434. frames = framesFromBytes(transport.value())
  435. # One Settings frame, one Headers frame, and two Data frames
  436. self.assertEqual(len(frames), 4)
  437. self.assertTrue(all(f.stream_id == 1 for f in frames[-3:]))
  438. self.assertTrue(
  439. isinstance(frames[-3], hyperframe.frame.HeadersFrame)
  440. )
  441. self.assertTrue(isinstance(frames[-2], hyperframe.frame.DataFrame))
  442. self.assertTrue(isinstance(frames[-1], hyperframe.frame.DataFrame))
  443. self.assertEqual(
  444. dict(frames[-3].data), dict(postResponseHeaders)
  445. )
  446. self.assertEqual(frames[-2].data, postResponseData)
  447. self.assertEqual(frames[-1].data, b'')
  448. self.assertTrue('END_STREAM' in frames[-1].flags)
  449. return connection._streamCleanupCallbacks[1].addCallback(validate)
  450. def test_interleavedRequests(self):
  451. """
  452. Many interleaved POST requests all get received and responded to
  453. appropriately.
  454. """
  455. # Unfortunately this test is pretty complex.
  456. REQUEST_COUNT = 40
  457. f = FrameFactory()
  458. b = StringTransport()
  459. a = H2Connection()
  460. a.requestFactory = DummyHTTPHandlerProxy
  461. # Stream IDs are always odd numbers.
  462. streamIDs = list(range(1, REQUEST_COUNT * 2, 2))
  463. frames = [
  464. buildRequestFrames(
  465. self.postRequestHeaders, self.postRequestData, f, streamID
  466. ) for streamID in streamIDs
  467. ]
  468. requestBytes = f.clientConnectionPreface()
  469. # Interleave the frames. That is, send one frame from each stream at a
  470. # time. This wacky line lets us do that.
  471. frames = itertools.chain.from_iterable(zip(*frames))
  472. requestBytes += b''.join(frame.serialize() for frame in frames)
  473. a.makeConnection(b)
  474. # one byte at a time, to stress the implementation.
  475. for byte in iterbytes(requestBytes):
  476. a.dataReceived(byte)
  477. def validate(results):
  478. frames = framesFromBytes(b.value())
  479. # We expect 1 Settings frame for the connection, and then 3 frames
  480. # *per stream* (1 Headers frame, 2 Data frames). This doesn't send
  481. # enough data to trigger a window update.
  482. self.assertEqual(len(frames), 1 + (3 * 40))
  483. # Let's check the data is ok. We need the non-WindowUpdate frames
  484. # for each stream.
  485. for streamID in streamIDs:
  486. streamFrames = [
  487. f for f in frames if f.stream_id == streamID and
  488. not isinstance(f, hyperframe.frame.WindowUpdateFrame)
  489. ]
  490. self.assertEqual(len(streamFrames), 3)
  491. self.assertEqual(
  492. dict(streamFrames[0].data), dict(self.postResponseHeaders)
  493. )
  494. self.assertEqual(streamFrames[1].data, self.postResponseData)
  495. self.assertEqual(streamFrames[2].data, b'')
  496. self.assertTrue('END_STREAM' in streamFrames[2].flags)
  497. return defer.DeferredList(
  498. list(a._streamCleanupCallbacks.values())
  499. ).addCallback(validate)
  500. def test_sendAccordingToPriority(self):
  501. """
  502. Data in responses is interleaved according to HTTP/2 priorities.
  503. """
  504. # We want to start three parallel GET requests that will each return
  505. # four chunks of data. These chunks will be interleaved according to
  506. # HTTP/2 priorities. Stream 1 will be set to weight 64, Stream 3 to
  507. # weight 32, and Stream 5 to weight 16 but dependent on Stream 1.
  508. # That will cause data frames for these streams to be emitted in this
  509. # order: 1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5.
  510. #
  511. # The reason there are so many frames is because the implementation
  512. # interleaves stream completion according to priority order as well,
  513. # because it is sent on a Data frame.
  514. #
  515. # This doesn't fully test priority, but tests *almost* enough of it to
  516. # be worthwhile.
  517. f = FrameFactory()
  518. b = StringTransport()
  519. a = H2Connection()
  520. a.requestFactory = ChunkedHTTPHandlerProxy
  521. getRequestHeaders = self.getRequestHeaders
  522. getRequestHeaders[2] = (':path', '/chunked/4')
  523. frames = [
  524. buildRequestFrames(getRequestHeaders, [], f, streamID)
  525. for streamID in [1, 3, 5]
  526. ]
  527. # Set the priorities. The first two will use their HEADERS frame, the
  528. # third will have a PRIORITY frame sent before the headers.
  529. frames[0][0].flags.add('PRIORITY')
  530. frames[0][0].stream_weight = 64
  531. frames[1][0].flags.add('PRIORITY')
  532. frames[1][0].stream_weight = 32
  533. priorityFrame = f.buildPriorityFrame(
  534. streamID=5,
  535. weight=16,
  536. dependsOn=1,
  537. exclusive=True,
  538. )
  539. frames[2].insert(0, priorityFrame)
  540. frames = itertools.chain.from_iterable(frames)
  541. requestBytes = f.clientConnectionPreface()
  542. requestBytes += b''.join(frame.serialize() for frame in frames)
  543. a.makeConnection(b)
  544. # one byte at a time, to stress the implementation.
  545. for byte in iterbytes(requestBytes):
  546. a.dataReceived(byte)
  547. def validate(results):
  548. frames = framesFromBytes(b.value())
  549. # We expect 1 Settings frame for the connection, and then 6 frames
  550. # per stream (1 Headers frame, 5 data frames), for a total of 19.
  551. self.assertEqual(len(frames), 19)
  552. streamIDs = [
  553. f.stream_id for f in frames
  554. if isinstance(f, hyperframe.frame.DataFrame)
  555. ]
  556. expectedOrder = [1, 3, 1, 1, 3, 1, 1, 3, 5, 3, 5, 3, 5, 5, 5]
  557. self.assertEqual(streamIDs, expectedOrder)
  558. return defer.DeferredList(
  559. list(a._streamCleanupCallbacks.values())
  560. ).addCallback(validate)
  561. def test_protocolErrorTerminatesConnection(self):
  562. """
  563. A protocol error from the remote peer terminates the connection.
  564. """
  565. f = FrameFactory()
  566. b = StringTransport()
  567. a = H2Connection()
  568. a.requestFactory = DummyHTTPHandlerProxy
  569. # We're going to open a stream and then send a PUSH_PROMISE frame,
  570. # which is forbidden.
  571. requestBytes = f.clientConnectionPreface()
  572. requestBytes += buildRequestBytes(self.getRequestHeaders, [], f)
  573. requestBytes += f.buildPushPromiseFrame(
  574. streamID=1,
  575. promisedStreamID=2,
  576. headers=self.getRequestHeaders,
  577. flags=['END_HEADERS'],
  578. ).serialize()
  579. a.makeConnection(b)
  580. # one byte at a time, to stress the implementation.
  581. for byte in iterbytes(requestBytes):
  582. a.dataReceived(byte)
  583. # Check whether the transport got shut down: if it did, stop
  584. # sending more data.
  585. if b.disconnecting:
  586. break
  587. frames = framesFromBytes(b.value())
  588. # The send loop never gets to terminate the stream, but *some* data
  589. # does get sent. We get a Settings frame, a Headers frame, and then the
  590. # GoAway frame.
  591. self.assertEqual(len(frames), 3)
  592. self.assertTrue(
  593. isinstance(frames[-1], hyperframe.frame.GoAwayFrame)
  594. )
  595. self.assertTrue(b.disconnecting)
  596. def test_streamProducingData(self):
  597. """
  598. The H2Stream data implements IPushProducer, and can have its data
  599. production controlled by the Request if the Request chooses to.
  600. """
  601. connection = H2Connection()
  602. connection.requestFactory = ConsumerDummyHandlerProxy
  603. _, transport = self.connectAndReceive(
  604. connection, self.postRequestHeaders, self.postRequestData
  605. )
  606. # At this point no data should have been received by the request *or*
  607. # the response. We need to dig the request out of the tree of objects.
  608. request = connection.streams[1]._request.original
  609. self.assertFalse(request._requestReceived)
  610. # We should have only received the Settings frame. It's important that
  611. # the WindowUpdate frames don't land before data is delivered to the
  612. # Request.
  613. frames = framesFromBytes(transport.value())
  614. self.assertEqual(len(frames), 1)
  615. # At this point, we can kick off the producing. This will force the
  616. # H2Stream object to deliver the request data all at once, so check
  617. # that it was delivered correctly.
  618. request.acceptData()
  619. self.assertTrue(request._requestReceived)
  620. self.assertTrue(request._data, b"hello world, it's http/2!")
  621. # *That* will have also caused the H2Connection object to emit almost
  622. # all the data it needs. That'll be a Headers frame, as well as the
  623. # original SETTINGS frame.
  624. frames = framesFromBytes(transport.value())
  625. self.assertEqual(len(frames), 2)
  626. def validate(streamID):
  627. # Confirm that the response is ok.
  628. frames = framesFromBytes(transport.value())
  629. # The only new frames here are the two Data frames.
  630. self.assertEqual(len(frames), 4)
  631. self.assertTrue('END_STREAM' in frames[-1].flags)
  632. return connection._streamCleanupCallbacks[1].addCallback(validate)
  633. def test_abortStreamProducingData(self):
  634. """
  635. The H2Stream data implements IPushProducer, and can have its data
  636. production controlled by the Request if the Request chooses to.
  637. When the production is stopped, that causes the stream connection to
  638. be lost.
  639. """
  640. f = FrameFactory()
  641. b = StringTransport()
  642. a = H2Connection()
  643. a.requestFactory = AbortingConsumerDummyHandlerProxy
  644. # We're going to send in a POST request.
  645. frames = buildRequestFrames(
  646. self.postRequestHeaders, self.postRequestData, f
  647. )
  648. frames[-1].flags = set() # Remove END_STREAM flag.
  649. requestBytes = f.clientConnectionPreface()
  650. requestBytes += b''.join(f.serialize() for f in frames)
  651. a.makeConnection(b)
  652. # one byte at a time, to stress the implementation.
  653. for byte in iterbytes(requestBytes):
  654. a.dataReceived(byte)
  655. # At this point no data should have been received by the request *or*
  656. # the response. We need to dig the request out of the tree of objects.
  657. request = a.streams[1]._request.original
  658. self.assertFalse(request._requestReceived)
  659. # Save off the cleanup deferred now, it'll be removed when the
  660. # RstStream frame is sent.
  661. cleanupCallback = a._streamCleanupCallbacks[1]
  662. # At this point, we can kick off the production and immediate abort.
  663. request.acceptData()
  664. # The stream will now have been aborted.
  665. def validate(streamID):
  666. # Confirm that the response is ok.
  667. frames = framesFromBytes(b.value())
  668. # We expect a Settings frame and a RstStream frame.
  669. self.assertEqual(len(frames), 2)
  670. self.assertTrue(
  671. isinstance(frames[-1], hyperframe.frame.RstStreamFrame)
  672. )
  673. self.assertEqual(frames[-1].stream_id, 1)
  674. return cleanupCallback.addCallback(validate)
  675. def test_terminatedRequest(self):
  676. """
  677. When a RstStream frame is received, the L{H2Connection} and L{H2Stream}
  678. objects tear down the L{http.Request} and swallow all outstanding
  679. writes.
  680. """
  681. # Here we want to use the DummyProducerHandler primarily for the side
  682. # effect it has of not writing to the connection. That means we can
  683. # delay some writes until *after* the RstStream frame is received.
  684. connection = H2Connection()
  685. connection.requestFactory = DummyProducerHandlerProxy
  686. frameFactory, transport = self.connectAndReceive(
  687. connection, self.getRequestHeaders, []
  688. )
  689. # Get the request object.
  690. request = connection.streams[1]._request.original
  691. # Send two writes in.
  692. request.write(b"first chunk")
  693. request.write(b"second chunk")
  694. # Save off the cleanup deferred now, it'll be removed when the
  695. # RstStream frame is received.
  696. cleanupCallback = connection._streamCleanupCallbacks[1]
  697. # Now fire the RstStream frame.
  698. connection.dataReceived(
  699. frameFactory.buildRstStreamFrame(1, errorCode=1).serialize()
  700. )
  701. # This should have cancelled the request.
  702. self.assertTrue(request._disconnected)
  703. self.assertTrue(request.channel is None)
  704. # An attempt to write should at this point raise an exception.
  705. self.assertRaises(AttributeError, request.write, b"third chunk")
  706. # Check that everything is fine.
  707. # We expect that only the Settings and Headers frames will have been
  708. # emitted. The two writes are lost because the delayed call never had
  709. # another chance to execute before the RstStream frame got processed.
  710. def validate(streamID):
  711. frames = framesFromBytes(transport.value())
  712. self.assertEqual(len(frames), 2)
  713. self.assertEqual(frames[1].stream_id, 1)
  714. self.assertTrue(
  715. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  716. )
  717. return cleanupCallback.addCallback(validate)
  718. def test_terminatedConnection(self):
  719. """
  720. When a GoAway frame is received, the L{H2Connection} and L{H2Stream}
  721. objects tear down all outstanding L{http.Request} objects and stop all
  722. writing.
  723. """
  724. # Here we want to use the DummyProducerHandler primarily for the side
  725. # effect it has of not writing to the connection. That means we can
  726. # delay some writes until *after* the GoAway frame is received.
  727. connection = H2Connection()
  728. connection.requestFactory = DummyProducerHandlerProxy
  729. frameFactory, transport = self.connectAndReceive(
  730. connection, self.getRequestHeaders, []
  731. )
  732. # Get the request object.
  733. request = connection.streams[1]._request.original
  734. # Send two writes in.
  735. request.write(b"first chunk")
  736. request.write(b"second chunk")
  737. # Save off the cleanup deferred now, it'll be removed when the
  738. # GoAway frame is received.
  739. cleanupCallback = connection._streamCleanupCallbacks[1]
  740. # Now fire the GoAway frame.
  741. connection.dataReceived(
  742. frameFactory.buildGoAwayFrame(lastStreamID=0).serialize()
  743. )
  744. # This should have cancelled the request.
  745. self.assertTrue(request._disconnected)
  746. self.assertTrue(request.channel is None)
  747. # It should also have cancelled the sending loop.
  748. self.assertFalse(connection._stillProducing)
  749. # Check that everything is fine.
  750. # We expect that only the Settings and Headers frames will have been
  751. # emitted. The writes are lost because the callLater never had
  752. # a chance to execute before the GoAway frame got processed.
  753. def validate(streamID):
  754. frames = framesFromBytes(transport.value())
  755. self.assertEqual(len(frames), 2)
  756. self.assertEqual(frames[1].stream_id, 1)
  757. self.assertTrue(
  758. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  759. )
  760. return cleanupCallback.addCallback(validate)
  761. def test_respondWith100Continue(self):
  762. """
  763. Requests containing Expect: 100-continue cause provisional 100
  764. responses to be emitted.
  765. """
  766. connection = H2Connection()
  767. connection.requestFactory = DummyHTTPHandlerProxy
  768. # Add Expect: 100-continue for this request.
  769. headers = self.getRequestHeaders + [(b'expect', b'100-continue')]
  770. _, transport = self.connectAndReceive(connection, headers, [])
  771. # We expect 5 frames now: Settings, two Headers frames, and two Data
  772. # frames. We're only really interested in validating the first Headers
  773. # frame which contains the 100.
  774. def validate(streamID):
  775. frames = framesFromBytes(transport.value())
  776. self.assertEqual(len(frames), 5)
  777. self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
  778. self.assertTrue(
  779. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  780. )
  781. self.assertEqual(
  782. frames[1].data, [(b':status', b'100')]
  783. )
  784. self.assertTrue('END_STREAM' in frames[-1].flags)
  785. return connection._streamCleanupCallbacks[1].addCallback(validate)
  786. def test_respondWith400(self):
  787. """
  788. Triggering the call to L{H2Stream._respondToBadRequestAndDisconnect}
  789. leads to a 400 error being sent automatically and the stream being torn
  790. down.
  791. """
  792. # The only "natural" way to trigger this in the current codebase is to
  793. # send a multipart/form-data request that the cgi module doesn't like.
  794. # That's absurdly hard, so instead we'll just call it ourselves. For
  795. # this reason we use the DummyProducerHandler, which doesn't write the
  796. # headers straight away.
  797. connection = H2Connection()
  798. connection.requestFactory = DummyProducerHandlerProxy
  799. _, transport = self.connectAndReceive(
  800. connection, self.getRequestHeaders, []
  801. )
  802. # Grab the request and the completion callback.
  803. stream = connection.streams[1]
  804. request = stream._request.original
  805. cleanupCallback = connection._streamCleanupCallbacks[1]
  806. # Abort the stream.
  807. stream._respondToBadRequestAndDisconnect()
  808. # This should have cancelled the request.
  809. self.assertTrue(request._disconnected)
  810. self.assertTrue(request.channel is None)
  811. # We expect 2 frames Settings and the 400 Headers.
  812. def validate(streamID):
  813. frames = framesFromBytes(transport.value())
  814. self.assertEqual(len(frames), 2)
  815. self.assertTrue(
  816. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  817. )
  818. self.assertEqual(
  819. frames[1].data, [(b':status', b'400')]
  820. )
  821. self.assertTrue('END_STREAM' in frames[-1].flags)
  822. return cleanupCallback.addCallback(validate)
  823. def test_loseH2StreamConnection(self):
  824. """
  825. Calling L{Request.loseConnection} causes all data that has previously
  826. been sent to be flushed, and then the stream cleanly closed.
  827. """
  828. # Here we again want to use the DummyProducerHandler because it doesn't
  829. # close the connection on its own.
  830. connection = H2Connection()
  831. connection.requestFactory = DummyProducerHandlerProxy
  832. _, transport = self.connectAndReceive(
  833. connection, self.getRequestHeaders, []
  834. )
  835. # Grab the request.
  836. stream = connection.streams[1]
  837. request = stream._request.original
  838. # Send in some writes.
  839. dataChunks = [b'hello', b'world', b'here', b'are', b'some', b'writes']
  840. for chunk in dataChunks:
  841. request.write(chunk)
  842. # Now lose the connection.
  843. request.loseConnection()
  844. # Check that the data was all written out correctly and that the stream
  845. # state is cleaned up.
  846. def validate(streamID):
  847. frames = framesFromBytes(transport.value())
  848. # Settings, Headers, 7 Data frames.
  849. self.assertEqual(len(frames), 9)
  850. self.assertTrue(all(f.stream_id == 1 for f in frames[1:]))
  851. self.assertTrue(
  852. isinstance(frames[1], hyperframe.frame.HeadersFrame)
  853. )
  854. self.assertTrue('END_STREAM' in frames[-1].flags)
  855. receivedDataChunks = [
  856. f.data for f in frames
  857. if isinstance(f, hyperframe.frame.DataFrame)
  858. ]
  859. self.assertEqual(
  860. receivedDataChunks,
  861. dataChunks + [b""],
  862. )
  863. return connection._streamCleanupCallbacks[1].addCallback(validate)
  864. def test_cannotRegisterTwoProducers(self):
  865. """
  866. The L{H2Stream} object forbids registering two producers.
  867. """
  868. connection = H2Connection()
  869. connection.requestFactory = DummyProducerHandlerProxy
  870. self.connectAndReceive(connection, self.getRequestHeaders, [])
  871. # Grab the request.
  872. stream = connection.streams[1]
  873. request = stream._request.original
  874. self.assertRaises(ValueError, stream.registerProducer, request, True)
  875. def test_handlesPullProducer(self):
  876. """
  877. L{Request} objects that have registered pull producers get blocked and
  878. unblocked according to HTTP/2 flow control.
  879. """
  880. connection = H2Connection()
  881. connection.requestFactory = DummyPullProducerHandlerProxy
  882. _, transport = self.connectAndReceive(
  883. connection, self.getRequestHeaders, []
  884. )
  885. # Get the producer completion deferred and ensure we call
  886. # request.finish.
  887. stream = connection.streams[1]
  888. request = stream._request.original
  889. producerComplete = request._actualProducer.result
  890. producerComplete.addCallback(lambda x: request.finish())
  891. # Check that the sending loop sends all the appropriate data.
  892. def validate(streamID):
  893. frames = framesFromBytes(transport.value())
  894. # Check that the stream is correctly terminated.
  895. self.assertTrue('END_STREAM' in frames[-1].flags)
  896. # Grab the data from the frames.
  897. dataChunks = [
  898. f.data for f in frames
  899. if isinstance(f, hyperframe.frame.DataFrame)
  900. ]
  901. self.assertEqual(
  902. dataChunks,
  903. [
  904. b"0", b"1", b"2", b"3", b"4", b"5",
  905. b"6", b"7", b"8", b"9", b""
  906. ]
  907. )
  908. return connection._streamCleanupCallbacks[1].addCallback(validate)
  909. def test_isSecureWorksProperly(self):
  910. """
  911. L{Request} objects can correctly ask isSecure on HTTP/2.
  912. """
  913. connection = H2Connection()
  914. connection.requestFactory = DelayedHTTPHandlerProxy
  915. self.connectAndReceive(connection, self.getRequestHeaders, [])
  916. request = connection.streams[1]._request.original
  917. self.assertFalse(request.isSecure())
  918. connection.streams[1].abortConnection()
  919. def test_lateCompletionWorks(self):
  920. """
  921. L{H2Connection} correctly unblocks when a stream is ended.
  922. """
  923. connection = H2Connection()
  924. connection.requestFactory = DelayedHTTPHandlerProxy
  925. _, transport = self.connectAndReceive(
  926. connection, self.getRequestHeaders, []
  927. )
  928. # Delay a call to end request, forcing the connection to block because
  929. # it has no data to send.
  930. request = connection.streams[1]._request.original
  931. reactor.callLater(0.01, request.finish)
  932. def validateComplete(*args):
  933. frames = framesFromBytes(transport.value())
  934. # Check that the stream is correctly terminated.
  935. self.assertEqual(len(frames), 3)
  936. self.assertTrue('END_STREAM' in frames[-1].flags)
  937. return connection._streamCleanupCallbacks[1].addCallback(
  938. validateComplete
  939. )
  940. def test_writeSequenceForChannels(self):
  941. """
  942. L{H2Stream} objects can send a series of frames via C{writeSequence}.
  943. """
  944. connection = H2Connection()
  945. connection.requestFactory = DelayedHTTPHandlerProxy
  946. _, transport = self.connectAndReceive(
  947. connection, self.getRequestHeaders, []
  948. )
  949. stream = connection.streams[1]
  950. request = stream._request.original
  951. request.setResponseCode(200)
  952. stream.writeSequence([b'Hello', b',', b'world!'])
  953. request.finish()
  954. completionDeferred = connection._streamCleanupCallbacks[1]
  955. def validate(streamID):
  956. frames = framesFromBytes(transport.value())
  957. # Check that the stream is correctly terminated.
  958. self.assertTrue('END_STREAM' in frames[-1].flags)
  959. # Grab the data from the frames.
  960. dataChunks = [
  961. f.data for f in frames
  962. if isinstance(f, hyperframe.frame.DataFrame)
  963. ]
  964. self.assertEqual(
  965. dataChunks,
  966. [
  967. b"Hello", b",", b"world!", b""
  968. ]
  969. )
  970. return completionDeferred.addCallback(validate)
  971. def test_delayWrites(self):
  972. """
  973. Delaying writes from L{Request} causes the L{H2Connection} to block on
  974. sending until data is available. However, data is *not* sent if there's
  975. no room in the flow control window.
  976. """
  977. # Here we again want to use the DummyProducerHandler because it doesn't
  978. # close the connection on its own.
  979. f = FrameFactory()
  980. b = StringTransport()
  981. a = H2Connection()
  982. a.requestFactory = DelayedHTTPHandlerProxy
  983. requestBytes = f.clientConnectionPreface()
  984. requestBytes += f.buildSettingsFrame(
  985. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  986. ).serialize()
  987. requestBytes += buildRequestBytes(
  988. self.getRequestHeaders, [], f
  989. )
  990. a.makeConnection(b)
  991. # one byte at a time, to stress the implementation.
  992. for byte in iterbytes(requestBytes):
  993. a.dataReceived(byte)
  994. # Grab the request.
  995. stream = a.streams[1]
  996. request = stream._request.original
  997. # Write the first 5 bytes.
  998. request.write(b'fiver')
  999. dataChunks = [b'here', b'are', b'some', b'writes']
  1000. def write_chunks():
  1001. # Send in some writes.
  1002. for chunk in dataChunks:
  1003. request.write(chunk)
  1004. request.finish()
  1005. d = task.deferLater(reactor, 0.01, write_chunks)
  1006. d.addCallback(
  1007. lambda *args: a.dataReceived(
  1008. f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
  1009. )
  1010. )
  1011. # Check that the data was all written out correctly and that the stream
  1012. # state is cleaned up.
  1013. def validate(streamID):
  1014. frames = framesFromBytes(b.value())
  1015. # 2 Settings, Headers, 7 Data frames.
  1016. self.assertEqual(len(frames), 9)
  1017. self.assertTrue(all(f.stream_id == 1 for f in frames[2:]))
  1018. self.assertTrue(
  1019. isinstance(frames[2], hyperframe.frame.HeadersFrame)
  1020. )
  1021. self.assertTrue('END_STREAM' in frames[-1].flags)
  1022. receivedDataChunks = [
  1023. f.data for f in frames
  1024. if isinstance(f, hyperframe.frame.DataFrame)
  1025. ]
  1026. self.assertEqual(
  1027. receivedDataChunks,
  1028. [b"fiver"] + dataChunks + [b""],
  1029. )
  1030. return a._streamCleanupCallbacks[1].addCallback(validate)
  1031. def test_resetAfterBody(self):
  1032. """
  1033. A client that immediately resets after sending the body causes Twisted
  1034. to send no response.
  1035. """
  1036. frameFactory = FrameFactory()
  1037. transport = StringTransport()
  1038. a = H2Connection()
  1039. a.requestFactory = DummyHTTPHandlerProxy
  1040. requestBytes = frameFactory.clientConnectionPreface()
  1041. requestBytes += buildRequestBytes(
  1042. headers=self.getRequestHeaders, data=[], frameFactory=frameFactory
  1043. )
  1044. requestBytes += frameFactory.buildRstStreamFrame(
  1045. streamID=1
  1046. ).serialize()
  1047. a.makeConnection(transport)
  1048. a.dataReceived(requestBytes)
  1049. frames = framesFromBytes(transport.value())
  1050. self.assertEqual(len(frames), 1)
  1051. self.assertNotIn(1, a._streamCleanupCallbacks)
  1052. def test_RequestRequiringFactorySiteInConstructor(self):
  1053. """
  1054. A custom L{Request} subclass that requires the site and factory in the
  1055. constructor is able to get them.
  1056. """
  1057. d = defer.Deferred()
  1058. class SuperRequest(DummyHTTPHandler):
  1059. def __init__(self, *args, **kwargs):
  1060. DummyHTTPHandler.__init__(self, *args, **kwargs)
  1061. d.callback((self.channel.site, self.channel.factory))
  1062. connection = H2Connection()
  1063. httpFactory = http.HTTPFactory()
  1064. connection.requestFactory = _makeRequestProxyFactory(SuperRequest)
  1065. # Create some sentinels to look for.
  1066. connection.factory = httpFactory
  1067. connection.site = object()
  1068. self.connectAndReceive(connection, self.getRequestHeaders, [])
  1069. def validateFactoryAndSite(args):
  1070. site, factory = args
  1071. self.assertIs(site, connection.site)
  1072. self.assertIs(factory, connection.factory)
  1073. d.addCallback(validateFactoryAndSite)
  1074. # We need to wait for the stream cleanup callback to drain the
  1075. # response.
  1076. cleanupCallback = connection._streamCleanupCallbacks[1]
  1077. return defer.gatherResults([d, cleanupCallback])
  1078. class H2FlowControlTests(unittest.TestCase, HTTP2TestHelpers):
  1079. """
  1080. Tests that ensure that we handle HTTP/2 flow control limits appropriately.
  1081. """
  1082. getRequestHeaders = [
  1083. (b':method', b'GET'),
  1084. (b':authority', b'localhost'),
  1085. (b':path', b'/'),
  1086. (b':scheme', b'https'),
  1087. (b'user-agent', b'twisted-test-code'),
  1088. ]
  1089. getResponseData = b"'''\nNone\n'''\n"
  1090. postRequestHeaders = [
  1091. (b':method', b'POST'),
  1092. (b':authority', b'localhost'),
  1093. (b':path', b'/post_endpoint'),
  1094. (b':scheme', b'https'),
  1095. (b'user-agent', b'twisted-test-code'),
  1096. (b'content-length', b'25'),
  1097. ]
  1098. postRequestData = [b"hello ", b"world, ", b"it's ", b"http/2!"]
  1099. postResponseData = b"'''\n25\nhello world, it's http/2!'''\n"
  1100. def test_bufferExcessData(self):
  1101. """
  1102. When a L{Request} object is not using C{IProducer} to generate data and
  1103. so is not having backpressure exerted on it, the L{H2Stream} object
  1104. will buffer data until the flow control window is opened.
  1105. """
  1106. f = FrameFactory()
  1107. b = StringTransport()
  1108. a = H2Connection()
  1109. a.requestFactory = DummyHTTPHandlerProxy
  1110. # Shrink the window to 5 bytes, then send the request.
  1111. requestBytes = f.clientConnectionPreface()
  1112. requestBytes += f.buildSettingsFrame(
  1113. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  1114. ).serialize()
  1115. requestBytes += buildRequestBytes(
  1116. self.getRequestHeaders, [], f
  1117. )
  1118. a.makeConnection(b)
  1119. # one byte at a time, to stress the implementation.
  1120. for byte in iterbytes(requestBytes):
  1121. a.dataReceived(byte)
  1122. # Send in WindowUpdate frames that open the window one byte at a time,
  1123. # to repeatedly temporarily unbuffer data. 5 bytes will have already
  1124. # been sent.
  1125. bonusFrames = len(self.getResponseData) - 5
  1126. for _ in range(bonusFrames):
  1127. frame = f.buildWindowUpdateFrame(streamID=1, increment=1)
  1128. a.dataReceived(frame.serialize())
  1129. # Give the sending loop a chance to catch up!
  1130. def validate(streamID):
  1131. frames = framesFromBytes(b.value())
  1132. # Check that the stream is correctly terminated.
  1133. self.assertTrue('END_STREAM' in frames[-1].flags)
  1134. # Put the Data frames together to confirm we're all good.
  1135. actualResponseData = b''.join(
  1136. f.data for f in frames
  1137. if isinstance(f, hyperframe.frame.DataFrame)
  1138. )
  1139. self.assertEqual(self.getResponseData, actualResponseData)
  1140. return a._streamCleanupCallbacks[1].addCallback(validate)
  1141. def test_producerBlockingUnblocking(self):
  1142. """
  1143. L{Request} objects that have registered producers get blocked and
  1144. unblocked according to HTTP/2 flow control.
  1145. """
  1146. f = FrameFactory()
  1147. b = StringTransport()
  1148. a = H2Connection()
  1149. a.requestFactory = DummyProducerHandlerProxy
  1150. # Shrink the window to 5 bytes, then send the request.
  1151. requestBytes = f.clientConnectionPreface()
  1152. requestBytes += f.buildSettingsFrame(
  1153. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  1154. ).serialize()
  1155. requestBytes += buildRequestBytes(
  1156. self.getRequestHeaders, [], f
  1157. )
  1158. a.makeConnection(b)
  1159. # one byte at a time, to stress the implementation.
  1160. for byte in iterbytes(requestBytes):
  1161. a.dataReceived(byte)
  1162. # Grab the request object.
  1163. stream = a.streams[1]
  1164. request = stream._request.original
  1165. # Confirm that the stream believes the producer is producing.
  1166. self.assertTrue(stream._producerProducing)
  1167. # Write 10 bytes to the connection.
  1168. request.write(b"helloworld")
  1169. # The producer should have been paused.
  1170. self.assertFalse(stream._producerProducing)
  1171. self.assertEqual(request.producer.events, ['pause'])
  1172. # Open the flow control window by 5 bytes. This should not unpause the
  1173. # producer.
  1174. a.dataReceived(
  1175. f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()
  1176. )
  1177. self.assertFalse(stream._producerProducing)
  1178. self.assertEqual(request.producer.events, ['pause'])
  1179. # Open the connection window by 5 bytes as well. This should also not
  1180. # unpause the producer.
  1181. a.dataReceived(
  1182. f.buildWindowUpdateFrame(streamID=0, increment=5).serialize()
  1183. )
  1184. self.assertFalse(stream._producerProducing)
  1185. self.assertEqual(request.producer.events, ['pause'])
  1186. # Open it by five more bytes. This should unpause the producer.
  1187. a.dataReceived(
  1188. f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()
  1189. )
  1190. self.assertTrue(stream._producerProducing)
  1191. self.assertEqual(request.producer.events, ['pause', 'resume'])
  1192. # Write another 10 bytes, which should force us to pause again. When
  1193. # written this chunk will be sent as one lot, simply because of the
  1194. # fact that the sending loop is not currently running.
  1195. request.write(b"helloworld")
  1196. self.assertFalse(stream._producerProducing)
  1197. self.assertEqual(request.producer.events, ['pause', 'resume', 'pause'])
  1198. # Open the window wide and then complete the request.
  1199. a.dataReceived(
  1200. f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
  1201. )
  1202. self.assertTrue(stream._producerProducing)
  1203. self.assertEqual(
  1204. request.producer.events,
  1205. ['pause', 'resume', 'pause', 'resume']
  1206. )
  1207. request.unregisterProducer()
  1208. request.finish()
  1209. # Check that the sending loop sends all the appropriate data.
  1210. def validate(streamID):
  1211. frames = framesFromBytes(b.value())
  1212. # Check that the stream is correctly terminated.
  1213. self.assertTrue('END_STREAM' in frames[-1].flags)
  1214. # Grab the data from the frames.
  1215. dataChunks = [
  1216. f.data for f in frames
  1217. if isinstance(f, hyperframe.frame.DataFrame)
  1218. ]
  1219. self.assertEqual(
  1220. dataChunks,
  1221. [b"helloworld", b"helloworld", b""]
  1222. )
  1223. return a._streamCleanupCallbacks[1].addCallback(validate)
  1224. def test_flowControlExact(self):
  1225. """
  1226. Exactly filling the flow control window still blocks producers.
  1227. """
  1228. f = FrameFactory()
  1229. b = StringTransport()
  1230. a = H2Connection()
  1231. a.requestFactory = DummyProducerHandlerProxy
  1232. # Shrink the window to 5 bytes, then send the request.
  1233. requestBytes = f.clientConnectionPreface()
  1234. requestBytes += f.buildSettingsFrame(
  1235. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  1236. ).serialize()
  1237. requestBytes += buildRequestBytes(
  1238. self.getRequestHeaders, [], f
  1239. )
  1240. a.makeConnection(b)
  1241. # one byte at a time, to stress the implementation.
  1242. for byte in iterbytes(requestBytes):
  1243. a.dataReceived(byte)
  1244. # Grab the request object.
  1245. stream = a.streams[1]
  1246. request = stream._request.original
  1247. # Confirm that the stream believes the producer is producing.
  1248. self.assertTrue(stream._producerProducing)
  1249. # Write 10 bytes to the connection. This should block the producer
  1250. # immediately.
  1251. request.write(b"helloworld")
  1252. self.assertFalse(stream._producerProducing)
  1253. self.assertEqual(request.producer.events, ['pause'])
  1254. # Despite the producer being blocked, write one more byte. This should
  1255. # not get sent or force any other data to be sent.
  1256. request.write(b"h")
  1257. # Open the window wide and then complete the request. We do this by
  1258. # means of callLater to ensure that the sending loop has time to run.
  1259. def window_open():
  1260. a.dataReceived(
  1261. f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
  1262. )
  1263. self.assertTrue(stream._producerProducing)
  1264. self.assertEqual(
  1265. request.producer.events,
  1266. ['pause', 'resume']
  1267. )
  1268. request.unregisterProducer()
  1269. request.finish()
  1270. windowDefer = task.deferLater(reactor, 0, window_open)
  1271. # Check that the sending loop sends all the appropriate data.
  1272. def validate(streamID):
  1273. frames = framesFromBytes(b.value())
  1274. # Check that the stream is correctly terminated.
  1275. self.assertTrue('END_STREAM' in frames[-1].flags)
  1276. # Grab the data from the frames.
  1277. dataChunks = [
  1278. f.data for f in frames
  1279. if isinstance(f, hyperframe.frame.DataFrame)
  1280. ]
  1281. self.assertEqual(dataChunks, [b"hello", b"world", b"h", b""])
  1282. validateDefer = a._streamCleanupCallbacks[1].addCallback(validate)
  1283. return defer.DeferredList([windowDefer, validateDefer])
  1284. def test_endingBlockedStream(self):
  1285. """
  1286. L{Request} objects that end a stream that is currently blocked behind
  1287. flow control can still end the stream and get cleaned up.
  1288. """
  1289. f = FrameFactory()
  1290. b = StringTransport()
  1291. a = H2Connection()
  1292. a.requestFactory = DummyProducerHandlerProxy
  1293. # Shrink the window to 5 bytes, then send the request.
  1294. requestBytes = f.clientConnectionPreface()
  1295. requestBytes += f.buildSettingsFrame(
  1296. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  1297. ).serialize()
  1298. requestBytes += buildRequestBytes(
  1299. self.getRequestHeaders, [], f
  1300. )
  1301. a.makeConnection(b)
  1302. # one byte at a time, to stress the implementation.
  1303. for byte in iterbytes(requestBytes):
  1304. a.dataReceived(byte)
  1305. # Grab the request object.
  1306. stream = a.streams[1]
  1307. request = stream._request.original
  1308. # Confirm that the stream believes the producer is producing.
  1309. self.assertTrue(stream._producerProducing)
  1310. # Write 10 bytes to the connection, then complete the connection.
  1311. request.write(b"helloworld")
  1312. request.unregisterProducer()
  1313. request.finish()
  1314. # This should have completed the request.
  1315. self.assertTrue(request.finished)
  1316. # Open the window wide and then complete the request.
  1317. reactor.callLater(
  1318. 0,
  1319. a.dataReceived,
  1320. f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
  1321. )
  1322. # Check that the sending loop sends all the appropriate data.
  1323. def validate(streamID):
  1324. frames = framesFromBytes(b.value())
  1325. # Check that the stream is correctly terminated.
  1326. self.assertTrue('END_STREAM' in frames[-1].flags)
  1327. # Grab the data from the frames.
  1328. dataChunks = [
  1329. f.data for f in frames
  1330. if isinstance(f, hyperframe.frame.DataFrame)
  1331. ]
  1332. self.assertEqual(
  1333. dataChunks,
  1334. [b"hello", b"world", b""]
  1335. )
  1336. return a._streamCleanupCallbacks[1].addCallback(validate)
  1337. def test_responseWithoutBody(self):
  1338. """
  1339. We safely handle responses without bodies.
  1340. """
  1341. f = FrameFactory()
  1342. b = StringTransport()
  1343. a = H2Connection()
  1344. # We use the DummyProducerHandler just because we can guarantee that it
  1345. # doesn't end up with a body.
  1346. a.requestFactory = DummyProducerHandlerProxy
  1347. # Send the request.
  1348. requestBytes = f.clientConnectionPreface()
  1349. requestBytes += buildRequestBytes(
  1350. self.getRequestHeaders, [], f
  1351. )
  1352. a.makeConnection(b)
  1353. # one byte at a time, to stress the implementation.
  1354. for byte in iterbytes(requestBytes):
  1355. a.dataReceived(byte)
  1356. # Grab the request object and the stream completion callback.
  1357. stream = a.streams[1]
  1358. request = stream._request.original
  1359. cleanupCallback = a._streamCleanupCallbacks[1]
  1360. # Complete the connection immediately.
  1361. request.unregisterProducer()
  1362. request.finish()
  1363. # This should have completed the request.
  1364. self.assertTrue(request.finished)
  1365. # Check that the sending loop sends all the appropriate data.
  1366. def validate(streamID):
  1367. frames = framesFromBytes(b.value())
  1368. self.assertEqual(len(frames), 3)
  1369. # Check that the stream is correctly terminated.
  1370. self.assertTrue('END_STREAM' in frames[-1].flags)
  1371. # Grab the data from the frames.
  1372. dataChunks = [
  1373. f.data for f in frames
  1374. if isinstance(f, hyperframe.frame.DataFrame)
  1375. ]
  1376. self.assertEqual(
  1377. dataChunks,
  1378. [b""],
  1379. )
  1380. return cleanupCallback.addCallback(validate)
  1381. def test_windowUpdateForCompleteStream(self):
  1382. """
  1383. WindowUpdate frames received after we've completed the stream are
  1384. safely handled.
  1385. """
  1386. # To test this with the data sending loop working the way it does, we
  1387. # need to send *no* body on the response. That's unusual, but fine.
  1388. f = FrameFactory()
  1389. b = StringTransport()
  1390. a = H2Connection()
  1391. # We use the DummyProducerHandler just because we can guarantee that it
  1392. # doesn't end up with a body.
  1393. a.requestFactory = DummyProducerHandlerProxy
  1394. # Send the request.
  1395. requestBytes = f.clientConnectionPreface()
  1396. requestBytes += buildRequestBytes(
  1397. self.getRequestHeaders, [], f
  1398. )
  1399. a.makeConnection(b)
  1400. # one byte at a time, to stress the implementation.
  1401. for byte in iterbytes(requestBytes):
  1402. a.dataReceived(byte)
  1403. # Grab the request object and the stream completion callback.
  1404. stream = a.streams[1]
  1405. request = stream._request.original
  1406. cleanupCallback = a._streamCleanupCallbacks[1]
  1407. # Complete the connection immediately.
  1408. request.unregisterProducer()
  1409. request.finish()
  1410. # This should have completed the request.
  1411. self.assertTrue(request.finished)
  1412. # Now open the flow control window a bit. This should cause no
  1413. # problems.
  1414. a.dataReceived(
  1415. f.buildWindowUpdateFrame(streamID=1, increment=50).serialize()
  1416. )
  1417. # Check that the sending loop sends all the appropriate data.
  1418. def validate(streamID):
  1419. frames = framesFromBytes(b.value())
  1420. self.assertEqual(len(frames), 3)
  1421. # Check that the stream is correctly terminated.
  1422. self.assertTrue('END_STREAM' in frames[-1].flags)
  1423. # Grab the data from the frames.
  1424. dataChunks = [
  1425. f.data for f in frames
  1426. if isinstance(f, hyperframe.frame.DataFrame)
  1427. ]
  1428. self.assertEqual(
  1429. dataChunks,
  1430. [b""],
  1431. )
  1432. return cleanupCallback.addCallback(validate)
  1433. def test_producerUnblocked(self):
  1434. """
  1435. L{Request} objects that have registered producers that are not blocked
  1436. behind flow control do not have their producer notified.
  1437. """
  1438. f = FrameFactory()
  1439. b = StringTransport()
  1440. a = H2Connection()
  1441. a.requestFactory = DummyProducerHandlerProxy
  1442. # Shrink the window to 5 bytes, then send the request.
  1443. requestBytes = f.clientConnectionPreface()
  1444. requestBytes += f.buildSettingsFrame(
  1445. {h2.settings.SettingCodes.INITIAL_WINDOW_SIZE: 5}
  1446. ).serialize()
  1447. requestBytes += buildRequestBytes(
  1448. self.getRequestHeaders, [], f
  1449. )
  1450. a.makeConnection(b)
  1451. # one byte at a time, to stress the implementation.
  1452. for byte in iterbytes(requestBytes):
  1453. a.dataReceived(byte)
  1454. # Grab the request object.
  1455. stream = a.streams[1]
  1456. request = stream._request.original
  1457. # Confirm that the stream believes the producer is producing.
  1458. self.assertTrue(stream._producerProducing)
  1459. # Write 4 bytes to the connection, leaving space in the window.
  1460. request.write(b"word")
  1461. # The producer should not have been paused.
  1462. self.assertTrue(stream._producerProducing)
  1463. self.assertEqual(request.producer.events, [])
  1464. # Open the flow control window by 5 bytes. This should not notify the
  1465. # producer.
  1466. a.dataReceived(
  1467. f.buildWindowUpdateFrame(streamID=1, increment=5).serialize()
  1468. )
  1469. self.assertTrue(stream._producerProducing)
  1470. self.assertEqual(request.producer.events, [])
  1471. # Open the window wide complete the request.
  1472. request.unregisterProducer()
  1473. request.finish()
  1474. # Check that the sending loop sends all the appropriate data.
  1475. def validate(streamID):
  1476. frames = framesFromBytes(b.value())
  1477. # Check that the stream is correctly terminated.
  1478. self.assertTrue('END_STREAM' in frames[-1].flags)
  1479. # Grab the data from the frames.
  1480. dataChunks = [
  1481. f.data for f in frames
  1482. if isinstance(f, hyperframe.frame.DataFrame)
  1483. ]
  1484. self.assertEqual(
  1485. dataChunks,
  1486. [b"word", b""]
  1487. )
  1488. return a._streamCleanupCallbacks[1].addCallback(validate)
  1489. def test_unnecessaryWindowUpdate(self):
  1490. """
  1491. When a WindowUpdate frame is received for the whole connection but no
  1492. data is currently waiting, nothing exciting happens.
  1493. """
  1494. f = FrameFactory()
  1495. b = StringTransport()
  1496. a = H2Connection()
  1497. a.requestFactory = DummyHTTPHandlerProxy
  1498. # Send the request.
  1499. frames = buildRequestFrames(
  1500. self.postRequestHeaders, self.postRequestData, f
  1501. )
  1502. frames.insert(1, f.buildWindowUpdateFrame(streamID=0, increment=5))
  1503. requestBytes = f.clientConnectionPreface()
  1504. requestBytes += b''.join(f.serialize() for f in frames)
  1505. a.makeConnection(b)
  1506. # one byte at a time, to stress the implementation.
  1507. for byte in iterbytes(requestBytes):
  1508. a.dataReceived(byte)
  1509. # Give the sending loop a chance to catch up!
  1510. def validate(streamID):
  1511. frames = framesFromBytes(b.value())
  1512. # Check that the stream is correctly terminated.
  1513. self.assertTrue('END_STREAM' in frames[-1].flags)
  1514. # Put the Data frames together to confirm we're all good.
  1515. actualResponseData = b''.join(
  1516. f.data for f in frames
  1517. if isinstance(f, hyperframe.frame.DataFrame)
  1518. )
  1519. self.assertEqual(self.postResponseData, actualResponseData)
  1520. return a._streamCleanupCallbacks[1].addCallback(validate)
  def test_unnecessaryWindowUpdateForStream(self):
      """
      A WINDOW_UPDATE frame for a stream that has no data queued does not
      unblock that stream: after it is processed, every stream is still
      reported as blocked.
      """
      frameFactory = FrameFactory()
      transport = StringTransport()
      connection = H2Connection()
      connection.requestFactory = DummyHTTPHandlerProxy

      # Only the HEADERS of a request with a body are sent, so the request
      # is never completed and no response data is queued. The
      # WINDOW_UPDATE that follows therefore has nothing to release.
      headersFrame = frameFactory.buildHeadersFrame(
          headers=self.postRequestHeaders, streamID=1
      )
      windowUpdateFrame = frameFactory.buildWindowUpdateFrame(
          streamID=1, increment=5
      )
      requestBytes = frameFactory.clientConnectionPreface() + b''.join(
          frame.serialize() for frame in (headersFrame, windowUpdateFrame)
      )

      connection.makeConnection(transport)
      connection.dataReceived(requestBytes)

      self.assertAllStreamsBlocked(connection)
  def test_windowUpdateAfterTerminate(self):
      """
      When a WindowUpdate frame is received for a stream that has been
      aborted it is ignored: the last frame written to the transport is
      still the RST_STREAM frame.
      """
      frameFactory = FrameFactory()
      transport = StringTransport()
      connection = H2Connection()
      connection.requestFactory = DummyHTTPHandlerProxy

      # Build the complete POST request.
      requestFrames = buildRequestFrames(
          self.postRequestHeaders, self.postRequestData, frameFactory
      )
      requestBytes = frameFactory.clientConnectionPreface()
      requestBytes += b''.join(
          frame.serialize() for frame in requestFrames
      )
      connection.makeConnection(transport)

      # Deliver it one byte at a time, to stress the implementation.
      for byte in iterbytes(requestBytes):
          connection.dataReceived(byte)

      # Abort via the stream object for stream 1.
      connection.streams[1].abortConnection()

      # Now send a WINDOW_UPDATE for the stream that was just aborted.
      windowUpdateFrame = frameFactory.buildWindowUpdateFrame(
          streamID=1, increment=5
      )
      connection.dataReceived(windowUpdateFrame.serialize())

      # Everything above ran synchronously, so the transport can be
      # inspected straight away. assertIsInstance reports the offending
      # frame on failure, which assertTrue(isinstance(...)) would hide.
      sentFrames = framesFromBytes(transport.value())
      self.assertIsInstance(
          sentFrames[-1], hyperframe.frame.RstStreamFrame
      )
  def test_windowUpdateAfterComplete(self):
      """
      A WindowUpdate frame that arrives for an already completed stream is
      ignored: the final frame written still carries END_STREAM.
      """
      frameFactory = FrameFactory()
      transport = StringTransport()
      connection = H2Connection()
      connection.requestFactory = DummyHTTPHandlerProxy

      # Assemble the full POST request.
      requestFrames = buildRequestFrames(
          self.postRequestHeaders, self.postRequestData, frameFactory
      )
      requestBytes = frameFactory.clientConnectionPreface() + b''.join(
          frame.serialize() for frame in requestFrames
      )
      connection.makeConnection(transport)

      # Byte-by-byte delivery, to stress the implementation.
      for byte in iterbytes(requestBytes):
          connection.dataReceived(byte)

      def sendWindowUpdate(*ignored):
          # Runs once the cleanup Deferred for stream 1 has fired.
          lateFrame = frameFactory.buildWindowUpdateFrame(
              streamID=1, increment=5
          )
          connection.dataReceived(lateFrame.serialize())

      def checkEndedNeatly(*ignored):
          # The late WINDOW_UPDATE must not have produced further output.
          sentFrames = framesFromBytes(transport.value())
          self.assertIn('END_STREAM', sentFrames[-1].flags)

      cleanup = connection._streamCleanupCallbacks[1]
      cleanup = cleanup.addCallback(sendWindowUpdate)
      return cleanup.addCallback(checkEndedNeatly)
  def test_dataAndRstStream(self):
      """
      If DATA frames and a RST_STREAM frame for the same stream are
      delivered together, no WINDOW_UPDATE frame is emitted for that
      stream.
      """
      frameFactory = FrameFactory()
      transport = StringTransport()
      connection = H2Connection()
      connection.requestFactory = DummyHTTPHandlerProxy

      # The body has to be large, otherwise no WINDOW_UPDATE frames would
      # be forced out at all.
      chunks = [b'\x00' * (2**14)] * 4
      totalLength = sum(len(chunk) for chunk in chunks)
      bodyLength = "{}".format(totalLength)
      headers = (
          self.postRequestHeaders[:-1] + [('content-length', bodyLength)]
      )
      requestFrames = buildRequestFrames(
          headers=headers, data=chunks, frameFactory=frameFactory
      )

      # Swap the final request frame for a RST_STREAM.
      requestFrames[-1] = frameFactory.buildRstStreamFrame(
          streamID=1, errorCode=h2.errors.ErrorCodes.INTERNAL_ERROR
      )

      requestBytes = frameFactory.clientConnectionPreface()
      requestBytes += b''.join(
          frame.serialize() for frame in requestFrames
      )
      connection.makeConnection(transport)

      # Deliver everything in a single call. That matters: the behaviour
      # under test only shows up when the frames arrive together.
      connection.dataReceived(requestBytes)

      responseFrames = framesFromBytes(transport.value())

      def framesOfType(frameType):
          # Select the response frames that are instances of frameType.
          return [
              frame for frame in responseFrames
              if isinstance(frame, frameType)
          ]

      # The only WINDOW_UPDATE expected is the connection-level one
      # (stream 0).
      windowUpdateStreamIDs = [
          frame.stream_id
          for frame in framesOfType(hyperframe.frame.WindowUpdateFrame)
      ]
      self.assertEqual([0], windowUpdateStreamIDs)

      # No response HEADERS or DATA should have been written either.
      headersFrames = framesOfType(hyperframe.frame.HeadersFrame)
      dataFrames = framesOfType(hyperframe.frame.DataFrame)
      self.assertFalse(headersFrames)
      self.assertFalse(dataFrames)
  class HTTP2TransportChecking(unittest.TestCase, HTTP2TestHelpers):
      """
      Tests for how L{H2Connection} interacts with its transport: producer
      registration, pausing and stopping production, and passing the
      transport's host and peer addresses through to its streams.
      """
      # A simple GET request, used by the tests that need a stream to exist.
      getRequestHeaders = [
          (b':method', b'GET'),
          (b':authority', b'localhost'),
          (b':path', b'/'),
          (b':scheme', b'https'),
          (b'user-agent', b'twisted-test-code'),
          (b'custom-header', b'1'),
          (b'custom-header', b'2'),
      ]

      def test_registerProducerWithTransport(self):
          """
          L{H2Connection} can be registered with the transport as a producer.
          """
          transport = StringTransport()
          connection = H2Connection()
          connection.requestFactory = DummyHTTPHandlerProxy

          transport.registerProducer(connection, True)
          self.assertIs(transport.producer, connection)

      def test_pausingProducerPreventsDataSend(self):
          """
          L{H2Connection} can be paused by its consumer. When paused it stops
          sending data to the transport.
          """
          frameFactory = FrameFactory()
          transport = StringTransport()
          connection = H2Connection()
          connection.requestFactory = DummyHTTPHandlerProxy

          # Send the request.
          requestFrames = buildRequestFrames(
              self.getRequestHeaders, [], frameFactory
          )
          requestBytes = frameFactory.clientConnectionPreface()
          requestBytes += b''.join(
              frame.serialize() for frame in requestFrames
          )
          connection.makeConnection(transport)
          transport.registerProducer(connection, True)

          # one byte at a time, to stress the implementation.
          for byte in iterbytes(requestBytes):
              connection.dataReceived(byte)

          # The headers will be sent immediately, but the body will be
          # waiting until the reactor gets to spin. Before it does we'll
          # pause production.
          connection.pauseProducing()

          # Now we want to build up a whole chain of Deferreds. We want to
          # 1. deferLater for a moment to let the sending loop run, which
          #    should block.
          # 2. After that deferred fires, we want to validate that no data
          #    has been sent yet.
          # 3. Then we want to resume the production.
          # 4. Then, we want to wait for the stream completion deferred.
          # 5. Validate that the data is correct.
          cleanupCallback = connection._streamCleanupCallbacks[1]

          def validateNotSent(*args):
              sentFrames = framesFromBytes(transport.value())

              # Two frames so far, and the last one is not body data.
              self.assertEqual(len(sentFrames), 2)
              self.assertNotIsInstance(
                  sentFrames[-1], hyperframe.frame.DataFrame
              )
              connection.resumeProducing()

              # Resume producing is a no-op, so let's call it a bunch more
              # times.
              for _ in range(4):
                  connection.resumeProducing()

              # Returning the Deferred chains step 4 onto this callback.
              return cleanupCallback

          def validateComplete(*args):
              sentFrames = framesFromBytes(transport.value())

              # Check that the stream is correctly terminated.
              self.assertEqual(len(sentFrames), 4)
              self.assertIn('END_STREAM', sentFrames[-1].flags)

          d = task.deferLater(reactor, 0.01, validateNotSent)
          d.addCallback(validateComplete)

          return d

      def test_stopProducing(self):
          """
          L{H2Connection} can be stopped by its consumer. Once stopped, no
          body data has been written to the transport and the connection no
          longer records itself as producing.
          """
          frameFactory = FrameFactory()
          transport = StringTransport()
          connection = H2Connection()
          connection.requestFactory = DummyHTTPHandlerProxy

          # Send the request.
          requestFrames = buildRequestFrames(
              self.getRequestHeaders, [], frameFactory
          )
          requestBytes = frameFactory.clientConnectionPreface()
          requestBytes += b''.join(
              frame.serialize() for frame in requestFrames
          )
          connection.makeConnection(transport)
          transport.registerProducer(connection, True)

          # one byte at a time, to stress the implementation.
          for byte in iterbytes(requestBytes):
              connection.dataReceived(byte)

          # The headers will be sent immediately, but the body will be
          # waiting until the reactor gets to spin. Before it does we'll
          # stop production.
          connection.stopProducing()

          sentFrames = framesFromBytes(transport.value())

          self.assertEqual(len(sentFrames), 2)
          self.assertNotIsInstance(
              sentFrames[-1], hyperframe.frame.DataFrame
          )
          self.assertFalse(connection._stillProducing)

      def test_passthroughHostAndPeer(self):
          """
          A L{H2Stream} object correctly passes through host and peer
          information from its L{H2Connection}.
          """
          hostAddress = IPv4Address("TCP", "17.52.24.8", 443)
          peerAddress = IPv4Address("TCP", "17.188.0.12", 32008)

          frameFactory = FrameFactory()
          transport = StringTransport(
              hostAddress=hostAddress, peerAddress=peerAddress
          )
          connection = H2Connection()
          connection.requestFactory = DummyHTTPHandlerProxy
          connection.makeConnection(transport)

          requestFrames = buildRequestFrames(
              self.getRequestHeaders, [], frameFactory
          )
          requestBytes = frameFactory.clientConnectionPreface()
          requestBytes += b''.join(
              frame.serialize() for frame in requestFrames
          )

          for byte in iterbytes(requestBytes):
              connection.dataReceived(byte)

          # The stream is present. Go grab the stream object.
          stream = connection.streams[1]
          self.assertEqual(stream.getHost(), hostAddress)
          self.assertEqual(stream.getPeer(), peerAddress)

          # Allow the stream to finish up and check the result again: the
          # addresses must be unchanged after cleanup.
          cleanupCallback = connection._streamCleanupCallbacks[1]

          def validate(*args):
              self.assertEqual(stream.getHost(), hostAddress)
              self.assertEqual(stream.getPeer(), peerAddress)

          return cleanupCallback.addCallback(validate)
  class HTTP2SchedulingTests(unittest.TestCase, HTTP2TestHelpers):
      """
      L{H2Connection} uses reactor callbacks to schedule some of its work,
      chiefly the loop that sends data. These tests check that those calls
      are scheduled as expected.
      """
      def test_initiallySchedulesOneDataCall(self):
          """
          Constructing a L{H2Connection} schedules exactly one call on its
          reactor, due immediately, that invokes the data sending loop with
          no arguments.
          """
          clock = task.Clock()
          connection = H2Connection(clock)

          pending = clock.getDelayedCalls()
          self.assertEqual(len(pending), 1)
          scheduled = pending[0]

          # The call is due at time zero on the fake clock, has not fired
          # yet, and targets the prioritised send loop without arguments.
          self.assertTrue(scheduled.active())
          self.assertEqual(scheduled.time, 0)
          self.assertEqual(scheduled.func, connection._sendPrioritisedData)
          self.assertEqual(scheduled.args, ())
          self.assertEqual(scheduled.kw, {})
  1809. class HTTP2TimeoutTests(unittest.TestCase, HTTP2TestHelpers):
  1810. """
  1811. The L{H2Connection} object times out idle connections.
  1812. """
  1813. getRequestHeaders = [
  1814. (b':method', b'GET'),
  1815. (b':authority', b'localhost'),
  1816. (b':path', b'/'),
  1817. (b':scheme', b'https'),
  1818. (b'user-agent', b'twisted-test-code'),
  1819. (b'custom-header', b'1'),
  1820. (b'custom-header', b'2'),
  1821. ]
  1822. # A sentinel object used to flag default timeouts
  1823. _DEFAULT = object()
  def patch_TimeoutMixin_clock(self, connection, reactor):
      """
      Make C{connection} schedule through C{reactor}.

      TimeoutMixin offers no way to hand it an explicit reactor for testing
      timeouts, so the C{callLater} attribute on the connection is replaced
      with the one belonging to the given reactor.

      @param connection: The HTTP/2 connection object to patch.
      @type connection: L{H2Connection}

      @param reactor: The reactor whose callLater method we want.
      @type reactor: An object implementing
          L{twisted.internet.interfaces.IReactorTime}
      """
      scheduler = reactor.callLater
      connection.callLater = scheduler
  def initiateH2Connection(self, initialData, requestFactory):
      """
      Common setup: create a fake clock, a L{H2Connection} driven by it with
      a 100 second C{timeOut}, and a L{StringTransport}; connect them and
      feed in any initial data.

      @param initialData: The initial HTTP/2 data to be fed into the
          connection after setup.
      @type initialData: L{bytes}

      @param requestFactory: The L{Request} factory to use with the
          connection.

      @return: A tuple of the clock, the connection and the transport.
      """
      clock = task.Clock()
      connection = H2Connection(clock)
      connection.timeOut = 100
      self.patch_TimeoutMixin_clock(connection, clock)

      transport = StringTransport()
      connection.requestFactory = _makeRequestProxyFactory(requestFactory)
      connection.makeConnection(transport)

      # Feed the data in a byte at a time, to stress the implementation.
      for singleByte in iterbytes(initialData):
          connection.dataReceived(singleByte)

      return clock, connection, transport
  def assertTimedOut(self, data, frameCount, errorCode, lastStreamID):
      """
      Confirm that the data that was sent matches what we expect from a
      timeout: namely, that it ends with a GOAWAY frame carrying an
      appropriate error code and last stream ID.

      @param data: The bytes written to the transport.
      @type data: L{bytes}

      @param frameCount: The exact number of frames C{data} must contain.
      @type frameCount: L{int}

      @param errorCode: The error code the final GOAWAY frame must carry.

      @param lastStreamID: The last stream ID the final GOAWAY frame must
          carry.
      @type lastStreamID: L{int}
      """
      frames = framesFromBytes(data)

      self.assertEqual(len(frames), frameCount)

      goAway = frames[-1]
      # assertIsInstance names the unexpected frame when this fails, which
      # assertTrue(isinstance(...)) would not.
      self.assertIsInstance(goAway, hyperframe.frame.GoAwayFrame)
      self.assertEqual(goAway.error_code, errorCode)
      self.assertEqual(goAway.last_stream_id, lastStreamID)
  def prepareAbortTest(self, abortTimeout=_DEFAULT):
      """
      Shared setup for the tests of the forced-abort behaviour: bring up a
      connection, let it idle until it times out, and check that it has
      asked the transport to disconnect without the transport having been
      torn down yet.

      @param abortTimeout: The value to use for the abortTimeout. Defaults to
          whatever is set on L{H2Connection.abortTimeout}.
      @type abortTimeout: L{int} or L{None}

      @return: A tuple of the reactor being used for the connection, the
          connection itself, and the transport.
      """
      # None is a meaningful value here, hence the sentinel default.
      effectiveTimeout = (
          H2Connection.abortTimeout
          if abortTimeout is self._DEFAULT
          else abortTimeout
      )

      preface = FrameFactory().clientConnectionPreface()
      clock, connection, transport = self.initiateH2Connection(
          preface, requestFactory=DummyHTTPHandler,
      )
      connection.abortTimeout = effectiveTimeout

      # Run the clock forward by the full idle timeout.
      clock.advance(100)

      self.assertTimedOut(
          transport.value(),
          frameCount=2,
          errorCode=h2.errors.ErrorCodes.NO_ERROR,
          lastStreamID=0
      )
      self.assertTrue(transport.disconnecting)
      self.assertFalse(transport.disconnected)

      return clock, connection, transport
  def test_timeoutAfterInactivity(self):
      """
      A L{H2Connection} that receives nothing for longer than its time out
      interval closes the connection cleanly.
      """
      preface = FrameFactory().clientConnectionPreface()
      clock, connection, transport = self.initiateH2Connection(
          preface, requestFactory=DummyHTTPHandler,
      )

      # Remember what has been written so far.
      writtenBefore = transport.value()

      # Just short of the timeout: nothing new is written and the
      # transport has not been asked to disconnect.
      clock.advance(99)
      self.assertEqual(writtenBefore, transport.value())
      self.assertFalse(transport.disconnecting)

      # Step past the timeout.
      clock.advance(2)

      self.assertTimedOut(
          transport.value(),
          frameCount=2,
          errorCode=h2.errors.ErrorCodes.NO_ERROR,
          lastStreamID=0
      )
      self.assertTrue(transport.disconnecting)
  def test_timeoutResetByData(self):
      """
      Receiving data resets the timeout of a L{H2Connection}.
      """
      # Start with no data at all; the preface is delivered by hand below.
      factory = FrameFactory()
      clock, connection, transport = self.initiateH2Connection(
          b'', requestFactory=DummyHTTPHandler,
      )

      # Deliver the preface a byte at a time, with 99 'seconds' passing
      # after each byte. The connection must stay up throughout.
      for singleByte in iterbytes(factory.clientConnectionPreface()):
          connection.dataReceived(singleByte)
          clock.advance(99)
          self.assertFalse(transport.disconnecting)

      # With no further data, stepping past the timeout closes it.
      clock.advance(2)

      self.assertTimedOut(
          transport.value(),
          frameCount=2,
          errorCode=h2.errors.ErrorCodes.NO_ERROR,
          lastStreamID=0
      )
      self.assertTrue(transport.disconnecting)
  def test_timeoutWithProtocolErrorIfStreamsOpen(self):
      """
      If a L{H2Connection} times out while it still has active streams, the
      GOAWAY it sends carries L{h2.errors.ErrorCodes.PROTOCOL_ERROR}.
      """
      factory = FrameFactory()
      requestFrames = buildRequestFrames(
          self.getRequestHeaders, [], factory
      )
      payload = factory.clientConnectionPreface() + b''.join(
          frame.serialize() for frame in requestFrames
      )

      clock, connection, transport = self.initiateH2Connection(
          payload, requestFactory=DummyProducerHandler,
      )

      # Move past the idle timeout while the request is outstanding.
      clock.advance(101)

      self.assertTimedOut(
          transport.value(),
          frameCount=2,
          errorCode=h2.errors.ErrorCodes.PROTOCOL_ERROR,
          lastStreamID=1
      )
      self.assertTrue(transport.disconnecting)
  def test_noTimeoutIfConnectionLost(self):
      """
      A L{H2Connection} cancels its timeout when its connection is lost.
      """
      factory = FrameFactory()
      requestFrames = buildRequestFrames(
          self.getRequestHeaders, [], factory
      )
      payload = factory.clientConnectionPreface() + b''.join(
          frame.serialize() for frame in requestFrames
      )

      clock, connection, transport = self.initiateH2Connection(
          payload, requestFactory=DummyProducerHandler,
      )

      writtenBefore = transport.value()
      callsBefore = len(clock.getDelayedCalls())

      # Drop the connection.
      connection.connectionLost("reason")

      # Exactly one pending call should have gone away.
      callsAfter = len(clock.getDelayedCalls())
      self.assertEqual(callsBefore - 1, callsAfter)

      # Running past the timeout must not write anything further.
      clock.advance(101)
      self.assertEqual(transport.value(), writtenBefore)
  def test_timeoutEventuallyForcesConnectionClosed(self):
      """
      Once a L{H2Connection} has timed out, it forcibly closes the
      transport if that has not been torn down within 15 seconds.
      """
      clock, connection, transport = self.prepareAbortTest()

      # One second short of the abort deadline: still only disconnecting.
      clock.advance(14)
      self.assertTrue(transport.disconnecting)
      self.assertFalse(transport.disconnected)

      # At the deadline the transport is actually disconnected.
      clock.advance(1)
      self.assertTrue(transport.disconnecting)
      self.assertTrue(transport.disconnected)
  def test_losingConnectionCancelsTheAbort(self):
      """
      If C{connectionLost} is called on a timed-out L{H2Connection} before
      the abort deadline, the forcible close is cancelled.
      """
      clock, connection, transport = self.prepareAbortTest()

      # Get to just before the deadline, then report the connection lost.
      clock.advance(14)
      connection.connectionLost(None)

      # Crossing the deadline must not disconnect the transport.
      clock.advance(1)
      self.assertTrue(transport.disconnecting)
      self.assertFalse(transport.disconnected)
  def test_losingConnectionWithNoAbortTimeOut(self):
      """
      With C{abortTimeout} set to L{None}, a timed-out L{H2Connection} is
      never forcibly aborted.
      """
      clock, connection, transport = self.prepareAbortTest(
          abortTimeout=None
      )

      # However far the clock runs, the transport is only ever asked to
      # disconnect; it is never disconnected outright.
      clock.advance(2**32)
      self.assertTrue(transport.disconnecting)
      self.assertFalse(transport.disconnected)