# -*- coding: utf-8 -*-
"""
hyper/http20/stream
~~~~~~~~~~~~~~~~~~~
Objects that make up the stream-level abstraction of hyper's HTTP/2 support.
These objects are not expected to be part of the public HTTP/2 API: they're
intended purely for use inside hyper's HTTP/2 abstraction.
Conceptually, a single HTTP/2 connection is made up of many streams: each
stream is an independent, bi-directional sequence of HTTP headers and data.
Each stream is identified by a monotonically increasing integer, assigned to
the stream by the endpoint that initiated the stream.
"""
import h2.exceptions

from ..common.headers import HTTPHeaderMap
from .util import h2_safe_headers

import logging

log = logging.getLogger(__name__)

# Define the largest chunk of data we'll send in one go. Realistically, we
# should take the MSS into account but that's pretty dull, so let's just say
# 1kB and call it a day.
MAX_CHUNK = 1024
  22. class Stream(object):
  23. """
  24. A single HTTP/2 stream.
  25. A stream is an independent, bi-directional sequence of HTTP headers and
  26. data. Each stream is identified by a single integer. From a HTTP
  27. perspective, a stream _approximately_ matches a single request-response
  28. pair.
  29. """
  30. def __init__(self,
  31. stream_id,
  32. window_manager,
  33. connection,
  34. send_outstanding_data,
  35. recv_cb,
  36. close_cb):
  37. self.stream_id = stream_id
  38. self.headers = HTTPHeaderMap()
  39. # Set to a key-value set of the response headers once their
  40. # HEADERS..CONTINUATION frame sequence finishes.
  41. self.response_headers = None
  42. # Set to a key-value set of the response trailers once their
  43. # HEADERS..CONTINUATION frame sequence finishes.
  44. self.response_trailers = None
  45. # A dict mapping the promised stream ID of a pushed resource to a
  46. # key-value set of its request headers. Entries are added once their
  47. # PUSH_PROMISE..CONTINUATION frame sequence finishes.
  48. self.promised_headers = {}
  49. # Unconsumed response data chunks. Empties after every call to _read().
  50. self.data = []
  51. # Whether the remote side has completed the stream.
  52. self.remote_closed = False
  53. # Whether we have closed the stream.
  54. self.local_closed = False
  55. # There are two flow control windows: one for data we're sending,
  56. # one for data being sent to us.
  57. self._in_window_manager = window_manager
  58. # Save off a reference to the state machine wrapped with lock.
  59. self._conn = connection
  60. # Save off a data callback.
  61. self._send_outstanding_data = send_outstanding_data
  62. self._recv_cb = recv_cb
  63. self._close_cb = close_cb
  64. def add_header(self, name, value, replace=False):
  65. """
  66. Adds a single HTTP header to the headers to be sent on the request.
  67. """
  68. if not replace:
  69. self.headers[name] = value
  70. else:
  71. self.headers.replace(name, value)
  72. def send_headers(self, end_stream=False):
  73. """
  74. Sends the complete saved header block on the stream.
  75. """
  76. headers = self.get_headers()
  77. with self._conn as conn:
  78. conn.send_headers(self.stream_id, headers, end_stream)
  79. self._send_outstanding_data()
  80. if end_stream:
  81. self.local_closed = True
  82. def send_data(self, data, final):
  83. """
  84. Send some data on the stream. If this is the end of the data to be
  85. sent, the ``final`` flag _must_ be set to True. If no data is to be
  86. sent, set ``data`` to ``None``.
  87. """
  88. # Define a utility iterator for file objects.
  89. def file_iterator(fobj):
  90. while True:
  91. data = fobj.read(MAX_CHUNK)
  92. yield data
  93. if len(data) < MAX_CHUNK:
  94. break
  95. # Build the appropriate iterator for the data, in chunks of CHUNK_SIZE.
  96. if hasattr(data, 'read'):
  97. chunks = file_iterator(data)
  98. else:
  99. chunks = (data[i:i+MAX_CHUNK]
  100. for i in range(0, len(data), MAX_CHUNK))
  101. for chunk in chunks:
  102. self._send_chunk(chunk, final)
  103. def _read(self, amt=None):
  104. """
  105. Read data from the stream. Unlike a normal read behaviour, this
  106. function returns _at least_ ``amt`` data, but may return more.
  107. """
  108. def listlen(list):
  109. return sum(map(len, list))
  110. # Keep reading until the stream is closed or we get enough data.
  111. while (not self.remote_closed and
  112. (amt is None or listlen(self.data) < amt)):
  113. self._recv_cb(stream_id=self.stream_id)
  114. result = b''.join(self.data)
  115. self.data = []
  116. return result
  117. def _read_one_frame(self):
  118. """
  119. Reads a single data frame from the stream and returns it.
  120. """
  121. # Keep reading until the stream is closed or we have a data frame.
  122. while not self.remote_closed and not self.data:
  123. self._recv_cb(stream_id=self.stream_id)
  124. try:
  125. return self.data.pop(0)
  126. except IndexError:
  127. return None
  128. def receive_response(self, event):
  129. """
  130. Receive response headers.
  131. """
  132. # TODO: If this is called while we're still sending data, we may want
  133. # to stop sending that data and check the response. Early responses to
  134. # big uploads are almost always a problem.
  135. self.response_headers = HTTPHeaderMap(event.headers)
  136. def receive_trailers(self, event):
  137. """
  138. Receive response trailers.
  139. """
  140. self.response_trailers = HTTPHeaderMap(event.headers)
  141. def receive_push(self, event):
  142. """
  143. Receive the request headers for a pushed stream.
  144. """
  145. self.promised_headers[event.pushed_stream_id] = event.headers
  146. def receive_data(self, event):
  147. """
  148. Receive a chunk of data.
  149. """
  150. size = event.flow_controlled_length
  151. increment = self._in_window_manager._handle_frame(size)
  152. # Append the data to the buffer.
  153. self.data.append(event.data)
  154. if increment:
  155. try:
  156. with self._conn as conn:
  157. conn.increment_flow_control_window(
  158. increment, stream_id=self.stream_id
  159. )
  160. except h2.exceptions.StreamClosedError:
  161. # We haven't got to it yet, but the stream is already
  162. # closed. We don't need to increment the window in this
  163. # case!
  164. pass
  165. else:
  166. self._send_outstanding_data()
  167. def receive_end_stream(self, event):
  168. """
  169. All of the data is returned now.
  170. """
  171. self.remote_closed = True
  172. def receive_reset(self, event):
  173. """
  174. Stream forcefully reset.
  175. """
  176. self.remote_closed = True
  177. self._close_cb(self.stream_id)
  178. def get_headers(self):
  179. """
  180. Provides the headers to the connection object.
  181. """
  182. # Strip any headers invalid in H2.
  183. return h2_safe_headers(self.headers)
  184. def getheaders(self):
  185. """
  186. Once all data has been sent on this connection, returns a key-value set
  187. of the headers of the response to the original request.
  188. """
  189. # Keep reading until all headers are received.
  190. while self.response_headers is None:
  191. self._recv_cb(stream_id=self.stream_id)
  192. # Find the Content-Length header if present.
  193. self._in_window_manager.document_size = (
  194. int(self.response_headers.get(b'content-length', [0])[0])
  195. )
  196. return self.response_headers
  197. def gettrailers(self):
  198. """
  199. Once all data has been sent on this connection, returns a key-value set
  200. of the trailers of the response to the original request.
  201. .. warning:: Note that this method requires that the stream is
  202. totally exhausted. This means that, if you have not
  203. completely read from the stream, all stream data will be
  204. read into memory.
  205. :returns: The key-value set of the trailers, or ``None`` if no trailers
  206. were sent.
  207. """
  208. # Keep reading until the stream is done.
  209. while not self.remote_closed:
  210. self._recv_cb(stream_id=self.stream_id)
  211. return self.response_trailers
  212. def get_pushes(self, capture_all=False):
  213. """
  214. Returns a generator that yields push promises from the server. Note
  215. that this method is not idempotent; promises returned in one call will
  216. not be returned in subsequent calls. Iterating through generators
  217. returned by multiple calls to this method simultaneously results in
  218. undefined behavior.
  219. :param capture_all: If ``False``, the generator will yield all buffered
  220. push promises without blocking. If ``True``, the generator will
  221. first yield all buffered push promises, then yield additional ones
  222. as they arrive, and terminate when the original stream closes.
  223. """
  224. while True:
  225. for pair in self.promised_headers.items():
  226. yield pair
  227. self.promised_headers = {}
  228. if not capture_all or self.remote_closed:
  229. break
  230. self._recv_cb(stream_id=self.stream_id)
  231. def close(self, error_code=None):
  232. """
  233. Closes the stream. If the stream is currently open, attempts to close
  234. it as gracefully as possible.
  235. :param error_code: (optional) The error code to reset the stream with.
  236. :returns: Nothing.
  237. """
  238. # FIXME: I think this is overbroad, but for now it's probably ok.
  239. if not (self.remote_closed and self.local_closed):
  240. try:
  241. with self._conn as conn:
  242. conn.reset_stream(self.stream_id, error_code or 0)
  243. except h2.exceptions.ProtocolError:
  244. # If for any reason we can't reset the stream, just
  245. # tolerate it.
  246. pass
  247. else:
  248. self._send_outstanding_data(tolerate_peer_gone=True)
  249. self.remote_closed = True
  250. self.local_closed = True
  251. self._close_cb(self.stream_id)
  252. @property
  253. def _out_flow_control_window(self):
  254. """
  255. The size of our outbound flow control window.
  256. """
  257. with self._conn as conn:
  258. return conn.local_flow_control_window(self.stream_id)
  259. def _send_chunk(self, data, final):
  260. """
  261. Implements most of the sending logic.
  262. Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame and
  263. sends it. Optionally sets the END_STREAM flag if this is the last chunk
  264. (determined by being of size less than MAX_CHUNK) and no more data is
  265. to be sent.
  266. """
  267. # If we don't fit in the connection window, try popping frames off the
  268. # connection in hope that one might be a window update frame.
  269. while len(data) > self._out_flow_control_window:
  270. self._recv_cb()
  271. # If the length of the data is less than MAX_CHUNK, we're probably
  272. # at the end of the file. If this is the end of the data, mark it
  273. # as END_STREAM.
  274. end_stream = False
  275. if len(data) < MAX_CHUNK and final:
  276. end_stream = True
  277. # Send the frame and decrement the flow control window.
  278. with self._conn as conn:
  279. conn.send_data(
  280. stream_id=self.stream_id, data=data, end_stream=end_stream
  281. )
  282. self._send_outstanding_data()
  283. if end_stream:
  284. self.local_closed = True