123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341 |
- # -*- coding: utf-8 -*-
- """
- hyper/http20/stream
- ~~~~~~~~~~~~~~~~~~~
- Objects that make up the stream-level abstraction of hyper's HTTP/2 support.
- These objects are not expected to be part of the public HTTP/2 API: they're
- intended purely for use inside hyper's HTTP/2 abstraction.
- Conceptually, a single HTTP/2 connection is made up of many streams: each
- stream is an independent, bi-directional sequence of HTTP headers and data.
- Each stream is identified by a monotonically increasing integer, assigned to
- the stream by the endpoint that initiated the stream.
- """
- import h2.exceptions
- from ..common.headers import HTTPHeaderMap
- from .util import h2_safe_headers
- import logging
- log = logging.getLogger(__name__)
# Define the largest chunk of data, in bytes, that we'll send in one go.
# Realistically, we should take the MSS into account but that's pretty dull,
# so let's just say 1kB and call it a day.
#
# Besides being the slice/read size used when a body is split up for sending,
# this value is also what the sending code compares a chunk's length against
# to decide whether that chunk is the last one of the body.
MAX_CHUNK = 1024
class Stream(object):
    """
    A single HTTP/2 stream.

    A stream is an independent, bi-directional sequence of HTTP headers and
    data. Each stream is identified by a single integer. From a HTTP
    perspective, a stream _approximately_ matches a single request-response
    pair.
    """
    def __init__(self,
                 stream_id,
                 window_manager,
                 connection,
                 send_outstanding_data,
                 recv_cb,
                 close_cb):
        """
        :param stream_id: The integer identifying this stream.
        :param window_manager: Manager for the inbound flow control window.
            It is told about every received data frame and may ask for the
            window to be incremented.
        :param connection: A context manager which, when entered, provides
            the HTTP/2 state machine used to send frames for this stream.
        :param send_outstanding_data: Callable that flushes any data the
            state machine has queued up for sending.
        :param recv_cb: Callable that reads more data from the connection.
        :param close_cb: Callable invoked with the stream ID once this stream
            has been closed or reset.
        """
        self.stream_id = stream_id
        self.headers = HTTPHeaderMap()

        # Set to a key-value set of the response headers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_headers = None

        # Set to a key-value set of the response trailers once their
        # HEADERS..CONTINUATION frame sequence finishes.
        self.response_trailers = None

        # A dict mapping the promised stream ID of a pushed resource to a
        # key-value set of its request headers. Entries are added once their
        # PUSH_PROMISE..CONTINUATION frame sequence finishes.
        self.promised_headers = {}

        # Unconsumed response data chunks. Empties after every call to _read().
        self.data = []

        # Whether the remote side has completed the stream.
        self.remote_closed = False

        # Whether we have closed the stream.
        self.local_closed = False

        # There are two flow control windows: one for data we're sending,
        # one for data being sent to us.
        self._in_window_manager = window_manager

        # Save off a reference to the state machine wrapped with lock.
        self._conn = connection

        # Save off the callbacks handed to us by the connection.
        self._send_outstanding_data = send_outstanding_data
        self._recv_cb = recv_cb
        self._close_cb = close_cb

    def add_header(self, name, value, replace=False):
        """
        Adds a single HTTP header to the headers to be sent on the request.

        :param name: The header name.
        :param value: The header value.
        :param replace: If ``True``, any existing values for ``name`` are
            replaced; otherwise the value is set via item assignment on the
            header map.
        """
        if not replace:
            self.headers[name] = value
        else:
            self.headers.replace(name, value)

    def send_headers(self, end_stream=False):
        """
        Sends the complete saved header block on the stream.

        :param end_stream: If ``True``, the header block also ends our side
            of the stream and the stream is marked as locally closed.
        """
        headers = self.get_headers()
        with self._conn as conn:
            conn.send_headers(self.stream_id, headers, end_stream)
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True

    def send_data(self, data, final):
        """
        Send some data on the stream. If this is the end of the data to be
        sent, the ``final`` flag _must_ be set to True. If no data is to be
        sent, set ``data`` to ``None``.

        :param data: A file-like object (anything with a ``read`` method), a
            sliceable in-memory buffer such as ``bytes``, or ``None``.
        :param final: Whether this call carries the last of the data for the
            stream.
        """
        # Define a utility iterator for file objects. A read shorter than
        # MAX_CHUNK (including an empty read) marks the end of the file, so
        # the last chunk yielded is always shorter than MAX_CHUNK.
        def file_iterator(fobj):
            while True:
                chunk = fobj.read(MAX_CHUNK)
                yield chunk
                if len(chunk) < MAX_CHUNK:
                    break

        # Define the equivalent iterator for in-memory buffers. The stream is
        # only ended by a chunk shorter than MAX_CHUNK, so when this is the
        # final data and the buffer is empty or an exact multiple of
        # MAX_CHUNK we must finish with an empty chunk. Without it the stream
        # would never be ended.
        def buffer_iterator(buf):
            for start in range(0, len(buf), MAX_CHUNK):
                yield buf[start:start + MAX_CHUNK]
            if final and len(buf) % MAX_CHUNK == 0:
                yield b''

        # Build the appropriate iterator for the data, in chunks of at most
        # MAX_CHUNK.
        if hasattr(data, 'read'):
            chunks = file_iterator(data)
        else:
            if data is None:
                data = b''
            chunks = buffer_iterator(data)

        for chunk in chunks:
            self._send_chunk(chunk, final)

    def _read(self, amt=None):
        """
        Read data from the stream. Unlike a normal read behaviour, this
        function returns _at least_ ``amt`` data, but may return more.

        :param amt: The minimum number of bytes wanted, or ``None`` to read
            until the remote side has closed the stream.
        :returns: All currently buffered data joined into one byte string.
            This may be shorter than ``amt`` if the stream closed first.
        """
        def buffered_length(chunks):
            return sum(map(len, chunks))

        # Keep reading until the stream is closed or we get enough data.
        while (not self.remote_closed and
                (amt is None or buffered_length(self.data) < amt)):
            self._recv_cb(stream_id=self.stream_id)

        result = b''.join(self.data)
        self.data = []
        return result

    def _read_one_frame(self):
        """
        Reads a single data frame from the stream and returns it.

        :returns: The oldest buffered data chunk, or ``None`` if the stream
            is closed and no data is buffered.
        """
        # Keep reading until the stream is closed or we have a data frame.
        while not self.remote_closed and not self.data:
            self._recv_cb(stream_id=self.stream_id)

        try:
            return self.data.pop(0)
        except IndexError:
            return None

    def receive_response(self, event):
        """
        Receive response headers.

        :param event: The event carrying the response headers in its
            ``headers`` attribute.
        """
        # TODO: If this is called while we're still sending data, we may want
        # to stop sending that data and check the response. Early responses to
        # big uploads are almost always a problem.
        self.response_headers = HTTPHeaderMap(event.headers)

    def receive_trailers(self, event):
        """
        Receive response trailers.

        :param event: The event carrying the trailers in its ``headers``
            attribute.
        """
        self.response_trailers = HTTPHeaderMap(event.headers)

    def receive_push(self, event):
        """
        Receive the request headers for a pushed stream.

        :param event: The event carrying ``pushed_stream_id`` and
            ``headers`` attributes.
        """
        self.promised_headers[event.pushed_stream_id] = event.headers

    def receive_data(self, event):
        """
        Receive a chunk of data.

        :param event: The event carrying ``data`` and
            ``flow_controlled_length`` attributes.
        """
        size = event.flow_controlled_length
        increment = self._in_window_manager._handle_frame(size)

        # Append the data to the buffer.
        self.data.append(event.data)

        if increment:
            try:
                with self._conn as conn:
                    conn.increment_flow_control_window(
                        increment, stream_id=self.stream_id
                    )
            except h2.exceptions.StreamClosedError:
                # We haven't got to it yet, but the stream is already
                # closed. We don't need to increment the window in this
                # case!
                pass
            else:
                self._send_outstanding_data()

    def receive_end_stream(self, event):
        """
        All of the data is returned now.
        """
        self.remote_closed = True

    def receive_reset(self, event):
        """
        Stream forcefully reset.
        """
        self.remote_closed = True
        self._close_cb(self.stream_id)

    def get_headers(self):
        """
        Provides the headers to the connection object.
        """
        # Strip any headers invalid in H2.
        return h2_safe_headers(self.headers)

    def getheaders(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the headers of the response to the original request.
        """
        # Keep reading until all headers are received.
        while self.response_headers is None:
            self._recv_cb(stream_id=self.stream_id)

        # Find the Content-Length header if present, and tell the window
        # manager how large the response body is expected to be (0 if the
        # header is absent).
        self._in_window_manager.document_size = (
            int(self.response_headers.get(b'content-length', [0])[0])
        )

        return self.response_headers

    def gettrailers(self):
        """
        Once all data has been sent on this connection, returns a key-value set
        of the trailers of the response to the original request.

        .. warning:: Note that this method requires that the stream is
                     totally exhausted. This means that, if you have not
                     completely read from the stream, all stream data will be
                     read into memory.

        :returns: The key-value set of the trailers, or ``None`` if no trailers
                  were sent.
        """
        # Keep reading until the stream is done.
        while not self.remote_closed:
            self._recv_cb(stream_id=self.stream_id)

        return self.response_trailers

    def get_pushes(self, capture_all=False):
        """
        Returns a generator that yields push promises from the server. Note
        that this method is not idempotent; promises returned in one call will
        not be returned in subsequent calls. Iterating through generators
        returned by multiple calls to this method simultaneously results in
        undefined behavior.

        :param capture_all: If ``False``, the generator will yield all buffered
            push promises without blocking. If ``True``, the generator will
            first yield all buffered push promises, then yield additional ones
            as they arrive, and terminate when the original stream closes.
        :returns: A generator of ``(promised_stream_id, headers)`` pairs.
        """
        while True:
            # Remove promises one at a time, oldest first, instead of
            # iterating over the dict. The consumer may cause more frames to
            # be read between two yields, which can insert new promises; an
            # in-progress dict iteration would blow up on that, and promises
            # that arrived in the meantime would be thrown away.
            while self.promised_headers:
                promised_id = next(iter(self.promised_headers))
                yield promised_id, self.promised_headers.pop(promised_id)

            if not capture_all or self.remote_closed:
                break

            self._recv_cb(stream_id=self.stream_id)

    def close(self, error_code=None):
        """
        Closes the stream. If the stream is currently open, attempts to close
        it as gracefully as possible.

        :param error_code: (optional) The error code to reset the stream with.
        :returns: Nothing.
        """
        # FIXME: I think this is overbroad, but for now it's probably ok.
        if not (self.remote_closed and self.local_closed):
            try:
                with self._conn as conn:
                    conn.reset_stream(self.stream_id, error_code or 0)
            except h2.exceptions.ProtocolError:
                # If for any reason we can't reset the stream, just
                # tolerate it.
                pass
            else:
                self._send_outstanding_data(tolerate_peer_gone=True)

        self.remote_closed = True
        self.local_closed = True
        self._close_cb(self.stream_id)

    @property
    def _out_flow_control_window(self):
        """
        The size of our outbound flow control window.
        """
        with self._conn as conn:
            return conn.local_flow_control_window(self.stream_id)

    def _send_chunk(self, data, final):
        """
        Implements most of the sending logic.

        Takes a single chunk of size at most MAX_CHUNK, wraps it in a frame and
        sends it. Optionally sets the END_STREAM flag if this is the last chunk
        (determined by being of size less than MAX_CHUNK) and no more data is
        to be sent.

        :param data: The chunk to send.
        :param final: Whether the body this chunk belongs to is the last data
            to be sent on the stream.
        """
        # If we don't fit in the connection window, try popping frames off the
        # connection in hope that one might be a window update frame.
        while len(data) > self._out_flow_control_window:
            self._recv_cb()

        # If the length of the data is less than MAX_CHUNK, we're probably
        # at the end of the file. If this is the end of the data, mark it
        # as END_STREAM.
        end_stream = False
        if len(data) < MAX_CHUNK and final:
            end_stream = True

        # Send the frame and decrement the flow control window.
        with self._conn as conn:
            conn.send_data(
                stream_id=self.stream_id, data=data, end_stream=end_stream
            )
        self._send_outstanding_data()

        if end_stream:
            self.local_closed = True
|