# simple_httpclient.py (24 KB)


from __future__ import absolute_import, division, print_function

from tornado.escape import _unicode
from tornado import gen
from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main, _RequestProxy
from tornado import httputil
from tornado.http1connection import HTTP1Connection, HTTP1ConnectionParameters
from tornado.ioloop import IOLoop
from tornado.iostream import StreamClosedError
from tornado.netutil import Resolver, OverrideResolver, _client_ssl_defaults
from tornado.log import gen_log
from tornado import stack_context
from tornado.tcpclient import TCPClient
from tornado.util import PY3

import base64
import collections
import copy
import functools
import re
import socket
import sys
import time
from io import BytesIO

# The URL parsing module lives in a different place on Python 2 and 3;
# bind it to the single name ``urlparse`` used throughout this module.
if PY3:
    import urllib.parse as urlparse
else:
    import urlparse

# ``ssl`` is optional: when it cannot be imported the name is bound to
# None so the rest of the module can still be loaded.
try:
    import ssl
except ImportError:
    # ssl is not available on Google App Engine.
    ssl = None
  32. class HTTPTimeoutError(HTTPError):
  33. """Error raised by SimpleAsyncHTTPClient on timeout.
  34. For historical reasons, this is a subclass of `.HTTPClientError`
  35. which simulates a response code of 599.
  36. .. versionadded:: 5.1
  37. """
  38. def __init__(self, message):
  39. super(HTTPTimeoutError, self).__init__(599, message=message)
  40. def __str__(self):
  41. return self.message
  42. class HTTPStreamClosedError(HTTPError):
  43. """Error raised by SimpleAsyncHTTPClient when the underlying stream is closed.
  44. When a more specific exception is available (such as `ConnectionResetError`),
  45. it may be raised instead of this one.
  46. For historical reasons, this is a subclass of `.HTTPClientError`
  47. which simulates a response code of 599.
  48. .. versionadded:: 5.1
  49. """
  50. def __init__(self, message):
  51. super(HTTPStreamClosedError, self).__init__(599, message=message)
  52. def __str__(self):
  53. return self.message
  54. class SimpleAsyncHTTPClient(AsyncHTTPClient):
  55. """Non-blocking HTTP client with no external dependencies.
  56. This class implements an HTTP 1.1 client on top of Tornado's IOStreams.
  57. Some features found in the curl-based AsyncHTTPClient are not yet
  58. supported. In particular, proxies are not supported, connections
  59. are not reused, and callers cannot select the network interface to be
  60. used.
  61. """
  62. def initialize(self, max_clients=10,
  63. hostname_mapping=None, max_buffer_size=104857600,
  64. resolver=None, defaults=None, max_header_size=None,
  65. max_body_size=None):
  66. """Creates a AsyncHTTPClient.
  67. Only a single AsyncHTTPClient instance exists per IOLoop
  68. in order to provide limitations on the number of pending connections.
  69. ``force_instance=True`` may be used to suppress this behavior.
  70. Note that because of this implicit reuse, unless ``force_instance``
  71. is used, only the first call to the constructor actually uses
  72. its arguments. It is recommended to use the ``configure`` method
  73. instead of the constructor to ensure that arguments take effect.
  74. ``max_clients`` is the number of concurrent requests that can be
  75. in progress; when this limit is reached additional requests will be
  76. queued. Note that time spent waiting in this queue still counts
  77. against the ``request_timeout``.
  78. ``hostname_mapping`` is a dictionary mapping hostnames to IP addresses.
  79. It can be used to make local DNS changes when modifying system-wide
  80. settings like ``/etc/hosts`` is not possible or desirable (e.g. in
  81. unittests).
  82. ``max_buffer_size`` (default 100MB) is the number of bytes
  83. that can be read into memory at once. ``max_body_size``
  84. (defaults to ``max_buffer_size``) is the largest response body
  85. that the client will accept. Without a
  86. ``streaming_callback``, the smaller of these two limits
  87. applies; with a ``streaming_callback`` only ``max_body_size``
  88. does.
  89. .. versionchanged:: 4.2
  90. Added the ``max_body_size`` argument.
  91. """
  92. super(SimpleAsyncHTTPClient, self).initialize(defaults=defaults)
  93. self.max_clients = max_clients
  94. self.queue = collections.deque()
  95. self.active = {}
  96. self.waiting = {}
  97. self.max_buffer_size = max_buffer_size
  98. self.max_header_size = max_header_size
  99. self.max_body_size = max_body_size
  100. # TCPClient could create a Resolver for us, but we have to do it
  101. # ourselves to support hostname_mapping.
  102. if resolver:
  103. self.resolver = resolver
  104. self.own_resolver = False
  105. else:
  106. self.resolver = Resolver()
  107. self.own_resolver = True
  108. if hostname_mapping is not None:
  109. self.resolver = OverrideResolver(resolver=self.resolver,
  110. mapping=hostname_mapping)
  111. self.tcp_client = TCPClient(resolver=self.resolver)
  112. def close(self):
  113. super(SimpleAsyncHTTPClient, self).close()
  114. if self.own_resolver:
  115. self.resolver.close()
  116. self.tcp_client.close()
  117. def fetch_impl(self, request, callback):
  118. key = object()
  119. self.queue.append((key, request, callback))
  120. if not len(self.active) < self.max_clients:
  121. timeout_handle = self.io_loop.add_timeout(
  122. self.io_loop.time() + min(request.connect_timeout,
  123. request.request_timeout),
  124. functools.partial(self._on_timeout, key, "in request queue"))
  125. else:
  126. timeout_handle = None
  127. self.waiting[key] = (request, callback, timeout_handle)
  128. self._process_queue()
  129. if self.queue:
  130. gen_log.debug("max_clients limit reached, request queued. "
  131. "%d active, %d queued requests." % (
  132. len(self.active), len(self.queue)))
  133. def _process_queue(self):
  134. with stack_context.NullContext():
  135. while self.queue and len(self.active) < self.max_clients:
  136. key, request, callback = self.queue.popleft()
  137. if key not in self.waiting:
  138. continue
  139. self._remove_timeout(key)
  140. self.active[key] = (request, callback)
  141. release_callback = functools.partial(self._release_fetch, key)
  142. self._handle_request(request, release_callback, callback)
  143. def _connection_class(self):
  144. return _HTTPConnection
  145. def _handle_request(self, request, release_callback, final_callback):
  146. self._connection_class()(
  147. self, request, release_callback,
  148. final_callback, self.max_buffer_size, self.tcp_client,
  149. self.max_header_size, self.max_body_size)
  150. def _release_fetch(self, key):
  151. del self.active[key]
  152. self._process_queue()
  153. def _remove_timeout(self, key):
  154. if key in self.waiting:
  155. request, callback, timeout_handle = self.waiting[key]
  156. if timeout_handle is not None:
  157. self.io_loop.remove_timeout(timeout_handle)
  158. del self.waiting[key]
  159. def _on_timeout(self, key, info=None):
  160. """Timeout callback of request.
  161. Construct a timeout HTTPResponse when a timeout occurs.
  162. :arg object key: A simple object to mark the request.
  163. :info string key: More detailed timeout information.
  164. """
  165. request, callback, timeout_handle = self.waiting[key]
  166. self.queue.remove((key, request, callback))
  167. error_message = "Timeout {0}".format(info) if info else "Timeout"
  168. timeout_response = HTTPResponse(
  169. request, 599, error=HTTPTimeoutError(error_message),
  170. request_time=self.io_loop.time() - request.start_time)
  171. self.io_loop.add_callback(callback, timeout_response)
  172. del self.waiting[key]
  173. class _HTTPConnection(httputil.HTTPMessageDelegate):
  174. _SUPPORTED_METHODS = set(["GET", "HEAD", "POST", "PUT", "DELETE", "PATCH", "OPTIONS"])
  175. def __init__(self, client, request, release_callback,
  176. final_callback, max_buffer_size, tcp_client,
  177. max_header_size, max_body_size):
  178. self.io_loop = IOLoop.current()
  179. self.start_time = self.io_loop.time()
  180. self.start_wall_time = time.time()
  181. self.client = client
  182. self.request = request
  183. self.release_callback = release_callback
  184. self.final_callback = final_callback
  185. self.max_buffer_size = max_buffer_size
  186. self.tcp_client = tcp_client
  187. self.max_header_size = max_header_size
  188. self.max_body_size = max_body_size
  189. self.code = None
  190. self.headers = None
  191. self.chunks = []
  192. self._decompressor = None
  193. # Timeout handle returned by IOLoop.add_timeout
  194. self._timeout = None
  195. self._sockaddr = None
  196. IOLoop.current().add_callback(self.run)
  197. @gen.coroutine
  198. def run(self):
  199. try:
  200. self.parsed = urlparse.urlsplit(_unicode(self.request.url))
  201. if self.parsed.scheme not in ("http", "https"):
  202. raise ValueError("Unsupported url scheme: %s" %
  203. self.request.url)
  204. # urlsplit results have hostname and port results, but they
  205. # didn't support ipv6 literals until python 2.7.
  206. netloc = self.parsed.netloc
  207. if "@" in netloc:
  208. userpass, _, netloc = netloc.rpartition("@")
  209. host, port = httputil.split_host_and_port(netloc)
  210. if port is None:
  211. port = 443 if self.parsed.scheme == "https" else 80
  212. if re.match(r'^\[.*\]$', host):
  213. # raw ipv6 addresses in urls are enclosed in brackets
  214. host = host[1:-1]
  215. self.parsed_hostname = host # save final host for _on_connect
  216. if self.request.allow_ipv6 is False:
  217. af = socket.AF_INET
  218. else:
  219. af = socket.AF_UNSPEC
  220. ssl_options = self._get_ssl_options(self.parsed.scheme)
  221. timeout = min(self.request.connect_timeout, self.request.request_timeout)
  222. if timeout:
  223. self._timeout = self.io_loop.add_timeout(
  224. self.start_time + timeout,
  225. stack_context.wrap(functools.partial(self._on_timeout, "while connecting")))
  226. stream = yield self.tcp_client.connect(
  227. host, port, af=af,
  228. ssl_options=ssl_options,
  229. max_buffer_size=self.max_buffer_size)
  230. if self.final_callback is None:
  231. # final_callback is cleared if we've hit our timeout.
  232. stream.close()
  233. return
  234. self.stream = stream
  235. self.stream.set_close_callback(self.on_connection_close)
  236. self._remove_timeout()
  237. if self.final_callback is None:
  238. return
  239. if self.request.request_timeout:
  240. self._timeout = self.io_loop.add_timeout(
  241. self.start_time + self.request.request_timeout,
  242. stack_context.wrap(functools.partial(self._on_timeout, "during request")))
  243. if (self.request.method not in self._SUPPORTED_METHODS and
  244. not self.request.allow_nonstandard_methods):
  245. raise KeyError("unknown method %s" % self.request.method)
  246. for key in ('network_interface',
  247. 'proxy_host', 'proxy_port',
  248. 'proxy_username', 'proxy_password',
  249. 'proxy_auth_mode'):
  250. if getattr(self.request, key, None):
  251. raise NotImplementedError('%s not supported' % key)
  252. if "Connection" not in self.request.headers:
  253. self.request.headers["Connection"] = "close"
  254. if "Host" not in self.request.headers:
  255. if '@' in self.parsed.netloc:
  256. self.request.headers["Host"] = self.parsed.netloc.rpartition('@')[-1]
  257. else:
  258. self.request.headers["Host"] = self.parsed.netloc
  259. username, password = None, None
  260. if self.parsed.username is not None:
  261. username, password = self.parsed.username, self.parsed.password
  262. elif self.request.auth_username is not None:
  263. username = self.request.auth_username
  264. password = self.request.auth_password or ''
  265. if username is not None:
  266. if self.request.auth_mode not in (None, "basic"):
  267. raise ValueError("unsupported auth_mode %s",
  268. self.request.auth_mode)
  269. self.request.headers["Authorization"] = (
  270. b"Basic " + base64.b64encode(
  271. httputil.encode_username_password(username, password)))
  272. if self.request.user_agent:
  273. self.request.headers["User-Agent"] = self.request.user_agent
  274. if not self.request.allow_nonstandard_methods:
  275. # Some HTTP methods nearly always have bodies while others
  276. # almost never do. Fail in this case unless the user has
  277. # opted out of sanity checks with allow_nonstandard_methods.
  278. body_expected = self.request.method in ("POST", "PATCH", "PUT")
  279. body_present = (self.request.body is not None or
  280. self.request.body_producer is not None)
  281. if ((body_expected and not body_present) or
  282. (body_present and not body_expected)):
  283. raise ValueError(
  284. 'Body must %sbe None for method %s (unless '
  285. 'allow_nonstandard_methods is true)' %
  286. ('not ' if body_expected else '', self.request.method))
  287. if self.request.expect_100_continue:
  288. self.request.headers["Expect"] = "100-continue"
  289. if self.request.body is not None:
  290. # When body_producer is used the caller is responsible for
  291. # setting Content-Length (or else chunked encoding will be used).
  292. self.request.headers["Content-Length"] = str(len(
  293. self.request.body))
  294. if (self.request.method == "POST" and
  295. "Content-Type" not in self.request.headers):
  296. self.request.headers["Content-Type"] = "application/x-www-form-urlencoded"
  297. if self.request.decompress_response:
  298. self.request.headers["Accept-Encoding"] = "gzip"
  299. req_path = ((self.parsed.path or '/') +
  300. (('?' + self.parsed.query) if self.parsed.query else ''))
  301. self.connection = self._create_connection(stream)
  302. start_line = httputil.RequestStartLine(self.request.method,
  303. req_path, '')
  304. self.connection.write_headers(start_line, self.request.headers)
  305. if self.request.expect_100_continue:
  306. yield self.connection.read_response(self)
  307. else:
  308. yield self._write_body(True)
  309. except Exception:
  310. if not self._handle_exception(*sys.exc_info()):
  311. raise
  312. def _get_ssl_options(self, scheme):
  313. if scheme == "https":
  314. if self.request.ssl_options is not None:
  315. return self.request.ssl_options
  316. # If we are using the defaults, don't construct a
  317. # new SSLContext.
  318. if (self.request.validate_cert and
  319. self.request.ca_certs is None and
  320. self.request.client_cert is None and
  321. self.request.client_key is None):
  322. return _client_ssl_defaults
  323. ssl_ctx = ssl.create_default_context(
  324. ssl.Purpose.SERVER_AUTH,
  325. cafile=self.request.ca_certs)
  326. if not self.request.validate_cert:
  327. ssl_ctx.check_hostname = False
  328. ssl_ctx.verify_mode = ssl.CERT_NONE
  329. if self.request.client_cert is not None:
  330. ssl_ctx.load_cert_chain(self.request.client_cert,
  331. self.request.client_key)
  332. if hasattr(ssl, 'OP_NO_COMPRESSION'):
  333. # See netutil.ssl_options_to_context
  334. ssl_ctx.options |= ssl.OP_NO_COMPRESSION
  335. return ssl_ctx
  336. return None
  337. def _on_timeout(self, info=None):
  338. """Timeout callback of _HTTPConnection instance.
  339. Raise a `HTTPTimeoutError` when a timeout occurs.
  340. :info string key: More detailed timeout information.
  341. """
  342. self._timeout = None
  343. error_message = "Timeout {0}".format(info) if info else "Timeout"
  344. if self.final_callback is not None:
  345. self._handle_exception(HTTPTimeoutError, HTTPTimeoutError(error_message),
  346. None)
  347. def _remove_timeout(self):
  348. if self._timeout is not None:
  349. self.io_loop.remove_timeout(self._timeout)
  350. self._timeout = None
  351. def _create_connection(self, stream):
  352. stream.set_nodelay(True)
  353. connection = HTTP1Connection(
  354. stream, True,
  355. HTTP1ConnectionParameters(
  356. no_keep_alive=True,
  357. max_header_size=self.max_header_size,
  358. max_body_size=self.max_body_size,
  359. decompress=self.request.decompress_response),
  360. self._sockaddr)
  361. return connection
  362. @gen.coroutine
  363. def _write_body(self, start_read):
  364. if self.request.body is not None:
  365. self.connection.write(self.request.body)
  366. elif self.request.body_producer is not None:
  367. fut = self.request.body_producer(self.connection.write)
  368. if fut is not None:
  369. yield fut
  370. self.connection.finish()
  371. if start_read:
  372. try:
  373. yield self.connection.read_response(self)
  374. except StreamClosedError:
  375. if not self._handle_exception(*sys.exc_info()):
  376. raise
  377. def _release(self):
  378. if self.release_callback is not None:
  379. release_callback = self.release_callback
  380. self.release_callback = None
  381. release_callback()
  382. def _run_callback(self, response):
  383. self._release()
  384. if self.final_callback is not None:
  385. final_callback = self.final_callback
  386. self.final_callback = None
  387. self.io_loop.add_callback(final_callback, response)
  388. def _handle_exception(self, typ, value, tb):
  389. if self.final_callback:
  390. self._remove_timeout()
  391. if isinstance(value, StreamClosedError):
  392. if value.real_error is None:
  393. value = HTTPStreamClosedError("Stream closed")
  394. else:
  395. value = value.real_error
  396. self._run_callback(HTTPResponse(self.request, 599, error=value,
  397. request_time=self.io_loop.time() - self.start_time,
  398. start_time=self.start_wall_time,
  399. ))
  400. if hasattr(self, "stream"):
  401. # TODO: this may cause a StreamClosedError to be raised
  402. # by the connection's Future. Should we cancel the
  403. # connection more gracefully?
  404. self.stream.close()
  405. return True
  406. else:
  407. # If our callback has already been called, we are probably
  408. # catching an exception that is not caused by us but rather
  409. # some child of our callback. Rather than drop it on the floor,
  410. # pass it along, unless it's just the stream being closed.
  411. return isinstance(value, StreamClosedError)
  412. def on_connection_close(self):
  413. if self.final_callback is not None:
  414. message = "Connection closed"
  415. if self.stream.error:
  416. raise self.stream.error
  417. try:
  418. raise HTTPStreamClosedError(message)
  419. except HTTPStreamClosedError:
  420. self._handle_exception(*sys.exc_info())
  421. def headers_received(self, first_line, headers):
  422. if self.request.expect_100_continue and first_line.code == 100:
  423. self._write_body(False)
  424. return
  425. self.code = first_line.code
  426. self.reason = first_line.reason
  427. self.headers = headers
  428. if self._should_follow_redirect():
  429. return
  430. if self.request.header_callback is not None:
  431. # Reassemble the start line.
  432. self.request.header_callback('%s %s %s\r\n' % first_line)
  433. for k, v in self.headers.get_all():
  434. self.request.header_callback("%s: %s\r\n" % (k, v))
  435. self.request.header_callback('\r\n')
  436. def _should_follow_redirect(self):
  437. return (self.request.follow_redirects and
  438. self.request.max_redirects > 0 and
  439. self.code in (301, 302, 303, 307, 308))
  440. def finish(self):
  441. data = b''.join(self.chunks)
  442. self._remove_timeout()
  443. original_request = getattr(self.request, "original_request",
  444. self.request)
  445. if self._should_follow_redirect():
  446. assert isinstance(self.request, _RequestProxy)
  447. new_request = copy.copy(self.request.request)
  448. new_request.url = urlparse.urljoin(self.request.url,
  449. self.headers["Location"])
  450. new_request.max_redirects = self.request.max_redirects - 1
  451. del new_request.headers["Host"]
  452. # http://www.w3.org/Protocols/rfc2616/rfc2616-sec10.html#sec10.3.4
  453. # Client SHOULD make a GET request after a 303.
  454. # According to the spec, 302 should be followed by the same
  455. # method as the original request, but in practice browsers
  456. # treat 302 the same as 303, and many servers use 302 for
  457. # compatibility with pre-HTTP/1.1 user agents which don't
  458. # understand the 303 status.
  459. if self.code in (302, 303):
  460. new_request.method = "GET"
  461. new_request.body = None
  462. for h in ["Content-Length", "Content-Type",
  463. "Content-Encoding", "Transfer-Encoding"]:
  464. try:
  465. del self.request.headers[h]
  466. except KeyError:
  467. pass
  468. new_request.original_request = original_request
  469. final_callback = self.final_callback
  470. self.final_callback = None
  471. self._release()
  472. fut = self.client.fetch(new_request, raise_error=False)
  473. fut.add_done_callback(lambda f: final_callback(f.result()))
  474. self._on_end_request()
  475. return
  476. if self.request.streaming_callback:
  477. buffer = BytesIO()
  478. else:
  479. buffer = BytesIO(data) # TODO: don't require one big string?
  480. response = HTTPResponse(original_request,
  481. self.code, reason=getattr(self, 'reason', None),
  482. headers=self.headers,
  483. request_time=self.io_loop.time() - self.start_time,
  484. start_time=self.start_wall_time,
  485. buffer=buffer,
  486. effective_url=self.request.url)
  487. self._run_callback(response)
  488. self._on_end_request()
  489. def _on_end_request(self):
  490. self.stream.close()
  491. def data_received(self, chunk):
  492. if self._should_follow_redirect():
  493. # We're going to follow a redirect so just discard the body.
  494. return
  495. if self.request.streaming_callback is not None:
  496. self.request.streaming_callback(chunk)
  497. else:
  498. self.chunks.append(chunk)
# When run as a script, select this implementation as the AsyncHTTPClient
# in use, then call the ``main`` function imported from tornado.httpclient.
if __name__ == "__main__":
    AsyncHTTPClient.configure(SimpleAsyncHTTPClient)
    main()