# response.py (12 KB)
# -*- coding: utf-8 -*-
"""
hyper/http11/response
~~~~~~~~~~~~~~~~~~~~~

Contains the HTTP/1.1 equivalent of the HTTPResponse object defined in
httplib/http.client.
"""
# Standard library.
import logging
import weakref
import zlib

# Package-local helpers: the deflate decoder and the exception types raised
# by the response object defined in this module.
from ..common.decoder import DeflateDecoder
from ..common.exceptions import ChunkedDecodeError, InvalidResponseError
from ..common.exceptions import ConnectionResetError

# Module-level logger, named after this module.
log = logging.getLogger(__name__)
  15. class HTTP11Response(object):
  16. """
  17. An ``HTTP11Response`` wraps the HTTP/1.1 response from the server. It
  18. provides access to the response headers and the entity body. The response
  19. is an iterable object and can be used in a with statement.
  20. """
  21. def __init__(self, code, reason, headers, sock, connection=None):
  22. #: The reason phrase returned by the server.
  23. self.reason = reason
  24. #: The status code returned by the server.
  25. self.status = code
  26. #: The response headers. These are determined upon creation, assigned
  27. #: once, and never assigned again.
  28. self.headers = headers
  29. #: The response trailers. These are always intially ``None``.
  30. self.trailers = None
  31. # The socket this response is being sent over.
  32. self._sock = sock
  33. # Whether we expect the connection to be closed. If we do, we don't
  34. # bother checking for content-length, we just keep reading until
  35. # we no longer can.
  36. self._expect_close = False
  37. if b'close' in self.headers.get(b'connection', []):
  38. self._expect_close = True
  39. # The expected length of the body.
  40. try:
  41. self._length = int(self.headers[b'content-length'][0])
  42. except KeyError:
  43. self._length = None
  44. # Whether we expect a chunked response.
  45. self._chunked = (
  46. b'chunked' in self.headers.get(b'transfer-encoding', [])
  47. )
  48. # One of the following must be true: we must expect that the connection
  49. # will be closed following the body, or that a content-length was sent,
  50. # or that we're getting a chunked response.
  51. # FIXME: Remove naked assert, replace with something better.
  52. assert self._expect_close or self._length is not None or self._chunked
  53. # This object is used for decompressing gzipped request bodies. Right
  54. # now we only support gzip because that's all the RFC mandates of us.
  55. # Later we'll add support for more encodings.
  56. # This 16 + MAX_WBITS nonsense is to force gzip. See this
  57. # Stack Overflow answer for more:
  58. # http://stackoverflow.com/a/2695466/1401686
  59. if b'gzip' in self.headers.get(b'content-encoding', []):
  60. self._decompressobj = zlib.decompressobj(16 + zlib.MAX_WBITS)
  61. elif b'deflate' in self.headers.get(b'content-encoding', []):
  62. self._decompressobj = DeflateDecoder()
  63. else:
  64. self._decompressobj = None
  65. # This is a reference that allows for the Response class to tell the
  66. # parent connection object to throw away its socket object. This is to
  67. # be used when the connection is genuinely closed, so that the user
  68. # can keep using the Connection object.
  69. # Strictly, we take a weakreference to this so that we don't set up a
  70. # reference cycle.
  71. if connection is not None:
  72. self._parent = weakref.ref(connection)
  73. else:
  74. self._parent = None
  75. self._buffered_data = b''
  76. self._chunker = None
  77. def read(self, amt=None, decode_content=True):
  78. """
  79. Reads the response body, or up to the next ``amt`` bytes.
  80. :param amt: (optional) The amount of data to read. If not provided, all
  81. the data will be read from the response.
  82. :param decode_content: (optional) If ``True``, will transparently
  83. decode the response data.
  84. :returns: The read data. Note that if ``decode_content`` is set to
  85. ``True``, the actual amount of data returned may be different to
  86. the amount requested.
  87. """
  88. # Return early if we've lost our connection.
  89. if self._sock is None:
  90. return b''
  91. if self._chunked:
  92. return self._normal_read_chunked(amt, decode_content)
  93. # If we're asked to do a read without a length, we need to read
  94. # everything. That means either the entire content length, or until the
  95. # socket is closed, depending.
  96. if amt is None:
  97. if self._length is not None:
  98. amt = self._length
  99. elif self._expect_close:
  100. return self._read_expect_closed(decode_content)
  101. else: # pragma: no cover
  102. raise InvalidResponseError(
  103. "Response must either have length or Connection: close"
  104. )
  105. # Otherwise, we've been asked to do a bounded read. We should read no
  106. # more than the remaining length, obviously.
  107. # FIXME: Handle cases without _length
  108. if self._length is not None:
  109. amt = min(amt, self._length)
  110. # Now, issue reads until we read that length. This is to account for
  111. # the fact that it's possible that we'll be asked to read more than
  112. # 65kB in one shot.
  113. to_read = amt
  114. chunks = []
  115. # Ideally I'd like this to read 'while to_read', but I want to be
  116. # defensive against the admittedly unlikely case that the socket
  117. # returns *more* data than I want.
  118. while to_read > 0:
  119. chunk = self._sock.recv(amt).tobytes()
  120. # If we got an empty read, but were expecting more, the remote end
  121. # has hung up. Raise an exception if we were expecting more data,
  122. # but if we were expecting the remote end to close then it's ok.
  123. if not chunk:
  124. if self._length is not None or not self._expect_close:
  125. self.close(socket_close=True)
  126. raise ConnectionResetError("Remote end hung up!")
  127. break
  128. to_read -= len(chunk)
  129. chunks.append(chunk)
  130. data = b''.join(chunks)
  131. if self._length is not None:
  132. self._length -= len(data)
  133. # If we're at the end of the request, we have some cleaning up to do.
  134. # Close the stream, and if necessary flush the buffer. Checking that
  135. # we're at the end is actually obscenely complex: either we've read the
  136. # full content-length or, if we were expecting a closed connection,
  137. # we've had a read shorter than the requested amount. We also have to
  138. # do this before we try to decompress the body.
  139. end_of_request = (self._length == 0 or
  140. (self._expect_close and len(data) < amt))
  141. # We may need to decode the body.
  142. if decode_content and self._decompressobj and data:
  143. data = self._decompressobj.decompress(data)
  144. if decode_content and self._decompressobj and end_of_request:
  145. data += self._decompressobj.flush()
  146. # We're at the end. Close the connection. Explicit check for zero here
  147. # because self._length might be None.
  148. if end_of_request:
  149. self.close(socket_close=self._expect_close)
  150. return data
  151. def read_chunked(self, decode_content=True):
  152. """
  153. Reads chunked transfer encoded bodies. This method returns a generator:
  154. each iteration of which yields one chunk *unless* the chunks are
  155. compressed, in which case it yields whatever the decompressor provides
  156. for each chunk.
  157. .. warning:: This may yield the empty string, without that being the
  158. end of the body!
  159. """
  160. if not self._chunked:
  161. raise ChunkedDecodeError(
  162. "Attempted chunked read of non-chunked body."
  163. )
  164. # Return early if possible.
  165. if self._sock is None:
  166. return
  167. while True:
  168. # Read to the newline to get the chunk length. This is a
  169. # hexadecimal integer.
  170. chunk_length = int(self._sock.readline().tobytes().strip(), 16)
  171. data = b''
  172. # If the chunk length is zero, consume the newline and then we're
  173. # done. If we were decompressing data, return the remaining data.
  174. if not chunk_length:
  175. self._sock.readline()
  176. if decode_content and self._decompressobj:
  177. yield self._decompressobj.flush()
  178. self.close(socket_close=self._expect_close)
  179. break
  180. # Then read that many bytes.
  181. while chunk_length > 0:
  182. chunk = self._sock.recv(chunk_length).tobytes()
  183. data += chunk
  184. chunk_length -= len(chunk)
  185. assert chunk_length == 0
  186. # Now, consume the newline.
  187. self._sock.readline()
  188. # We may need to decode the body.
  189. if decode_content and self._decompressobj and data:
  190. data = self._decompressobj.decompress(data)
  191. yield data
  192. return
  193. def close(self, socket_close=False):
  194. """
  195. Close the response. This causes the Response to lose access to the
  196. backing socket. In some cases, it can also cause the backing connection
  197. to be torn down.
  198. :param socket_close: Whether to close the backing socket.
  199. :returns: Nothing.
  200. """
  201. if socket_close and self._parent is not None:
  202. # The double call is necessary because we need to dereference the
  203. # weakref. If the weakref is no longer valid, that's fine, there's
  204. # no connection object to tell.
  205. parent = self._parent()
  206. if parent is not None:
  207. parent.close()
  208. self._sock = None
  209. def _read_expect_closed(self, decode_content):
  210. """
  211. Implements the logic for an unbounded read on a socket that we expect
  212. to be closed by the remote end.
  213. """
  214. # In this case, just read until we cannot read anymore. Then, close the
  215. # socket, becuase we know we have to.
  216. chunks = []
  217. while True:
  218. try:
  219. chunk = self._sock.recv(65535).tobytes()
  220. if not chunk:
  221. break
  222. except ConnectionResetError:
  223. break
  224. else:
  225. chunks.append(chunk)
  226. self.close(socket_close=True)
  227. # We may need to decompress the data.
  228. data = b''.join(chunks)
  229. if decode_content and self._decompressobj:
  230. data = self._decompressobj.decompress(data)
  231. data += self._decompressobj.flush()
  232. return data
  233. def _normal_read_chunked(self, amt, decode_content):
  234. """
  235. Implements the logic for calling ``read()`` on a chunked response.
  236. """
  237. # If we're doing a full read, read it as chunked and then just join
  238. # the chunks together!
  239. if amt is None:
  240. return self._buffered_data + b''.join(self.read_chunked())
  241. if self._chunker is None:
  242. self._chunker = self.read_chunked()
  243. # Otherwise, we have a certain amount of data we want to read.
  244. current_amount = len(self._buffered_data)
  245. extra_data = [self._buffered_data]
  246. while current_amount < amt:
  247. try:
  248. chunk = next(self._chunker)
  249. except StopIteration:
  250. self.close(socket_close=self._expect_close)
  251. break
  252. current_amount += len(chunk)
  253. extra_data.append(chunk)
  254. data = b''.join(extra_data)
  255. self._buffered_data = data[amt:]
  256. return data[:amt]
  257. # The following methods implement the context manager protocol.
  258. def __enter__(self):
  259. return self
  260. def __exit__(self, *args):
  261. self.close()
  262. return False # Never swallow exceptions.