stream.py 54 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399
  1. # -*- coding: utf-8 -*-
  2. """
  3. h2/stream
  4. ~~~~~~~~~
  5. An implementation of a HTTP/2 stream.
  6. """
  7. import warnings
  8. from enum import Enum, IntEnum
  9. from hpack import HeaderTuple
  10. from hyperframe.frame import (
  11. HeadersFrame, ContinuationFrame, DataFrame, WindowUpdateFrame,
  12. RstStreamFrame, PushPromiseFrame, AltSvcFrame
  13. )
  14. from .errors import ErrorCodes, _error_code_from_int
  15. from .events import (
  16. RequestReceived, ResponseReceived, DataReceived, WindowUpdated,
  17. StreamEnded, PushedStreamReceived, StreamReset, TrailersReceived,
  18. InformationalResponseReceived, AlternativeServiceAvailable,
  19. _ResponseSent, _RequestSent, _TrailersSent, _PushedRequestSent
  20. )
  21. from .exceptions import (
  22. ProtocolError, StreamClosedError, InvalidBodyLengthError, FlowControlError
  23. )
  24. from .utilities import (
  25. guard_increment_window, is_informational_response, authority_from_headers,
  26. validate_headers, validate_outbound_headers, normalize_outbound_headers,
  27. HeaderValidationFlags, extract_method_header
  28. )
  29. from .windows import WindowManager
  30. class StreamState(IntEnum):
  31. IDLE = 0
  32. RESERVED_REMOTE = 1
  33. RESERVED_LOCAL = 2
  34. OPEN = 3
  35. HALF_CLOSED_REMOTE = 4
  36. HALF_CLOSED_LOCAL = 5
  37. CLOSED = 6
  38. class StreamInputs(Enum):
  39. SEND_HEADERS = 0
  40. SEND_PUSH_PROMISE = 1
  41. SEND_RST_STREAM = 2
  42. SEND_DATA = 3
  43. SEND_WINDOW_UPDATE = 4
  44. SEND_END_STREAM = 5
  45. RECV_HEADERS = 6
  46. RECV_PUSH_PROMISE = 7
  47. RECV_RST_STREAM = 8
  48. RECV_DATA = 9
  49. RECV_WINDOW_UPDATE = 10
  50. RECV_END_STREAM = 11
  51. RECV_CONTINUATION = 12 # Added in 2.0.0
  52. SEND_INFORMATIONAL_HEADERS = 13 # Added in 2.2.0
  53. RECV_INFORMATIONAL_HEADERS = 14 # Added in 2.2.0
  54. SEND_ALTERNATIVE_SERVICE = 15 # Added in 2.3.0
  55. RECV_ALTERNATIVE_SERVICE = 16 # Added in 2.3.0
  56. UPGRADE_CLIENT = 17 # Added 2.3.0
  57. UPGRADE_SERVER = 18 # Added 2.3.0
  58. class StreamClosedBy(Enum):
  59. SEND_END_STREAM = 0
  60. RECV_END_STREAM = 1
  61. SEND_RST_STREAM = 2
  62. RECV_RST_STREAM = 3
  63. # This array is initialized once, and is indexed by the stream states above.
  64. # It indicates whether a stream in the given state is open. The reason we do
  65. # this is that we potentially check whether a stream in a given state is open
  66. # quite frequently: given that we check so often, we should do so in the
  67. # fastest and most performant way possible.
  68. STREAM_OPEN = [False for _ in range(0, len(StreamState))]
  69. STREAM_OPEN[StreamState.OPEN] = True
  70. STREAM_OPEN[StreamState.HALF_CLOSED_LOCAL] = True
  71. STREAM_OPEN[StreamState.HALF_CLOSED_REMOTE] = True
  72. class H2StreamStateMachine(object):
  73. """
  74. A single HTTP/2 stream state machine.
  75. This stream object implements basically the state machine described in
  76. RFC 7540 section 5.1.
  77. :param stream_id: The stream ID of this stream. This is stored primarily
  78. for logging purposes.
  79. """
  80. def __init__(self, stream_id):
  81. self.state = StreamState.IDLE
  82. self.stream_id = stream_id
  83. #: Whether this peer is the client side of this stream.
  84. self.client = None
  85. # Whether trailers have been sent/received on this stream or not.
  86. self.headers_sent = None
  87. self.trailers_sent = None
  88. self.headers_received = None
  89. self.trailers_received = None
  90. # How the stream was closed. One of StreamClosedBy.
  91. self.stream_closed_by = None
  92. def process_input(self, input_):
  93. """
  94. Process a specific input in the state machine.
  95. """
  96. if not isinstance(input_, StreamInputs):
  97. raise ValueError("Input must be an instance of StreamInputs")
  98. try:
  99. func, target_state = _transitions[(self.state, input_)]
  100. except KeyError:
  101. old_state = self.state
  102. self.state = StreamState.CLOSED
  103. raise ProtocolError(
  104. "Invalid input %s in state %s" % (input_, old_state)
  105. )
  106. else:
  107. previous_state = self.state
  108. self.state = target_state
  109. if func is not None:
  110. try:
  111. return func(self, previous_state)
  112. except ProtocolError:
  113. self.state = StreamState.CLOSED
  114. raise
  115. except AssertionError as e: # pragma: no cover
  116. self.state = StreamState.CLOSED
  117. raise ProtocolError(e)
  118. return []
  119. def request_sent(self, previous_state):
  120. """
  121. Fires when a request is sent.
  122. """
  123. self.client = True
  124. self.headers_sent = True
  125. event = _RequestSent()
  126. return [event]
  127. def response_sent(self, previous_state):
  128. """
  129. Fires when something that should be a response is sent. This 'response'
  130. may actually be trailers.
  131. """
  132. if not self.headers_sent:
  133. if self.client is True or self.client is None:
  134. raise ProtocolError("Client cannot send responses.")
  135. self.headers_sent = True
  136. event = _ResponseSent()
  137. else:
  138. assert not self.trailers_sent
  139. self.trailers_sent = True
  140. event = _TrailersSent()
  141. return [event]
  142. def request_received(self, previous_state):
  143. """
  144. Fires when a request is received.
  145. """
  146. assert not self.headers_received
  147. assert not self.trailers_received
  148. self.client = False
  149. self.headers_received = True
  150. event = RequestReceived()
  151. event.stream_id = self.stream_id
  152. return [event]
  153. def response_received(self, previous_state):
  154. """
  155. Fires when a response is received. Also disambiguates between responses
  156. and trailers.
  157. """
  158. if not self.headers_received:
  159. assert self.client is True
  160. self.headers_received = True
  161. event = ResponseReceived()
  162. else:
  163. assert not self.trailers_received
  164. self.trailers_received = True
  165. event = TrailersReceived()
  166. event.stream_id = self.stream_id
  167. return [event]
  168. def data_received(self, previous_state):
  169. """
  170. Fires when data is received.
  171. """
  172. event = DataReceived()
  173. event.stream_id = self.stream_id
  174. return [event]
  175. def window_updated(self, previous_state):
  176. """
  177. Fires when a window update frame is received.
  178. """
  179. event = WindowUpdated()
  180. event.stream_id = self.stream_id
  181. return [event]
  182. def stream_half_closed(self, previous_state):
  183. """
  184. Fires when an END_STREAM flag is received in the OPEN state,
  185. transitioning this stream to a HALF_CLOSED_REMOTE state.
  186. """
  187. event = StreamEnded()
  188. event.stream_id = self.stream_id
  189. return [event]
  190. def stream_ended(self, previous_state):
  191. """
  192. Fires when a stream is cleanly ended.
  193. """
  194. self.stream_closed_by = StreamClosedBy.RECV_END_STREAM
  195. event = StreamEnded()
  196. event.stream_id = self.stream_id
  197. return [event]
  198. def stream_reset(self, previous_state):
  199. """
  200. Fired when a stream is forcefully reset.
  201. """
  202. self.stream_closed_by = StreamClosedBy.RECV_RST_STREAM
  203. event = StreamReset()
  204. event.stream_id = self.stream_id
  205. return [event]
  206. def send_new_pushed_stream(self, previous_state):
  207. """
  208. Fires on the newly pushed stream, when pushed by the local peer.
  209. No event here, but definitionally this peer must be a server.
  210. """
  211. assert self.client is None
  212. self.client = False
  213. self.headers_received = True
  214. return []
  215. def recv_new_pushed_stream(self, previous_state):
  216. """
  217. Fires on the newly pushed stream, when pushed by the remote peer.
  218. No event here, but definitionally this peer must be a client.
  219. """
  220. assert self.client is None
  221. self.client = True
  222. self.headers_sent = True
  223. return []
  224. def send_push_promise(self, previous_state):
  225. """
  226. Fires on the already-existing stream when a PUSH_PROMISE frame is sent.
  227. We may only send PUSH_PROMISE frames if we're a server.
  228. """
  229. if self.client is True:
  230. raise ProtocolError("Cannot push streams from client peers.")
  231. event = _PushedRequestSent()
  232. return [event]
  233. def recv_push_promise(self, previous_state):
  234. """
  235. Fires on the already-existing stream when a PUSH_PROMISE frame is
  236. received. We may only receive PUSH_PROMISE frames if we're a client.
  237. Fires a PushedStreamReceived event.
  238. """
  239. if not self.client:
  240. if self.client is None: # pragma: no cover
  241. msg = "Idle streams cannot receive pushes"
  242. else: # pragma: no cover
  243. msg = "Cannot receive pushed streams as a server"
  244. raise ProtocolError(msg)
  245. event = PushedStreamReceived()
  246. event.parent_stream_id = self.stream_id
  247. return [event]
  248. def send_end_stream(self, previous_state):
  249. """
  250. Called when an attempt is made to send END_STREAM in the
  251. HALF_CLOSED_REMOTE state.
  252. """
  253. self.stream_closed_by = StreamClosedBy.SEND_END_STREAM
  254. def send_reset_stream(self, previous_state):
  255. """
  256. Called when an attempt is made to send RST_STREAM in a non-closed
  257. stream state.
  258. """
  259. self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
  260. def reset_stream_on_error(self, previous_state):
  261. """
  262. Called when we need to forcefully emit another RST_STREAM frame on
  263. behalf of the state machine.
  264. If this is the first time we've done this, we should also hang an event
  265. off the StreamClosedError so that the user can be informed. We know
  266. it's the first time we've done this if the stream is currently in a
  267. state other than CLOSED.
  268. """
  269. self.stream_closed_by = StreamClosedBy.SEND_RST_STREAM
  270. error = StreamClosedError(self.stream_id)
  271. event = StreamReset()
  272. event.stream_id = self.stream_id
  273. event.error_code = ErrorCodes.STREAM_CLOSED
  274. event.remote_reset = False
  275. error._events = [event]
  276. raise error
  277. def recv_on_closed_stream(self, previous_state):
  278. """
  279. Called when an unexpected frame is received on an already-closed
  280. stream.
  281. An endpoint that receives an unexpected frame should treat it as
  282. a stream error or connection error with type STREAM_CLOSED, depending
  283. on the specific frame. The error handling is done at a higher level:
  284. this just raises the appropriate error.
  285. """
  286. raise StreamClosedError(self.stream_id)
  287. def send_on_closed_stream(self, previous_state):
  288. """
  289. Called when an attempt is made to send data on an already-closed
  290. stream.
  291. This essentially overrides the standard logic by throwing a
  292. more-specific error: StreamClosedError. This is a ProtocolError, so it
  293. matches the standard API of the state machine, but provides more detail
  294. to the user.
  295. """
  296. raise StreamClosedError(self.stream_id)
  297. def recv_push_on_closed_stream(self, previous_state):
  298. """
  299. Called when a PUSH_PROMISE frame is received on a full stop
  300. stream.
  301. If the stream was closed by us sending a RST_STREAM frame, then we
  302. presume that the PUSH_PROMISE was in flight when we reset the parent
  303. stream. Rathen than accept the new stream, we just reset it.
  304. Otherwise, we should call this a PROTOCOL_ERROR: pushing a stream on a
  305. naturally closed stream is a real problem because it creates a brand
  306. new stream that the remote peer now believes exists.
  307. """
  308. assert self.stream_closed_by is not None
  309. if self.stream_closed_by == StreamClosedBy.SEND_RST_STREAM:
  310. raise StreamClosedError(self.stream_id)
  311. else:
  312. raise ProtocolError("Attempted to push on closed stream.")
  313. def send_push_on_closed_stream(self, previous_state):
  314. """
  315. Called when an attempt is made to push on an already-closed stream.
  316. This essentially overrides the standard logic by providing a more
  317. useful error message. It's necessary because simply indicating that the
  318. stream is closed is not enough: there is now a new stream that is not
  319. allowed to be there. The only recourse is to tear the whole connection
  320. down.
  321. """
  322. raise ProtocolError("Attempted to push on closed stream.")
  323. def window_on_closed_stream(self, previous_state):
  324. """
  325. Called when a WINDOW_UPDATE frame is received on an already-closed
  326. stream.
  327. If we sent an END_STREAM frame, we just ignore the frame, as instructed
  328. in RFC 7540 Section 5.1. Technically we should eventually consider
  329. WINDOW_UPDATE in this state an error, but we don't have access to a
  330. clock so we just always allow it. If we closed the stream for any other
  331. reason, we behave as we do for receiving any other frame on a closed
  332. stream.
  333. """
  334. assert self.stream_closed_by is not None
  335. if self.stream_closed_by == StreamClosedBy.SEND_END_STREAM:
  336. return []
  337. return self.recv_on_closed_stream(previous_state)
  338. def reset_on_closed_stream(self, previous_state):
  339. """
  340. Called when a RST_STREAM frame is received on an already-closed stream.
  341. If we sent an END_STREAM frame, we just ignore the frame, as instructed
  342. in RFC 7540 Section 5.1. Technically we should eventually consider
  343. RST_STREAM in this state an error, but we don't have access to a clock
  344. so we just always allow it. If we closed the stream for any other
  345. reason, we behave as we do for receiving any other frame on a closed
  346. stream.
  347. """
  348. assert self.stream_closed_by is not None
  349. if self.stream_closed_by is StreamClosedBy.SEND_END_STREAM:
  350. return []
  351. return self.recv_on_closed_stream(previous_state)
  352. def send_informational_response(self, previous_state):
  353. """
  354. Called when an informational header block is sent (that is, a block
  355. where the :status header has a 1XX value).
  356. Only enforces that these are sent *before* final headers are sent.
  357. """
  358. if self.headers_sent:
  359. raise ProtocolError("Information response after final response")
  360. event = _ResponseSent()
  361. return [event]
  362. def recv_informational_response(self, previous_state):
  363. """
  364. Called when an informational header block is received (that is, a block
  365. where the :status header has a 1XX value).
  366. """
  367. if self.headers_received:
  368. raise ProtocolError("Informational response after final response")
  369. event = InformationalResponseReceived()
  370. event.stream_id = self.stream_id
  371. return [event]
  372. def recv_alt_svc(self, previous_state):
  373. """
  374. Called when receiving an ALTSVC frame.
  375. RFC 7838 allows us to receive ALTSVC frames at any stream state, which
  376. is really absurdly overzealous. For that reason, we want to limit the
  377. states in which we can actually receive it. It's really only sensible
  378. to receive it after we've sent our own headers and before the server
  379. has sent its header block: the server can't guarantee that we have any
  380. state around after it completes its header block, and the server
  381. doesn't know what origin we're talking about before we've sent ours.
  382. For that reason, this function applies a few extra checks on both state
  383. and some of the little state variables we keep around. If those suggest
  384. an unreasonable situation for the ALTSVC frame to have been sent in,
  385. we quietly ignore it (as RFC 7838 suggests).
  386. This function is also *not* always called by the state machine. In some
  387. states (IDLE, RESERVED_LOCAL, CLOSED) we don't bother to call it,
  388. because we know the frame cannot be valid in that state (IDLE because
  389. the server cannot know what origin the stream applies to, CLOSED
  390. because the server cannot assume we still have state around,
  391. RESERVED_LOCAL because by definition if we're in the RESERVED_LOCAL
  392. state then *we* are the server).
  393. """
  394. # Servers can't receive ALTSVC frames, but RFC 7838 tells us to ignore
  395. # them.
  396. if self.client is False:
  397. return []
  398. # If we've received the response headers from the server they can't
  399. # guarantee we still have any state around. Other implementations
  400. # (like nghttp2) ignore ALTSVC in this state, so we will too.
  401. if self.headers_received:
  402. return []
  403. # Otherwise, this is a sensible enough frame to have received. Return
  404. # the event and let it get populated.
  405. return [AlternativeServiceAvailable()]
  406. def send_alt_svc(self, previous_state):
  407. """
  408. Called when sending an ALTSVC frame on this stream.
  409. For consistency with the restrictions we apply on receiving ALTSVC
  410. frames in ``recv_alt_svc``, we want to restrict when users can send
  411. ALTSVC frames to the situations when we ourselves would accept them.
  412. That means: when we are a server, when we have received the request
  413. headers, and when we have not yet sent our own response headers.
  414. """
  415. # We should not send ALTSVC after we've sent response headers, as the
  416. # client may have disposed of its state.
  417. if self.headers_sent:
  418. raise ProtocolError(
  419. "Cannot send ALTSVC after sending response headers."
  420. )
  421. return
  422. # STATE MACHINE
  423. #
  424. # The stream state machine is defined here to avoid the need to allocate it
  425. # repeatedly for each stream. It cannot be defined in the stream class because
  426. # it needs to be able to reference the callbacks defined on the class, but
  427. # because Python's scoping rules are weird the class object is not actually in
  428. # scope during the body of the class object.
  429. #
  430. # For the sake of clarity, we reproduce the RFC 7540 state machine here:
  431. #
  432. # +--------+
  433. # send PP | | recv PP
  434. # ,--------| idle |--------.
  435. # / | | \
  436. # v +--------+ v
  437. # +----------+ | +----------+
  438. # | | | send H / | |
  439. # ,------| reserved | | recv H | reserved |------.
  440. # | | (local) | | | (remote) | |
  441. # | +----------+ v +----------+ |
  442. # | | +--------+ | |
  443. # | | recv ES | | send ES | |
  444. # | send H | ,-------| open |-------. | recv H |
  445. # | | / | | \ | |
  446. # | v v +--------+ v v |
  447. # | +----------+ | +----------+ |
  448. # | | half | | | half | |
  449. # | | closed | | send R / | closed | |
  450. # | | (remote) | | recv R | (local) | |
  451. # | +----------+ | +----------+ |
  452. # | | | | |
  453. # | | send ES / | recv ES / | |
  454. # | | send R / v send R / | |
  455. # | | recv R +--------+ recv R | |
  456. # | send R / `----------->| |<-----------' send R / |
  457. # | recv R | closed | recv R |
  458. # `----------------------->| |<----------------------'
  459. # +--------+
  460. #
  461. # send: endpoint sends this frame
  462. # recv: endpoint receives this frame
  463. #
  464. # H: HEADERS frame (with implied CONTINUATIONs)
  465. # PP: PUSH_PROMISE frame (with implied CONTINUATIONs)
  466. # ES: END_STREAM flag
  467. # R: RST_STREAM frame
  468. #
  469. # For the purposes of this state machine we treat HEADERS and their
  470. # associated CONTINUATION frames as a single jumbo frame. The protocol
  471. # allows/requires this by preventing other frames from being interleved in
  472. # between HEADERS/CONTINUATION frames. However, if a CONTINUATION frame is
  473. # received without a prior HEADERS frame, it *will* be passed to this state
  474. # machine. The state machine should always reject that frame, either as an
  475. # invalid transition or because the stream is closed.
  476. #
  477. # There is a confusing relationship around PUSH_PROMISE frames. The state
  478. # machine above considers them to be frames belonging to the new stream,
  479. # which is *somewhat* true. However, they are sent with the stream ID of
  480. # their related stream, and are only sendable in some cases.
  481. # For this reason, our state machine implementation below allows for
  482. # PUSH_PROMISE frames both in the IDLE state (as in the diagram), but also
  483. # in the OPEN, HALF_CLOSED_LOCAL, and HALF_CLOSED_REMOTE states.
  484. # Essentially, for hyper-h2, PUSH_PROMISE frames are effectively sent on
  485. # two streams.
  486. #
  487. # The _transitions dictionary contains a mapping of tuples of
  488. # (state, input) to tuples of (side_effect_function, end_state). This
  489. # map contains all allowed transitions: anything not in this map is
  490. # invalid and immediately causes a transition to ``closed``.
  491. _transitions = {
  492. # State: idle
  493. (StreamState.IDLE, StreamInputs.SEND_HEADERS):
  494. (H2StreamStateMachine.request_sent, StreamState.OPEN),
  495. (StreamState.IDLE, StreamInputs.RECV_HEADERS):
  496. (H2StreamStateMachine.request_received, StreamState.OPEN),
  497. (StreamState.IDLE, StreamInputs.RECV_DATA):
  498. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  499. (StreamState.IDLE, StreamInputs.SEND_PUSH_PROMISE):
  500. (H2StreamStateMachine.send_new_pushed_stream,
  501. StreamState.RESERVED_LOCAL),
  502. (StreamState.IDLE, StreamInputs.RECV_PUSH_PROMISE):
  503. (H2StreamStateMachine.recv_new_pushed_stream,
  504. StreamState.RESERVED_REMOTE),
  505. (StreamState.IDLE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  506. (None, StreamState.IDLE),
  507. (StreamState.IDLE, StreamInputs.UPGRADE_CLIENT):
  508. (H2StreamStateMachine.request_sent, StreamState.HALF_CLOSED_LOCAL),
  509. (StreamState.IDLE, StreamInputs.UPGRADE_SERVER):
  510. (H2StreamStateMachine.request_received,
  511. StreamState.HALF_CLOSED_REMOTE),
  512. # State: reserved local
  513. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_HEADERS):
  514. (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
  515. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_DATA):
  516. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  517. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
  518. (None, StreamState.RESERVED_LOCAL),
  519. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
  520. (H2StreamStateMachine.window_updated, StreamState.RESERVED_LOCAL),
  521. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_RST_STREAM):
  522. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  523. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_RST_STREAM):
  524. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  525. (StreamState.RESERVED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  526. (H2StreamStateMachine.send_alt_svc, StreamState.RESERVED_LOCAL),
  527. (StreamState.RESERVED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  528. (None, StreamState.RESERVED_LOCAL),
  529. # State: reserved remote
  530. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_HEADERS):
  531. (H2StreamStateMachine.response_received,
  532. StreamState.HALF_CLOSED_LOCAL),
  533. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_DATA):
  534. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  535. (StreamState.RESERVED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
  536. (None, StreamState.RESERVED_REMOTE),
  537. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
  538. (H2StreamStateMachine.window_updated, StreamState.RESERVED_REMOTE),
  539. (StreamState.RESERVED_REMOTE, StreamInputs.SEND_RST_STREAM):
  540. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  541. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_RST_STREAM):
  542. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  543. (StreamState.RESERVED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  544. (H2StreamStateMachine.recv_alt_svc, StreamState.RESERVED_REMOTE),
  545. # State: open
  546. (StreamState.OPEN, StreamInputs.SEND_HEADERS):
  547. (H2StreamStateMachine.response_sent, StreamState.OPEN),
  548. (StreamState.OPEN, StreamInputs.RECV_HEADERS):
  549. (H2StreamStateMachine.response_received, StreamState.OPEN),
  550. (StreamState.OPEN, StreamInputs.SEND_DATA):
  551. (None, StreamState.OPEN),
  552. (StreamState.OPEN, StreamInputs.RECV_DATA):
  553. (H2StreamStateMachine.data_received, StreamState.OPEN),
  554. (StreamState.OPEN, StreamInputs.SEND_END_STREAM):
  555. (None, StreamState.HALF_CLOSED_LOCAL),
  556. (StreamState.OPEN, StreamInputs.RECV_END_STREAM):
  557. (H2StreamStateMachine.stream_half_closed,
  558. StreamState.HALF_CLOSED_REMOTE),
  559. (StreamState.OPEN, StreamInputs.SEND_WINDOW_UPDATE):
  560. (None, StreamState.OPEN),
  561. (StreamState.OPEN, StreamInputs.RECV_WINDOW_UPDATE):
  562. (H2StreamStateMachine.window_updated, StreamState.OPEN),
  563. (StreamState.OPEN, StreamInputs.SEND_RST_STREAM):
  564. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  565. (StreamState.OPEN, StreamInputs.RECV_RST_STREAM):
  566. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  567. (StreamState.OPEN, StreamInputs.SEND_PUSH_PROMISE):
  568. (H2StreamStateMachine.send_push_promise, StreamState.OPEN),
  569. (StreamState.OPEN, StreamInputs.RECV_PUSH_PROMISE):
  570. (H2StreamStateMachine.recv_push_promise, StreamState.OPEN),
  571. (StreamState.OPEN, StreamInputs.SEND_INFORMATIONAL_HEADERS):
  572. (H2StreamStateMachine.send_informational_response, StreamState.OPEN),
  573. (StreamState.OPEN, StreamInputs.RECV_INFORMATIONAL_HEADERS):
  574. (H2StreamStateMachine.recv_informational_response, StreamState.OPEN),
  575. (StreamState.OPEN, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  576. (H2StreamStateMachine.send_alt_svc, StreamState.OPEN),
  577. (StreamState.OPEN, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  578. (H2StreamStateMachine.recv_alt_svc, StreamState.OPEN),
  579. # State: half-closed remote
  580. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_HEADERS):
  581. (H2StreamStateMachine.response_sent, StreamState.HALF_CLOSED_REMOTE),
  582. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_HEADERS):
  583. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  584. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_DATA):
  585. (None, StreamState.HALF_CLOSED_REMOTE),
  586. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_DATA):
  587. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  588. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_END_STREAM):
  589. (H2StreamStateMachine.send_end_stream, StreamState.CLOSED),
  590. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_WINDOW_UPDATE):
  591. (None, StreamState.HALF_CLOSED_REMOTE),
  592. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_WINDOW_UPDATE):
  593. (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_REMOTE),
  594. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_RST_STREAM):
  595. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  596. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_RST_STREAM):
  597. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  598. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_PUSH_PROMISE):
  599. (H2StreamStateMachine.send_push_promise,
  600. StreamState.HALF_CLOSED_REMOTE),
  601. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_PUSH_PROMISE):
  602. (H2StreamStateMachine.reset_stream_on_error, StreamState.CLOSED),
  603. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_INFORMATIONAL_HEADERS):
  604. (H2StreamStateMachine.send_informational_response,
  605. StreamState.HALF_CLOSED_REMOTE),
  606. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  607. (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_REMOTE),
  608. (StreamState.HALF_CLOSED_REMOTE, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  609. (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_REMOTE),
  610. # State: half-closed local
  611. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_HEADERS):
  612. (H2StreamStateMachine.response_received,
  613. StreamState.HALF_CLOSED_LOCAL),
  614. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_DATA):
  615. (H2StreamStateMachine.data_received, StreamState.HALF_CLOSED_LOCAL),
  616. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_END_STREAM):
  617. (H2StreamStateMachine.stream_ended, StreamState.CLOSED),
  618. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_WINDOW_UPDATE):
  619. (None, StreamState.HALF_CLOSED_LOCAL),
  620. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_WINDOW_UPDATE):
  621. (H2StreamStateMachine.window_updated, StreamState.HALF_CLOSED_LOCAL),
  622. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_RST_STREAM):
  623. (H2StreamStateMachine.send_reset_stream, StreamState.CLOSED),
  624. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_RST_STREAM):
  625. (H2StreamStateMachine.stream_reset, StreamState.CLOSED),
  626. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_PUSH_PROMISE):
  627. (H2StreamStateMachine.recv_push_promise,
  628. StreamState.HALF_CLOSED_LOCAL),
  629. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_INFORMATIONAL_HEADERS):
  630. (H2StreamStateMachine.recv_informational_response,
  631. StreamState.HALF_CLOSED_LOCAL),
  632. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.SEND_ALTERNATIVE_SERVICE):
  633. (H2StreamStateMachine.send_alt_svc, StreamState.HALF_CLOSED_LOCAL),
  634. (StreamState.HALF_CLOSED_LOCAL, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  635. (H2StreamStateMachine.recv_alt_svc, StreamState.HALF_CLOSED_LOCAL),
  636. # State: closed
  637. (StreamState.CLOSED, StreamInputs.RECV_END_STREAM):
  638. (None, StreamState.CLOSED),
  639. (StreamState.CLOSED, StreamInputs.RECV_ALTERNATIVE_SERVICE):
  640. (None, StreamState.CLOSED),
  641. # RFC 7540 Section 5.1 defines how the end point should react when
  642. # receiving a frame on a closed stream with the following statements:
  643. #
  644. # > An endpoint that receives any frame other than PRIORITY after receiving
  645. # > a RST_STREAM MUST treat that as a stream error of type STREAM_CLOSED.
  646. # > An endpoint that receives any frames after receiving a frame with the
  647. # > END_STREAM flag set MUST treat that as a connection error of type
  648. # > STREAM_CLOSED.
  649. (StreamState.CLOSED, StreamInputs.RECV_HEADERS):
  650. (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
  651. (StreamState.CLOSED, StreamInputs.RECV_DATA):
  652. (H2StreamStateMachine.recv_on_closed_stream, StreamState.CLOSED),
  653. # > WINDOW_UPDATE or RST_STREAM frames can be received in this state
  654. # > for a short period after a DATA or HEADERS frame containing a
  655. # > END_STREAM flag is sent.
  656. (StreamState.CLOSED, StreamInputs.RECV_WINDOW_UPDATE):
  657. (H2StreamStateMachine.window_on_closed_stream, StreamState.CLOSED),
  658. (StreamState.CLOSED, StreamInputs.RECV_RST_STREAM):
  659. (H2StreamStateMachine.reset_on_closed_stream, StreamState.CLOSED),
  660. # > A receiver MUST treat the receipt of a PUSH_PROMISE on a stream that is
  661. # > neither "open" nor "half-closed (local)" as a connection error of type
  662. # > PROTOCOL_ERROR.
  663. (StreamState.CLOSED, StreamInputs.RECV_PUSH_PROMISE):
  664. (H2StreamStateMachine.recv_push_on_closed_stream, StreamState.CLOSED),
  665. # Also, users should be forbidden from sending on closed streams.
  666. (StreamState.CLOSED, StreamInputs.SEND_HEADERS):
  667. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  668. (StreamState.CLOSED, StreamInputs.SEND_PUSH_PROMISE):
  669. (H2StreamStateMachine.send_push_on_closed_stream, StreamState.CLOSED),
  670. (StreamState.CLOSED, StreamInputs.SEND_RST_STREAM):
  671. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  672. (StreamState.CLOSED, StreamInputs.SEND_DATA):
  673. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  674. (StreamState.CLOSED, StreamInputs.SEND_WINDOW_UPDATE):
  675. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  676. (StreamState.CLOSED, StreamInputs.SEND_END_STREAM):
  677. (H2StreamStateMachine.send_on_closed_stream, StreamState.CLOSED),
  678. }
  679. class H2Stream(object):
  680. """
  681. A low-level HTTP/2 stream object. This handles building and receiving
  682. frames and maintains per-stream state.
  683. This wraps a HTTP/2 Stream state machine implementation, ensuring that
  684. frames can only be sent/received when the stream is in a valid state.
  685. Attempts to create frames that cannot be sent will raise a
  686. ``ProtocolError``.
  687. """
  688. def __init__(self,
  689. stream_id,
  690. config,
  691. inbound_window_size,
  692. outbound_window_size):
  693. self.state_machine = H2StreamStateMachine(stream_id)
  694. self.stream_id = stream_id
  695. self.max_outbound_frame_size = None
  696. self.request_method = None
  697. # The curent value of the outbound stream flow control window
  698. self.outbound_flow_control_window = outbound_window_size
  699. # The flow control manager.
  700. self._inbound_window_manager = WindowManager(inbound_window_size)
  701. # The expected content length, if any.
  702. self._expected_content_length = None
  703. # The actual received content length. Always tracked.
  704. self._actual_content_length = 0
  705. # The authority we believe this stream belongs to.
  706. self._authority = None
  707. # The configuration for this stream.
  708. self.config = config
  709. def __repr__(self):
  710. return "<%s id:%d state:%r>" % (
  711. type(self).__name__,
  712. self.stream_id,
  713. self.state_machine.state
  714. )
  715. @property
  716. def inbound_flow_control_window(self):
  717. """
  718. The size of the inbound flow control window for the stream. This is
  719. rarely publicly useful: instead, use :meth:`remote_flow_control_window
  720. <h2.stream.H2Stream.remote_flow_control_window>`. This shortcut is
  721. largely present to provide a shortcut to this data.
  722. """
  723. return self._inbound_window_manager.current_window_size
  724. @property
  725. def open(self):
  726. """
  727. Whether the stream is 'open' in any sense: that is, whether it counts
  728. against the number of concurrent streams.
  729. """
  730. # RFC 7540 Section 5.1.2 defines 'open' for this purpose to mean either
  731. # the OPEN state or either of the HALF_CLOSED states. Perplexingly,
  732. # this excludes the reserved states.
  733. # For more detail on why we're doing this in this slightly weird way,
  734. # see the comment on ``STREAM_OPEN`` at the top of the file.
  735. return STREAM_OPEN[self.state_machine.state]
  736. @property
  737. def closed(self):
  738. """
  739. Whether the stream is closed.
  740. """
  741. return self.state_machine.state == StreamState.CLOSED
  742. @property
  743. def closed_by(self):
  744. """
  745. Returns how the stream was closed, as one of StreamClosedBy.
  746. """
  747. return self.state_machine.stream_closed_by
  748. def upgrade(self, client_side):
  749. """
  750. Called by the connection to indicate that this stream is the initial
  751. request/response of an upgraded connection. Places the stream into an
  752. appropriate state.
  753. """
  754. self.config.logger.debug("Upgrading %r", self)
  755. assert self.stream_id == 1
  756. input_ = (
  757. StreamInputs.UPGRADE_CLIENT if client_side
  758. else StreamInputs.UPGRADE_SERVER
  759. )
  760. # This may return events, we deliberately don't want them.
  761. self.state_machine.process_input(input_)
  762. return
  763. def send_headers(self, headers, encoder, end_stream=False):
  764. """
  765. Returns a list of HEADERS/CONTINUATION frames to emit as either headers
  766. or trailers.
  767. """
  768. self.config.logger.debug("Send headers %s on %r", headers, self)
  769. # Convert headers to two-tuples.
  770. # FIXME: The fallback for dictionary headers is to be removed in 3.0.
  771. try:
  772. headers = headers.items()
  773. warnings.warn(
  774. "Implicit conversion of dictionaries to two-tuples for "
  775. "headers is deprecated and will be removed in 3.0.",
  776. DeprecationWarning
  777. )
  778. except AttributeError:
  779. headers = headers
  780. # Because encoding headers makes an irreversible change to the header
  781. # compression context, we make the state transition before we encode
  782. # them.
  783. # First, check if we're a client. If we are, no problem: if we aren't,
  784. # we need to scan the header block to see if this is an informational
  785. # response.
  786. input_ = StreamInputs.SEND_HEADERS
  787. if ((not self.state_machine.client) and
  788. is_informational_response(headers)):
  789. if end_stream:
  790. raise ProtocolError(
  791. "Cannot set END_STREAM on informational responses."
  792. )
  793. input_ = StreamInputs.SEND_INFORMATIONAL_HEADERS
  794. events = self.state_machine.process_input(input_)
  795. hf = HeadersFrame(self.stream_id)
  796. hdr_validation_flags = self._build_hdr_validation_flags(events)
  797. frames = self._build_headers_frames(
  798. headers, encoder, hf, hdr_validation_flags
  799. )
  800. if end_stream:
  801. # Not a bug: the END_STREAM flag is valid on the initial HEADERS
  802. # frame, not the CONTINUATION frames that follow.
  803. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  804. frames[0].flags.add('END_STREAM')
  805. if self.state_machine.trailers_sent and not end_stream:
  806. raise ProtocolError("Trailers must have END_STREAM set.")
  807. if self.state_machine.client and self._authority is None:
  808. self._authority = authority_from_headers(headers)
  809. # store request method for _initialize_content_length
  810. self.request_method = extract_method_header(headers)
  811. return frames
  812. def push_stream_in_band(self, related_stream_id, headers, encoder):
  813. """
  814. Returns a list of PUSH_PROMISE/CONTINUATION frames to emit as a pushed
  815. stream header. Called on the stream that has the PUSH_PROMISE frame
  816. sent on it.
  817. """
  818. self.config.logger.debug("Push stream %r", self)
  819. # Because encoding headers makes an irreversible change to the header
  820. # compression context, we make the state transition *first*.
  821. events = self.state_machine.process_input(
  822. StreamInputs.SEND_PUSH_PROMISE
  823. )
  824. ppf = PushPromiseFrame(self.stream_id)
  825. ppf.promised_stream_id = related_stream_id
  826. hdr_validation_flags = self._build_hdr_validation_flags(events)
  827. frames = self._build_headers_frames(
  828. headers, encoder, ppf, hdr_validation_flags
  829. )
  830. return frames
  831. def locally_pushed(self):
  832. """
  833. Mark this stream as one that was pushed by this peer. Must be called
  834. immediately after initialization. Sends no frames, simply updates the
  835. state machine.
  836. """
  837. # This does not trigger any events.
  838. events = self.state_machine.process_input(
  839. StreamInputs.SEND_PUSH_PROMISE
  840. )
  841. assert not events
  842. return []
  843. def send_data(self, data, end_stream=False, pad_length=None):
  844. """
  845. Prepare some data frames. Optionally end the stream.
  846. .. warning:: Does not perform flow control checks.
  847. """
  848. self.config.logger.debug(
  849. "Send data on %r with end stream set to %s", self, end_stream
  850. )
  851. self.state_machine.process_input(StreamInputs.SEND_DATA)
  852. df = DataFrame(self.stream_id)
  853. df.data = data
  854. if end_stream:
  855. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  856. df.flags.add('END_STREAM')
  857. if pad_length is not None:
  858. df.flags.add('PADDED')
  859. df.pad_length = pad_length
  860. # Subtract flow_controlled_length to account for possible padding
  861. self.outbound_flow_control_window -= df.flow_controlled_length
  862. assert self.outbound_flow_control_window >= 0
  863. return [df]
  864. def end_stream(self):
  865. """
  866. End a stream without sending data.
  867. """
  868. self.config.logger.debug("End stream %r", self)
  869. self.state_machine.process_input(StreamInputs.SEND_END_STREAM)
  870. df = DataFrame(self.stream_id)
  871. df.flags.add('END_STREAM')
  872. return [df]
  873. def advertise_alternative_service(self, field_value):
  874. """
  875. Advertise an RFC 7838 alternative service. The semantics of this are
  876. better documented in the ``H2Connection`` class.
  877. """
  878. self.config.logger.debug(
  879. "Advertise alternative service of %r for %r", field_value, self
  880. )
  881. self.state_machine.process_input(StreamInputs.SEND_ALTERNATIVE_SERVICE)
  882. asf = AltSvcFrame(self.stream_id)
  883. asf.field = field_value
  884. return [asf]
  885. def increase_flow_control_window(self, increment):
  886. """
  887. Increase the size of the flow control window for the remote side.
  888. """
  889. self.config.logger.debug(
  890. "Increase flow control window for %r by %d",
  891. self, increment
  892. )
  893. self.state_machine.process_input(StreamInputs.SEND_WINDOW_UPDATE)
  894. self._inbound_window_manager.window_opened(increment)
  895. wuf = WindowUpdateFrame(self.stream_id)
  896. wuf.window_increment = increment
  897. return [wuf]
  898. def receive_push_promise_in_band(self,
  899. promised_stream_id,
  900. headers,
  901. header_encoding):
  902. """
  903. Receives a push promise frame sent on this stream, pushing a remote
  904. stream. This is called on the stream that has the PUSH_PROMISE sent
  905. on it.
  906. """
  907. self.config.logger.debug(
  908. "Receive Push Promise on %r for remote stream %d",
  909. self, promised_stream_id
  910. )
  911. events = self.state_machine.process_input(
  912. StreamInputs.RECV_PUSH_PROMISE
  913. )
  914. events[0].pushed_stream_id = promised_stream_id
  915. if self.config.validate_inbound_headers:
  916. hdr_validation_flags = self._build_hdr_validation_flags(events)
  917. headers = validate_headers(headers, hdr_validation_flags)
  918. if header_encoding:
  919. headers = list(_decode_headers(headers, header_encoding))
  920. events[0].headers = headers
  921. return [], events
  922. def remotely_pushed(self, pushed_headers):
  923. """
  924. Mark this stream as one that was pushed by the remote peer. Must be
  925. called immediately after initialization. Sends no frames, simply
  926. updates the state machine.
  927. """
  928. self.config.logger.debug("%r pushed by remote peer", self)
  929. events = self.state_machine.process_input(
  930. StreamInputs.RECV_PUSH_PROMISE
  931. )
  932. self._authority = authority_from_headers(pushed_headers)
  933. return [], events
  934. def receive_headers(self, headers, end_stream, header_encoding):
  935. """
  936. Receive a set of headers (or trailers).
  937. """
  938. if is_informational_response(headers):
  939. if end_stream:
  940. raise ProtocolError(
  941. "Cannot set END_STREAM on informational responses"
  942. )
  943. input_ = StreamInputs.RECV_INFORMATIONAL_HEADERS
  944. else:
  945. input_ = StreamInputs.RECV_HEADERS
  946. events = self.state_machine.process_input(input_)
  947. if end_stream:
  948. es_events = self.state_machine.process_input(
  949. StreamInputs.RECV_END_STREAM
  950. )
  951. events[0].stream_ended = es_events[0]
  952. events += es_events
  953. self._initialize_content_length(headers)
  954. if isinstance(events[0], TrailersReceived):
  955. if not end_stream:
  956. raise ProtocolError("Trailers must have END_STREAM set")
  957. if self.config.validate_inbound_headers:
  958. hdr_validation_flags = self._build_hdr_validation_flags(events)
  959. headers = validate_headers(headers, hdr_validation_flags)
  960. if header_encoding:
  961. headers = list(_decode_headers(headers, header_encoding))
  962. events[0].headers = headers
  963. return [], events
  964. def receive_data(self, data, end_stream, flow_control_len):
  965. """
  966. Receive some data.
  967. """
  968. self.config.logger.debug(
  969. "Receive data on %r with end stream %s and flow control length "
  970. "set to %d", self, end_stream, flow_control_len
  971. )
  972. events = self.state_machine.process_input(StreamInputs.RECV_DATA)
  973. self._inbound_window_manager.window_consumed(flow_control_len)
  974. self._track_content_length(len(data), end_stream)
  975. if end_stream:
  976. es_events = self.state_machine.process_input(
  977. StreamInputs.RECV_END_STREAM
  978. )
  979. events[0].stream_ended = es_events[0]
  980. events.extend(es_events)
  981. events[0].data = data
  982. events[0].flow_controlled_length = flow_control_len
  983. return [], events
  984. def receive_window_update(self, increment):
  985. """
  986. Handle a WINDOW_UPDATE increment.
  987. """
  988. self.config.logger.debug(
  989. "Receive Window Update on %r for increment of %d",
  990. self, increment
  991. )
  992. events = self.state_machine.process_input(
  993. StreamInputs.RECV_WINDOW_UPDATE
  994. )
  995. frames = []
  996. # If we encounter a problem with incrementing the flow control window,
  997. # this should be treated as a *stream* error, not a *connection* error.
  998. # That means we need to catch the error and forcibly close the stream.
  999. if events:
  1000. events[0].delta = increment
  1001. try:
  1002. self.outbound_flow_control_window = guard_increment_window(
  1003. self.outbound_flow_control_window,
  1004. increment
  1005. )
  1006. except FlowControlError:
  1007. # Ok, this is bad. We're going to need to perform a local
  1008. # reset.
  1009. event = StreamReset()
  1010. event.stream_id = self.stream_id
  1011. event.error_code = ErrorCodes.FLOW_CONTROL_ERROR
  1012. event.remote_reset = False
  1013. events = [event]
  1014. frames = self.reset_stream(event.error_code)
  1015. return frames, events
  1016. def receive_continuation(self):
  1017. """
  1018. A naked CONTINUATION frame has been received. This is always an error,
  1019. but the type of error it is depends on the state of the stream and must
  1020. transition the state of the stream, so we need to handle it.
  1021. """
  1022. self.config.logger.debug("Receive Continuation frame on %r", self)
  1023. self.state_machine.process_input(
  1024. StreamInputs.RECV_CONTINUATION
  1025. )
  1026. assert False, "Should not be reachable"
  1027. def receive_alt_svc(self, frame):
  1028. """
  1029. An Alternative Service frame was received on the stream. This frame
  1030. inherits the origin associated with this stream.
  1031. """
  1032. self.config.logger.debug(
  1033. "Receive Alternative Service frame on stream %r", self
  1034. )
  1035. # If the origin is present, RFC 7838 says we have to ignore it.
  1036. if frame.origin:
  1037. return [], []
  1038. events = self.state_machine.process_input(
  1039. StreamInputs.RECV_ALTERNATIVE_SERVICE
  1040. )
  1041. # There are lots of situations where we want to ignore the ALTSVC
  1042. # frame. If we need to pay attention, we'll have an event and should
  1043. # fill it out.
  1044. if events:
  1045. assert isinstance(events[0], AlternativeServiceAvailable)
  1046. events[0].origin = self._authority
  1047. events[0].field_value = frame.field
  1048. return [], events
  1049. def reset_stream(self, error_code=0):
  1050. """
  1051. Close the stream locally. Reset the stream with an error code.
  1052. """
  1053. self.config.logger.debug(
  1054. "Local reset %r with error code: %d", self, error_code
  1055. )
  1056. self.state_machine.process_input(StreamInputs.SEND_RST_STREAM)
  1057. rsf = RstStreamFrame(self.stream_id)
  1058. rsf.error_code = error_code
  1059. return [rsf]
  1060. def stream_reset(self, frame):
  1061. """
  1062. Handle a stream being reset remotely.
  1063. """
  1064. self.config.logger.debug(
  1065. "Remote reset %r with error code: %d", self, frame.error_code
  1066. )
  1067. events = self.state_machine.process_input(StreamInputs.RECV_RST_STREAM)
  1068. if events:
  1069. # We don't fire an event if this stream is already closed.
  1070. events[0].error_code = _error_code_from_int(frame.error_code)
  1071. return [], events
  1072. def acknowledge_received_data(self, acknowledged_size):
  1073. """
  1074. The user has informed us that they've processed some amount of data
  1075. that was received on this stream. Pass that to the window manager and
  1076. potentially return some WindowUpdate frames.
  1077. """
  1078. self.config.logger.debug(
  1079. "Acknowledge received data with size %d on %r",
  1080. acknowledged_size, self
  1081. )
  1082. increment = self._inbound_window_manager.process_bytes(
  1083. acknowledged_size
  1084. )
  1085. if increment:
  1086. f = WindowUpdateFrame(self.stream_id)
  1087. f.window_increment = increment
  1088. return [f]
  1089. return []
  1090. def _build_hdr_validation_flags(self, events):
  1091. """
  1092. Constructs a set of header validation flags for use when normalizing
  1093. and validating header blocks.
  1094. """
  1095. is_trailer = isinstance(
  1096. events[0], (_TrailersSent, TrailersReceived)
  1097. )
  1098. is_response_header = isinstance(
  1099. events[0],
  1100. (
  1101. _ResponseSent,
  1102. ResponseReceived,
  1103. InformationalResponseReceived
  1104. )
  1105. )
  1106. is_push_promise = isinstance(
  1107. events[0], (PushedStreamReceived, _PushedRequestSent)
  1108. )
  1109. return HeaderValidationFlags(
  1110. is_client=self.state_machine.client,
  1111. is_trailer=is_trailer,
  1112. is_response_header=is_response_header,
  1113. is_push_promise=is_push_promise,
  1114. )
  1115. def _build_headers_frames(self,
  1116. headers,
  1117. encoder,
  1118. first_frame,
  1119. hdr_validation_flags):
  1120. """
  1121. Helper method to build headers or push promise frames.
  1122. """
  1123. # We need to lowercase the header names, and to ensure that secure
  1124. # header fields are kept out of compression contexts.
  1125. if self.config.normalize_outbound_headers:
  1126. headers = normalize_outbound_headers(
  1127. headers, hdr_validation_flags
  1128. )
  1129. if self.config.validate_outbound_headers:
  1130. headers = validate_outbound_headers(
  1131. headers, hdr_validation_flags
  1132. )
  1133. encoded_headers = encoder.encode(headers)
  1134. # Slice into blocks of max_outbound_frame_size. Be careful with this:
  1135. # it only works right because we never send padded frames or priority
  1136. # information on the frames. Revisit this if we do.
  1137. header_blocks = [
  1138. encoded_headers[i:i+self.max_outbound_frame_size]
  1139. for i in range(
  1140. 0, len(encoded_headers), self.max_outbound_frame_size
  1141. )
  1142. ]
  1143. frames = []
  1144. first_frame.data = header_blocks[0]
  1145. frames.append(first_frame)
  1146. for block in header_blocks[1:]:
  1147. cf = ContinuationFrame(self.stream_id)
  1148. cf.data = block
  1149. frames.append(cf)
  1150. frames[-1].flags.add('END_HEADERS')
  1151. return frames
  1152. def _initialize_content_length(self, headers):
  1153. """
  1154. Checks the headers for a content-length header and initializes the
  1155. _expected_content_length field from it. It's not an error for no
  1156. Content-Length header to be present.
  1157. """
  1158. if self.request_method == b'HEAD':
  1159. self._expected_content_length = 0
  1160. return
  1161. for n, v in headers:
  1162. if n == b'content-length':
  1163. try:
  1164. self._expected_content_length = int(v, 10)
  1165. except ValueError:
  1166. raise ProtocolError(
  1167. "Invalid content-length header: %s" % v
  1168. )
  1169. return
  1170. def _track_content_length(self, length, end_stream):
  1171. """
  1172. Update the expected content length in response to data being received.
  1173. Validates that the appropriate amount of data is sent. Always updates
  1174. the received data, but only validates the length against the
  1175. content-length header if one was sent.
  1176. :param length: The length of the body chunk received.
  1177. :param end_stream: If this is the last body chunk received.
  1178. """
  1179. self._actual_content_length += length
  1180. actual = self._actual_content_length
  1181. expected = self._expected_content_length
  1182. if expected is not None:
  1183. if expected < actual:
  1184. raise InvalidBodyLengthError(expected, actual)
  1185. if end_stream and expected != actual:
  1186. raise InvalidBodyLengthError(expected, actual)
  1187. def _inbound_flow_control_change_from_settings(self, delta):
  1188. """
  1189. We changed SETTINGS_INITIAL_WINDOW_SIZE, which means we need to
  1190. update the target window size for flow control. For our flow control
  1191. strategy, this means we need to do two things: we need to adjust the
  1192. current window size, but we also need to set the target maximum window
  1193. size to the new value.
  1194. """
  1195. new_max_size = self._inbound_window_manager.max_window_size + delta
  1196. self._inbound_window_manager.window_opened(delta)
  1197. self._inbound_window_manager.max_window_size = new_max_size
  1198. def _decode_headers(headers, encoding):
  1199. """
  1200. Given an iterable of header two-tuples and an encoding, decodes those
  1201. headers using that encoding while preserving the type of the header tuple.
  1202. This ensures that the use of ``HeaderTuple`` is preserved.
  1203. """
  1204. for header in headers:
  1205. # This function expects to work on decoded headers, which are always
  1206. # HeaderTuple objects.
  1207. assert isinstance(header, HeaderTuple)
  1208. name, value = header
  1209. name = name.decode(encoding)
  1210. value = value.decode(encoding)
  1211. yield header.__class__(name, value)