12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751 |
- #
- # Copyright 2014 Facebook
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- """Client and server implementations of HTTP/1.x.
- .. versionadded:: 4.0
- """
- from __future__ import absolute_import, division, print_function
- import re
- import warnings
- from tornado.concurrent import (Future, future_add_done_callback,
- future_set_result_unless_cancelled)
- from tornado.escape import native_str, utf8
- from tornado import gen
- from tornado import httputil
- from tornado import iostream
- from tornado.log import gen_log, app_log
- from tornado import stack_context
- from tornado.util import GzipDecompressor, PY3
- class _QuietException(Exception):
- def __init__(self):
- pass
- class _ExceptionLoggingContext(object):
- """Used with the ``with`` statement when calling delegate methods to
- log any exceptions with the given logger. Any exceptions caught are
- converted to _QuietException
- """
- def __init__(self, logger):
- self.logger = logger
- def __enter__(self):
- pass
- def __exit__(self, typ, value, tb):
- if value is not None:
- self.logger.error("Uncaught exception", exc_info=(typ, value, tb))
- raise _QuietException
class HTTP1ConnectionParameters(object):
    """Settings bundle for `.HTTP1Connection` and `.HTTP1ServerConnection`.
    """

    def __init__(self, no_keep_alive=False, chunk_size=None,
                 max_header_size=None, header_timeout=None, max_body_size=None,
                 body_timeout=None, decompress=False):
        """
        :arg bool no_keep_alive: If true, always close the connection after
            one request.
        :arg int chunk_size: how much data to read into memory at once
            (a falsy value selects 65536)
        :arg int max_header_size: maximum amount of data for HTTP headers
            (a falsy value selects 65536)
        :arg float header_timeout: how long to wait for all headers (seconds)
        :arg int max_body_size: maximum amount of data for body
        :arg float body_timeout: how long to wait while reading body (seconds)
        :arg bool decompress: if true, decode incoming
            ``Content-Encoding: gzip``
        """
        # Fallback used for both size limits when the caller passes
        # None (or any other falsy value such as 0).
        fallback_size = 65536

        self.no_keep_alive = no_keep_alive
        self.decompress = decompress

        self.chunk_size = chunk_size if chunk_size else fallback_size
        self.max_header_size = (max_header_size if max_header_size
                                else fallback_size)
        self.max_body_size = max_body_size

        self.header_timeout = header_timeout
        self.body_timeout = body_timeout
class HTTP1Connection(httputil.HTTPConnection):
    """Implements the HTTP/1.x protocol.

    This class can be used on its own for clients, or via
    `HTTP1ServerConnection` for servers.
    """

    def __init__(self, stream, is_client, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg bool is_client: client or server
        :arg params: a `.HTTP1ConnectionParameters` instance or ``None``
        :arg context: an opaque application-defined object that can be accessed
            as ``connection.context``.
        """
        self.is_client = is_client
        self.stream = stream
        if params is None:
            params = HTTP1ConnectionParameters()
        self.params = params
        self.context = context
        self.no_keep_alive = params.no_keep_alive
        # The body limits can be altered by the delegate, so save them
        # here instead of just referencing self.params later.
        self._max_body_size = (self.params.max_body_size or
                               self.stream.max_buffer_size)
        self._body_timeout = self.params.body_timeout
        # _write_finished is set to True when finish() has been called,
        # i.e. there will be no more data sent.  Data may still be in the
        # stream's write buffer.
        self._write_finished = False
        # True when we have read the entire incoming body.
        self._read_finished = False
        # _finish_future resolves when all data has been written and flushed
        # to the IOStream.
        self._finish_future = Future()
        # If true, the connection should be closed after this request
        # (after the response has been written in the server side,
        # and after it has been read in the client)
        self._disconnect_on_finish = False
        self._clear_callbacks()
        # Save the start lines after we read or write them; they
        # affect later processing (e.g. 304 responses and HEAD methods
        # have content-length but no bodies)
        self._request_start_line = None
        self._response_start_line = None
        self._request_headers = None
        # True if we are writing output with chunked encoding.
        self._chunking_output = None
        # While reading a body with a content-length, this is the
        # amount left to read.
        self._expected_content_remaining = None
        # A Future for our outgoing writes, returned by IOStream.write.
        self._pending_write = None

    def read_response(self, delegate):
        """Read a single HTTP response.

        Typical client-mode usage is to write a request using `write_headers`,
        `write`, and `finish`, and then call ``read_response``.

        :arg delegate: a `.HTTPMessageDelegate`

        Returns a `.Future` that resolves to None after the full response has
        been read.
        """
        if self.params.decompress:
            delegate = _GzipMessageDelegate(delegate, self.params.chunk_size)
        return self._read_message(delegate)

    @gen.coroutine
    def _read_message(self, delegate):
        """Read one message (start line, headers, body) and feed it to
        ``delegate``.

        The coroutine result is ``True`` when the message was processed to
        completion, and ``False`` when reading stopped early (header or body
        timeout, detached stream, or malformed input reported as
        `.HTTPInputError`).
        """
        need_delegate_close = False
        try:
            header_future = self.stream.read_until_regex(
                b"\r?\n\r?\n",
                max_bytes=self.params.max_header_size)
            if self.params.header_timeout is None:
                header_data = yield header_future
            else:
                try:
                    header_data = yield gen.with_timeout(
                        self.stream.io_loop.time() + self.params.header_timeout,
                        header_future,
                        quiet_exceptions=iostream.StreamClosedError)
                except gen.TimeoutError:
                    self.close()
                    raise gen.Return(False)
            start_line, headers = self._parse_headers(header_data)
            if self.is_client:
                start_line = httputil.parse_response_start_line(start_line)
                self._response_start_line = start_line
            else:
                start_line = httputil.parse_request_start_line(start_line)
                self._request_start_line = start_line
                self._request_headers = headers

            self._disconnect_on_finish = not self._can_keep_alive(
                start_line, headers)
            # From here on the delegate has seen (or is about to see) the
            # message, so it must be told if we bail out before finish().
            need_delegate_close = True
            with _ExceptionLoggingContext(app_log):
                header_future = delegate.headers_received(start_line, headers)
                if header_future is not None:
                    yield header_future
            if self.stream is None:
                # We've been detached.
                need_delegate_close = False
                raise gen.Return(False)
            skip_body = False
            if self.is_client:
                if (self._request_start_line is not None and
                        self._request_start_line.method == 'HEAD'):
                    skip_body = True
                code = start_line.code
                if code == 304:
                    # 304 responses may include the content-length header
                    # but do not actually have a body.
                    # http://tools.ietf.org/html/rfc7230#section-3.3
                    skip_body = True
                if code >= 100 and code < 200:
                    # 1xx responses should never indicate the presence of
                    # a body.
                    if ('Content-Length' in headers or
                            'Transfer-Encoding' in headers):
                        raise httputil.HTTPInputError(
                            "Response code %d cannot have body" % code)
                    # TODO: client delegates will get headers_received twice
                    # in the case of a 100-continue.  Document or change?
                    yield self._read_message(delegate)
            else:
                if (headers.get("Expect") == "100-continue" and
                        not self._write_finished):
                    self.stream.write(b"HTTP/1.1 100 (Continue)\r\n\r\n")
            if not skip_body:
                body_future = self._read_body(
                    start_line.code if self.is_client else 0, headers, delegate)
                if body_future is not None:
                    if self._body_timeout is None:
                        yield body_future
                    else:
                        try:
                            yield gen.with_timeout(
                                self.stream.io_loop.time() + self._body_timeout,
                                body_future,
                                quiet_exceptions=iostream.StreamClosedError)
                        except gen.TimeoutError:
                            gen_log.info("Timeout reading body from %s",
                                         self.context)
                            self.stream.close()
                            raise gen.Return(False)
            self._read_finished = True
            if not self._write_finished or self.is_client:
                need_delegate_close = False
                with _ExceptionLoggingContext(app_log):
                    delegate.finish()
            # If we're waiting for the application to produce an asynchronous
            # response, and we're not detached, register a close callback
            # on the stream (we didn't need one while we were reading)
            if (not self._finish_future.done() and
                    self.stream is not None and
                    not self.stream.closed()):
                self.stream.set_close_callback(self._on_connection_close)
                yield self._finish_future
            if self.is_client and self._disconnect_on_finish:
                self.close()
            if self.stream is None:
                raise gen.Return(False)
        except httputil.HTTPInputError as e:
            gen_log.info("Malformed HTTP message from %s: %s",
                         self.context, e)
            if not self.is_client:
                yield self.stream.write(b'HTTP/1.1 400 Bad Request\r\n\r\n')
            self.close()
            raise gen.Return(False)
        finally:
            if need_delegate_close:
                with _ExceptionLoggingContext(app_log):
                    delegate.on_connection_close()
            # Drop the local reference to the header future.
            header_future = None
            self._clear_callbacks()
        raise gen.Return(True)

    def _clear_callbacks(self):
        """Clears the callback attributes.

        This allows the request handler to be garbage collected more
        quickly in CPython by breaking up reference cycles.
        """
        self._write_callback = None
        self._write_future = None
        self._close_callback = None
        if self.stream is not None:
            self.stream.set_close_callback(None)

    def set_close_callback(self, callback):
        """Sets a callback that will be run when the connection is closed.

        Note that this callback is slightly different from
        `.HTTPMessageDelegate.on_connection_close`: The
        `.HTTPMessageDelegate` method is called when the connection is
        closed while receiving a message. This callback is used when
        there is not an active delegate (for example, on the server
        side this callback is used if the client closes the connection
        after sending its request but before receiving all the
        response).
        """
        self._close_callback = stack_context.wrap(callback)

    def _on_connection_close(self):
        """Stream close handler: run the user's close callback once, resolve
        ``_finish_future`` and drop all callbacks.
        """
        # Note that this callback is only registered on the IOStream
        # when we have finished reading the request and are waiting for
        # the application to produce its response.
        if self._close_callback is not None:
            callback = self._close_callback
            self._close_callback = None
            callback()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        self._clear_callbacks()

    def close(self):
        """Close the underlying stream (if still attached), clear callbacks
        and resolve ``_finish_future``.
        """
        if self.stream is not None:
            self.stream.close()
        self._clear_callbacks()
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def detach(self):
        """Take control of the underlying stream.

        Returns the underlying `.IOStream` object and stops all further
        HTTP processing.  May only be called during
        `.HTTPMessageDelegate.headers_received`.  Intended for implementing
        protocols like websockets that tunnel over an HTTP handshake.
        """
        self._clear_callbacks()
        stream = self.stream
        self.stream = None
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)
        return stream

    def set_body_timeout(self, timeout):
        """Sets the body timeout for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._body_timeout = timeout

    def set_max_body_size(self, max_body_size):
        """Sets the body size limit for a single request.

        Overrides the value from `.HTTP1ConnectionParameters`.
        """
        self._max_body_size = max_body_size

    def write_headers(self, start_line, headers, chunk=None, callback=None):
        """Implements `.HTTPConnection.write_headers`.

        Returns a `.Future` for the write, or ``None`` when the deprecated
        ``callback`` argument is used on an open stream.

        :raises ValueError: if any encoded header line contains a newline
        """
        lines = []
        if self.is_client:
            self._request_start_line = start_line
            lines.append(utf8('%s %s HTTP/1.1' % (start_line[0], start_line[1])))
            # Client requests with a non-empty body must have either a
            # Content-Length or a Transfer-Encoding.
            self._chunking_output = (
                start_line.method in ('POST', 'PUT', 'PATCH') and
                'Content-Length' not in headers and
                'Transfer-Encoding' not in headers)
        else:
            self._response_start_line = start_line
            lines.append(utf8('HTTP/1.1 %d %s' % (start_line[1], start_line[2])))
            self._chunking_output = (
                # TODO: should this use
                # self._request_start_line.version or
                # start_line.version?
                self._request_start_line.version == 'HTTP/1.1' and
                # 1xx, 204 and 304 responses have no body (not even a zero-length
                # body), and so should not have either Content-Length or
                # Transfer-Encoding headers.
                start_line.code not in (204, 304) and
                (start_line.code < 100 or start_line.code >= 200) and
                # No need to chunk the output if a Content-Length is specified.
                'Content-Length' not in headers and
                # Applications are discouraged from touching Transfer-Encoding,
                # but if they do, leave it alone.
                'Transfer-Encoding' not in headers)
            # If connection to a 1.1 client will be closed, inform client
            if (self._request_start_line.version == 'HTTP/1.1' and
                    self._disconnect_on_finish):
                headers['Connection'] = 'close'
            # If a 1.0 client asked for keep-alive, add the header.
            if (self._request_start_line.version == 'HTTP/1.0' and
                    self._request_headers.get('Connection', '').lower() == 'keep-alive'):
                headers['Connection'] = 'Keep-Alive'
        if self._chunking_output:
            headers['Transfer-Encoding'] = 'chunked'
        if (not self.is_client and
            (self._request_start_line.method == 'HEAD' or
             start_line.code == 304)):
            self._expected_content_remaining = 0
        elif 'Content-Length' in headers:
            self._expected_content_remaining = int(headers['Content-Length'])
        else:
            self._expected_content_remaining = None
        # TODO: headers are supposed to be of type str, but we still have some
        # cases that let bytes slip through. Remove these native_str calls when those
        # are fixed.
        header_lines = (native_str(n) + ": " + native_str(v)
                        for n, v in headers.get_all())
        if PY3:
            lines.extend(l.encode('latin1') for l in header_lines)
        else:
            lines.extend(header_lines)
        # Reject embedded newlines so a header value cannot inject
        # additional header lines.
        for line in lines:
            if b'\n' in line:
                raise ValueError('Newline in header: ' + repr(line))
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            future.set_exception(iostream.StreamClosedError())
            # Retrieve the exception immediately (result unused).
            future.exception()
        else:
            if callback is not None:
                warnings.warn("callback argument is deprecated, use returned Future instead",
                              DeprecationWarning)
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            data = b"\r\n".join(lines) + b"\r\n\r\n"
            if chunk:
                data += self._format_chunk(chunk)
            self._pending_write = self.stream.write(data)
            future_add_done_callback(self._pending_write, self._on_write_complete)
        return future

    def _format_chunk(self, chunk):
        """Account ``chunk`` against the declared Content-Length and, when
        chunked output is active, wrap it in chunked-encoding framing.

        :raises httputil.HTTPOutputError: if more data is written than the
            Content-Length allows (the stream is closed first)
        """
        if self._expected_content_remaining is not None:
            self._expected_content_remaining -= len(chunk)
            if self._expected_content_remaining < 0:
                # Close the stream now to stop further framing errors.
                self.stream.close()
                raise httputil.HTTPOutputError(
                    "Tried to write more data than Content-Length")
        if self._chunking_output and chunk:
            # Don't write out empty chunks because that means END-OF-STREAM
            # with chunked encoding
            return utf8("%x" % len(chunk)) + b"\r\n" + chunk + b"\r\n"
        else:
            return chunk

    def write(self, chunk, callback=None):
        """Implements `.HTTPConnection.write`.

        For backwards compatibility it is allowed but deprecated to
        skip `write_headers` and instead call `write()` with a
        pre-encoded header block.
        """
        future = None
        if self.stream.closed():
            future = self._write_future = Future()
            self._write_future.set_exception(iostream.StreamClosedError())
            # Retrieve the exception immediately (result unused).
            self._write_future.exception()
        else:
            if callback is not None:
                warnings.warn("callback argument is deprecated, use returned Future instead",
                              DeprecationWarning)
                self._write_callback = stack_context.wrap(callback)
            else:
                future = self._write_future = Future()
            self._pending_write = self.stream.write(self._format_chunk(chunk))
            self._pending_write.add_done_callback(self._on_write_complete)
        return future

    def finish(self):
        """Implements `.HTTPConnection.finish`.

        :raises httputil.HTTPOutputError: if fewer bytes were written than
            the declared Content-Length while the stream is still open
        """
        if (self._expected_content_remaining is not None and
                self._expected_content_remaining != 0 and
                not self.stream.closed()):
            self.stream.close()
            raise httputil.HTTPOutputError(
                "Tried to write %d bytes less than Content-Length" %
                self._expected_content_remaining)
        if self._chunking_output:
            if not self.stream.closed():
                # Terminating zero-length chunk.
                self._pending_write = self.stream.write(b"0\r\n\r\n")
                self._pending_write.add_done_callback(self._on_write_complete)
        self._write_finished = True
        # If the app finished the request while we're still reading,
        # divert any remaining data away from the delegate and
        # close the connection when we're done sending our response.
        # Closing the connection is the only way to avoid reading the
        # whole input body.
        if not self._read_finished:
            self._disconnect_on_finish = True
        # No more data is coming, so instruct TCP to send any remaining
        # data immediately instead of waiting for a full packet or ack.
        self.stream.set_nodelay(True)
        if self._pending_write is None:
            self._finish_request(None)
        else:
            future_add_done_callback(self._pending_write, self._finish_request)

    def _on_write_complete(self, future):
        """Done-callback for stream writes: re-raise unexpected errors, then
        notify the pending write callback and/or write future.
        """
        exc = future.exception()
        if exc is not None and not isinstance(exc, iostream.StreamClosedError):
            # Any error other than a closed stream is re-raised here.
            future.result()
        if self._write_callback is not None:
            callback = self._write_callback
            self._write_callback = None
            self.stream.io_loop.add_callback(callback)
        if self._write_future is not None:
            future = self._write_future
            self._write_future = None
            future_set_result_unless_cancelled(future, None)

    def _can_keep_alive(self, start_line, headers):
        """Return True if the connection may stay open after this message,
        based on ``params.no_keep_alive``, the HTTP version and the
        ``Connection`` header.
        """
        if self.params.no_keep_alive:
            return False
        connection_header = headers.get("Connection")
        if connection_header is not None:
            connection_header = connection_header.lower()
        if start_line.version == "HTTP/1.1":
            return connection_header != "close"
        elif ("Content-Length" in headers or
              headers.get("Transfer-Encoding", "").lower() == "chunked" or
              getattr(start_line, 'method', None) in ("HEAD", "GET")):
            # start_line may be a request or response start line; only
            # the former has a method attribute.
            return connection_header == "keep-alive"
        return False

    def _finish_request(self, future):
        """Called once the final write is done (``future`` is unused):
        either close the connection or reset it for the next request.
        """
        self._clear_callbacks()
        if not self.is_client and self._disconnect_on_finish:
            self.close()
            return
        # Turn Nagle's algorithm back on, leaving the stream in its
        # default state for the next request.
        self.stream.set_nodelay(False)
        if not self._finish_future.done():
            future_set_result_unless_cancelled(self._finish_future, None)

    def _parse_headers(self, data):
        """Split raw header bytes into ``(start_line, headers)`` where
        ``start_line`` is a native string and ``headers`` is the result of
        `.HTTPHeaders.parse`.
        """
        # The lstrip removes newlines that some implementations sometimes
        # insert between messages of a reused connection.  Per RFC 7230,
        # we SHOULD ignore at least one empty line before the request.
        # http://tools.ietf.org/html/rfc7230#section-3.5
        data = native_str(data.decode('latin1')).lstrip("\r\n")
        # RFC 7230 section allows for both CRLF and bare LF.
        eol = data.find("\n")
        start_line = data[:eol].rstrip("\r")
        headers = httputil.HTTPHeaders.parse(data[eol:])
        return start_line, headers

    def _read_body(self, code, headers, delegate):
        """Choose how to read the message body from its headers.

        Returns the future of the selected body reader (fixed-length,
        chunked, or read-until-close for clients), or ``None`` when a server
        has no body to read.

        :raises httputil.HTTPInputError: on conflicting, non-integer,
            negative or over-limit ``Content-Length``, or a 204 response
            that declares a body
        """
        if "Content-Length" in headers:
            if "Transfer-Encoding" in headers:
                # Response cannot contain both Content-Length and
                # Transfer-Encoding headers.
                # http://tools.ietf.org/html/rfc7230#section-3.3.3
                raise httputil.HTTPInputError(
                    "Response with both Transfer-Encoding and Content-Length")
            if "," in headers["Content-Length"]:
                # Proxies sometimes cause Content-Length headers to get
                # duplicated.  If all the values are identical then we can
                # use them but if they differ it's an error.
                pieces = re.split(r',\s*', headers["Content-Length"])
                if any(i != pieces[0] for i in pieces):
                    raise httputil.HTTPInputError(
                        "Multiple unequal Content-Lengths: %r" %
                        headers["Content-Length"])
                headers["Content-Length"] = pieces[0]

            try:
                content_length = int(headers["Content-Length"])
            except ValueError:
                # Handles non-integer Content-Length value.
                raise httputil.HTTPInputError(
                    "Only integer Content-Length is allowed: %s" % headers["Content-Length"])

            if content_length < 0:
                # A negative length would otherwise be silently treated as
                # an empty body by the fixed-length reader.
                raise httputil.HTTPInputError(
                    "Negative Content-Length is not allowed: %s" % headers["Content-Length"])

            if content_length > self._max_body_size:
                raise httputil.HTTPInputError("Content-Length too long")
        else:
            content_length = None

        if code == 204:
            # This response code is not allowed to have a non-empty body,
            # and has an implicit length of zero instead of read-until-close.
            # http://www.w3.org/Protocols/rfc2616/rfc2616-sec4.html#sec4.3
            if ("Transfer-Encoding" in headers or
                    content_length not in (None, 0)):
                raise httputil.HTTPInputError(
                    "Response with code %d should not have body" % code)
            content_length = 0

        if content_length is not None:
            return self._read_fixed_body(content_length, delegate)
        if headers.get("Transfer-Encoding", "").lower() == "chunked":
            return self._read_chunked_body(delegate)
        if self.is_client:
            return self._read_body_until_close(delegate)
        return None

    @gen.coroutine
    def _read_fixed_body(self, content_length, delegate):
        """Read exactly ``content_length`` bytes in pieces of at most
        ``params.chunk_size``, passing each piece to the delegate.
        """
        while content_length > 0:
            body = yield self.stream.read_bytes(
                min(self.params.chunk_size, content_length), partial=True)
            content_length -= len(body)
            # Once a server has finished its response, remaining input is
            # read but no longer delivered to the delegate.
            if not self._write_finished or self.is_client:
                with _ExceptionLoggingContext(app_log):
                    ret = delegate.data_received(body)
                    if ret is not None:
                        yield ret

    @gen.coroutine
    def _read_chunked_body(self, delegate):
        """Read a ``Transfer-Encoding: chunked`` body, passing the decoded
        data to the delegate.

        :raises httputil.HTTPInputError: on an unparseable or negative chunk
            size, a missing CRLF after a chunk or after the final zero-size
            chunk, or when the total size exceeds the body size limit
        """
        # TODO: "chunk extensions" http://tools.ietf.org/html/rfc2616#section-3.6.1
        total_size = 0
        while True:
            chunk_len = yield self.stream.read_until(b"\r\n", max_bytes=64)
            try:
                chunk_len = int(chunk_len.strip(), 16)
            except ValueError:
                # Report malformed sizes as a protocol error so that
                # _read_message handles them like any other bad input.
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len < 0:
                raise httputil.HTTPInputError("invalid chunk size")
            if chunk_len == 0:
                crlf = yield self.stream.read_bytes(2)
                if crlf != b'\r\n':
                    raise httputil.HTTPInputError("improperly terminated chunked request")
                return
            total_size += chunk_len
            if total_size > self._max_body_size:
                raise httputil.HTTPInputError("chunked body too large")
            bytes_to_read = chunk_len
            while bytes_to_read:
                chunk = yield self.stream.read_bytes(
                    min(bytes_to_read, self.params.chunk_size), partial=True)
                bytes_to_read -= len(chunk)
                if not self._write_finished or self.is_client:
                    with _ExceptionLoggingContext(app_log):
                        ret = delegate.data_received(chunk)
                        if ret is not None:
                            yield ret
            # chunk ends with \r\n
            crlf = yield self.stream.read_bytes(2)
            if crlf != b"\r\n":
                # Validate explicitly: an ``assert`` would be stripped
                # under ``python -O`` and would not be an HTTPInputError.
                raise httputil.HTTPInputError("improperly terminated chunk")

    @gen.coroutine
    def _read_body_until_close(self, delegate):
        """Read everything up to stream close and hand it to the delegate
        as a single piece.
        """
        body = yield self.stream.read_until_close()
        if not self._write_finished or self.is_client:
            with _ExceptionLoggingContext(app_log):
                ret = delegate.data_received(body)
                # Wait for asynchronous delegates, as the other body
                # readers do.
                if ret is not None:
                    yield ret
class _GzipMessageDelegate(httputil.HTTPMessageDelegate):
    """Wraps an `HTTPMessageDelegate` to decode ``Content-Encoding: gzip``.
    """

    def __init__(self, delegate, chunk_size):
        """
        :arg delegate: the wrapped `HTTPMessageDelegate`
        :arg int chunk_size: size limit passed to each ``decompress`` call
        """
        self._delegate = delegate
        self._chunk_size = chunk_size
        # Created in headers_received only when the message is gzip-encoded.
        self._decompressor = None

    def headers_received(self, start_line, headers):
        """Enable decompression for gzip-encoded messages, then forward the
        (possibly modified) headers to the wrapped delegate.
        """
        # Content-coding names are case-insensitive, so accept e.g. "GZIP".
        if headers.get("Content-Encoding", "").lower() == "gzip":
            self._decompressor = GzipDecompressor()
            # Downstream delegates will only see uncompressed data,
            # so rename the content-encoding header.
            # (but note that curl_httpclient doesn't do this).
            headers.add("X-Consumed-Content-Encoding",
                        headers["Content-Encoding"])
            del headers["Content-Encoding"]
        return self._delegate.headers_received(start_line, headers)

    @gen.coroutine
    def data_received(self, chunk):
        """Forward ``chunk`` to the wrapped delegate, decompressing it first
        when a decompressor is active.
        """
        if self._decompressor:
            compressed_data = chunk
            # Loop until the decompressor has consumed all of the input;
            # whatever it did not consume is left in ``unconsumed_tail``.
            while compressed_data:
                decompressed = self._decompressor.decompress(
                    compressed_data, self._chunk_size)
                if decompressed:
                    ret = self._delegate.data_received(decompressed)
                    if ret is not None:
                        yield ret
                compressed_data = self._decompressor.unconsumed_tail
        else:
            ret = self._delegate.data_received(chunk)
            if ret is not None:
                yield ret

    def finish(self):
        """Flush the decompressor (if any) and finish the wrapped delegate."""
        if self._decompressor is not None:
            tail = self._decompressor.flush()
            if tail:
                # I believe the tail will always be empty (i.e.
                # decompress will return all it can).  The purpose
                # of the flush call is to detect errors such
                # as truncated input.  But in case it ever returns
                # anything, treat it as an extra chunk
                self._delegate.data_received(tail)
        return self._delegate.finish()

    def on_connection_close(self):
        """Forward the close notification to the wrapped delegate."""
        return self._delegate.on_connection_close()
class HTTP1ServerConnection(object):
    """An HTTP/1.x server."""

    def __init__(self, stream, params=None, context=None):
        """
        :arg stream: an `.IOStream`
        :arg params: a `.HTTP1ConnectionParameters` or None
        :arg context: an opaque application-defined object that is accessible
            as ``connection.context``
        """
        self.stream = stream
        self.params = HTTP1ConnectionParameters() if params is None else params
        self.context = context
        # Future of the request loop; assigned by start_serving().
        self._serving_future = None

    @gen.coroutine
    def close(self):
        """Closes the connection.

        Returns a `.Future` that resolves after the serving loop has exited.
        """
        self.stream.close()
        try:
            # Wait for the serving loop to wind down.
            yield self._serving_future
        except Exception:
            # Deliberately ignored: start_serving is already responsible
            # for logging errors raised by the loop.
            pass

    def start_serving(self, delegate):
        """Starts serving requests on this connection.

        :arg delegate: a `.HTTPServerConnectionDelegate`
        """
        assert isinstance(delegate, httputil.HTTPServerConnectionDelegate)
        serving = self._server_request_loop(delegate)
        self._serving_future = serving
        # Hand the future to the IOLoop so that its errors get logged.
        self.stream.io_loop.add_future(serving, lambda done: done.result())

    @gen.coroutine
    def _server_request_loop(self, delegate):
        """Serve requests one at a time, each on a fresh `HTTP1Connection`,
        until reading fails or reports that the connection is finished.
        ``delegate.on_close`` is always called on exit.
        """
        try:
            while True:
                connection = HTTP1Connection(self.stream, False,
                                             self.params, self.context)
                request_delegate = delegate.start_request(self, connection)
                try:
                    keep_serving = yield connection.read_response(
                        request_delegate)
                except (iostream.StreamClosedError,
                        iostream.UnsatisfiableReadError):
                    return
                except _QuietException:
                    # This exception was already logged.
                    connection.close()
                    return
                except Exception:
                    gen_log.error("Uncaught exception", exc_info=True)
                    connection.close()
                    return
                if not keep_serving:
                    return
                # Yield to the IOLoop once between requests.
                yield gen.moment
        finally:
            delegate.on_close(self)
|