curl_httpclient.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514
  1. #
  2. # Copyright 2009 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Non-blocking HTTP client implementation using pycurl."""
  16. from __future__ import absolute_import, division, print_function
  17. import collections
  18. import functools
  19. import logging
  20. import pycurl # type: ignore
  21. import threading
  22. import time
  23. from io import BytesIO
  24. from tornado import httputil
  25. from tornado import ioloop
  26. from tornado import stack_context
  27. from tornado.escape import utf8, native_str
  28. from tornado.httpclient import HTTPResponse, HTTPError, AsyncHTTPClient, main
# Module-level logger; enabling DEBUG on it also turns on libcurl's
# VERBOSE output for each handle (see CurlAsyncHTTPClient._curl_create).
curl_log = logging.getLogger('tornado.curl_httpclient')
class CurlAsyncHTTPClient(AsyncHTTPClient):
    """``AsyncHTTPClient`` implementation built on libcurl via pycurl.

    A fixed pool of ``max_clients`` curl "easy" handles is driven by a
    single ``pycurl.CurlMulti`` object using libcurl's multi-socket
    interface: libcurl tells us which file descriptors and timeouts to
    watch (``M_SOCKETFUNCTION`` / ``M_TIMERFUNCTION``), and we feed
    events back to it from the Tornado ``IOLoop``.
    """

    def initialize(self, max_clients=10, defaults=None):
        """Set up the multi handle, the easy-handle pool, and the
        request queue.

        ``max_clients`` bounds the number of simultaneous in-flight
        requests; additional fetches wait in ``self._requests``.
        """
        super(CurlAsyncHTTPClient, self).initialize(defaults=defaults)
        self._multi = pycurl.CurlMulti()
        self._multi.setopt(pycurl.M_TIMERFUNCTION, self._set_timeout)
        self._multi.setopt(pycurl.M_SOCKETFUNCTION, self._handle_socket)
        self._curls = [self._curl_create() for i in range(max_clients)]
        self._free_list = self._curls[:]
        self._requests = collections.deque()
        # Maps file descriptor -> the IOLoop event mask currently
        # registered for it.
        self._fds = {}
        self._timeout = None

        # libcurl has bugs that sometimes cause it to not report all
        # relevant file descriptors and timeouts to TIMERFUNCTION/
        # SOCKETFUNCTION.  Mitigate the effects of such bugs by
        # forcing a periodic scan of all active requests.
        self._force_timeout_callback = ioloop.PeriodicCallback(
            self._handle_force_timeout, 1000)
        self._force_timeout_callback.start()

        # Work around a bug in libcurl 7.29.0: Some fields in the curl
        # multi object are initialized lazily, and its destructor will
        # segfault if it is destroyed without having been used.  Add
        # and remove a dummy handle to make sure everything is
        # initialized.
        dummy_curl_handle = pycurl.Curl()
        self._multi.add_handle(dummy_curl_handle)
        self._multi.remove_handle(dummy_curl_handle)

    def close(self):
        """Shut down the client, releasing all curl handles and timers."""
        self._force_timeout_callback.stop()
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        for curl in self._curls:
            curl.close()
        self._multi.close()
        super(CurlAsyncHTTPClient, self).close()

        # Set below properties to None to reduce the reference count of current
        # instance, because those properties hold some methods of current
        # instance that will cause circular reference.
        self._force_timeout_callback = None
        self._multi = None

    def fetch_impl(self, request, callback):
        # Record the queue-arrival time (used for the "queue" entry of
        # time_info in _finish) and kick the curl state machine.
        self._requests.append((request, callback, self.io_loop.time()))
        self._process_queue()
        self._set_timeout(0)

    def _handle_socket(self, event, fd, multi, data):
        """Called by libcurl when it wants to change the file descriptors
        it cares about.
        """
        event_map = {
            pycurl.POLL_NONE: ioloop.IOLoop.NONE,
            pycurl.POLL_IN: ioloop.IOLoop.READ,
            pycurl.POLL_OUT: ioloop.IOLoop.WRITE,
            pycurl.POLL_INOUT: ioloop.IOLoop.READ | ioloop.IOLoop.WRITE
        }
        if event == pycurl.POLL_REMOVE:
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
                del self._fds[fd]
        else:
            ioloop_event = event_map[event]
            # libcurl sometimes closes a socket and then opens a new
            # one using the same FD without giving us a POLL_NONE in
            # between.  This is a problem with the epoll IOLoop,
            # because the kernel can tell when a socket is closed and
            # removes it from the epoll automatically, causing future
            # update_handler calls to fail.  Since we can't tell when
            # this has happened, always use remove and re-add
            # instead of update.
            if fd in self._fds:
                self.io_loop.remove_handler(fd)
            self.io_loop.add_handler(fd, self._handle_events,
                                     ioloop_event)
            self._fds[fd] = ioloop_event

    def _set_timeout(self, msecs):
        """Called by libcurl to schedule a timeout."""
        # Only one curl-driven timeout is kept scheduled at a time;
        # replace any previous one.
        if self._timeout is not None:
            self.io_loop.remove_timeout(self._timeout)
        self._timeout = self.io_loop.add_timeout(
            self.io_loop.time() + msecs / 1000.0, self._handle_timeout)

    def _handle_events(self, fd, events):
        """Called by IOLoop when there is activity on one of our
        file descriptors.
        """
        action = 0
        if events & ioloop.IOLoop.READ:
            action |= pycurl.CSELECT_IN
        if events & ioloop.IOLoop.WRITE:
            action |= pycurl.CSELECT_OUT
        # Keep calling socket_action until libcurl stops asking for an
        # immediate re-invocation (E_CALL_MULTI_PERFORM).
        while True:
            try:
                ret, num_handles = self._multi.socket_action(fd, action)
            except pycurl.error as e:
                ret = e.args[0]
            if ret != pycurl.E_CALL_MULTI_PERFORM:
                break
        self._finish_pending_requests()

    def _handle_timeout(self):
        """Called by IOLoop when the requested timeout has passed."""
        with stack_context.NullContext():
            self._timeout = None
            while True:
                try:
                    ret, num_handles = self._multi.socket_action(
                        pycurl.SOCKET_TIMEOUT, 0)
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

        # In theory, we shouldn't have to do this because curl will
        # call _set_timeout whenever the timeout changes.  However,
        # sometimes after _handle_timeout we will need to reschedule
        # immediately even though nothing has changed from curl's
        # perspective.  This is because when socket_action is
        # called with SOCKET_TIMEOUT, libcurl decides internally which
        # timeouts need to be processed by using a monotonic clock
        # (where available) while tornado uses python's time.time()
        # to decide when timeouts have occurred.  When those clocks
        # disagree on elapsed time (as they will whenever there is an
        # NTP adjustment), tornado might call _handle_timeout before
        # libcurl is ready.  After each timeout, resync the scheduled
        # timeout with libcurl's current state.
        new_timeout = self._multi.timeout()
        if new_timeout >= 0:
            self._set_timeout(new_timeout)

    def _handle_force_timeout(self):
        """Called by IOLoop periodically to ask libcurl to process any
        events it may have forgotten about.
        """
        with stack_context.NullContext():
            while True:
                try:
                    ret, num_handles = self._multi.socket_all()
                except pycurl.error as e:
                    ret = e.args[0]
                if ret != pycurl.E_CALL_MULTI_PERFORM:
                    break
            self._finish_pending_requests()

    def _finish_pending_requests(self):
        """Process any requests that were completed by the last
        call to multi.socket_action.
        """
        while True:
            num_q, ok_list, err_list = self._multi.info_read()
            for curl in ok_list:
                self._finish(curl)
            for curl, errnum, errmsg in err_list:
                self._finish(curl, errnum, errmsg)
            if num_q == 0:
                break
        # Finished handles are back on the free list; start any
        # queued requests that can now run.
        self._process_queue()

    def _process_queue(self):
        """Start queued requests on free curl handles until one of the
        two runs out.
        """
        with stack_context.NullContext():
            while True:
                started = 0
                while self._free_list and self._requests:
                    started += 1
                    curl = self._free_list.pop()
                    (request, callback, queue_start_time) = self._requests.popleft()
                    # Per-request state is stashed on the curl handle
                    # itself and read back in _finish.
                    curl.info = {
                        "headers": httputil.HTTPHeaders(),
                        "buffer": BytesIO(),
                        "request": request,
                        "callback": callback,
                        "queue_start_time": queue_start_time,
                        "curl_start_time": time.time(),
                        "curl_start_ioloop_time": self.io_loop.current().time(),
                    }
                    try:
                        self._curl_setup_request(
                            curl, request, curl.info["buffer"],
                            curl.info["headers"])
                    except Exception as e:
                        # If there was an error in setup, pass it on
                        # to the callback. Note that allowing the
                        # error to escape here will appear to work
                        # most of the time since we are still in the
                        # caller's original stack frame, but when
                        # _process_queue() is called from
                        # _finish_pending_requests the exceptions have
                        # nowhere to go.
                        self._free_list.append(curl)
                        callback(HTTPResponse(
                            request=request,
                            code=599,
                            error=e))
                    else:
                        self._multi.add_handle(curl)

                if not started:
                    break

    def _finish(self, curl, curl_error=None, curl_message=None):
        """Build the HTTPResponse for a completed (or failed) handle,
        return the handle to the free list, and invoke the callback.
        """
        info = curl.info
        curl.info = None
        self._multi.remove_handle(curl)
        self._free_list.append(curl)
        buffer = info["buffer"]
        if curl_error:
            error = CurlError(curl_error, curl_message)
            code = error.code
            effective_url = None
            buffer.close()
            buffer = None
        else:
            error = None
            code = curl.getinfo(pycurl.HTTP_CODE)
            effective_url = curl.getinfo(pycurl.EFFECTIVE_URL)
            buffer.seek(0)
        # the various curl timings are documented at
        # http://curl.haxx.se/libcurl/c/curl_easy_getinfo.html
        time_info = dict(
            queue=info["curl_start_ioloop_time"] - info["queue_start_time"],
            namelookup=curl.getinfo(pycurl.NAMELOOKUP_TIME),
            connect=curl.getinfo(pycurl.CONNECT_TIME),
            appconnect=curl.getinfo(pycurl.APPCONNECT_TIME),
            pretransfer=curl.getinfo(pycurl.PRETRANSFER_TIME),
            starttransfer=curl.getinfo(pycurl.STARTTRANSFER_TIME),
            total=curl.getinfo(pycurl.TOTAL_TIME),
            redirect=curl.getinfo(pycurl.REDIRECT_TIME),
        )
        try:
            info["callback"](HTTPResponse(
                request=info["request"], code=code, headers=info["headers"],
                buffer=buffer, effective_url=effective_url, error=error,
                reason=info['headers'].get("X-Http-Reason", None),
                request_time=self.io_loop.time() - info["curl_start_ioloop_time"],
                start_time=info["curl_start_time"],
                time_info=time_info))
        except Exception:
            self.handle_callback_exception(info["callback"])

    def handle_callback_exception(self, callback):
        # Delegate exception reporting to the IOLoop's handler.
        self.io_loop.handle_callback_exception(callback)

    def _curl_create(self):
        """Create and configure one curl easy handle for the pool."""
        curl = pycurl.Curl()
        if curl_log.isEnabledFor(logging.DEBUG):
            curl.setopt(pycurl.VERBOSE, 1)
            curl.setopt(pycurl.DEBUGFUNCTION, self._curl_debug)
        if hasattr(pycurl, 'PROTOCOLS'):  # PROTOCOLS first appeared in pycurl 7.19.5 (2014-07-12)
            # Restrict both the initial request and any redirects to
            # http/https.
            curl.setopt(pycurl.PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
            curl.setopt(pycurl.REDIR_PROTOCOLS, pycurl.PROTO_HTTP | pycurl.PROTO_HTTPS)
        return curl

    def _curl_setup_request(self, curl, request, buffer, headers):
        """Translate an ``HTTPRequest`` into setopt calls on ``curl``.

        ``buffer`` receives the response body (unless a
        streaming_callback is set) and ``headers`` collects the parsed
        response headers via ``_curl_header_callback``.
        """
        curl.setopt(pycurl.URL, native_str(request.url))

        # libcurl's magic "Expect: 100-continue" behavior causes delays
        # with servers that don't support it (which include, among others,
        # Google's OpenID endpoint).  Additionally, this behavior has
        # a bug in conjunction with the curl_multi_socket_action API
        # (https://sourceforge.net/tracker/?func=detail&atid=100976&aid=3039744&group_id=976),
        # which increases the delays.  It's more trouble than it's worth,
        # so just turn off the feature (yes, setting Expect: to an empty
        # value is the official way to disable this)
        if "Expect" not in request.headers:
            request.headers["Expect"] = ""

        # libcurl adds Pragma: no-cache by default; disable that too
        if "Pragma" not in request.headers:
            request.headers["Pragma"] = ""

        curl.setopt(pycurl.HTTPHEADER,
                    ["%s: %s" % (native_str(k), native_str(v))
                     for k, v in request.headers.get_all()])

        curl.setopt(pycurl.HEADERFUNCTION,
                    functools.partial(self._curl_header_callback,
                                      headers, request.header_callback))
        if request.streaming_callback:
            # Deliver body chunks to the user's callback on the IOLoop
            # instead of accumulating them in the buffer.
            def write_function(chunk):
                self.io_loop.add_callback(request.streaming_callback, chunk)
        else:
            write_function = buffer.write
        curl.setopt(pycurl.WRITEFUNCTION, write_function)
        curl.setopt(pycurl.FOLLOWLOCATION, request.follow_redirects)
        curl.setopt(pycurl.MAXREDIRS, request.max_redirects)
        curl.setopt(pycurl.CONNECTTIMEOUT_MS, int(1000 * request.connect_timeout))
        curl.setopt(pycurl.TIMEOUT_MS, int(1000 * request.request_timeout))
        if request.user_agent:
            curl.setopt(pycurl.USERAGENT, native_str(request.user_agent))
        else:
            curl.setopt(pycurl.USERAGENT, "Mozilla/5.0 (compatible; pycurl)")
        if request.network_interface:
            curl.setopt(pycurl.INTERFACE, request.network_interface)
        if request.decompress_response:
            curl.setopt(pycurl.ENCODING, "gzip,deflate")
        else:
            curl.setopt(pycurl.ENCODING, "none")
        if request.proxy_host and request.proxy_port:
            curl.setopt(pycurl.PROXY, request.proxy_host)
            curl.setopt(pycurl.PROXYPORT, request.proxy_port)
            if request.proxy_username:
                credentials = httputil.encode_username_password(request.proxy_username,
                                                                request.proxy_password)
                curl.setopt(pycurl.PROXYUSERPWD, credentials)

                if (request.proxy_auth_mode is None or
                        request.proxy_auth_mode == "basic"):
                    curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_BASIC)
                elif request.proxy_auth_mode == "digest":
                    curl.setopt(pycurl.PROXYAUTH, pycurl.HTTPAUTH_DIGEST)
                else:
                    raise ValueError(
                        "Unsupported proxy_auth_mode %s" % request.proxy_auth_mode)
        else:
            # Handles are reused, so explicitly clear any proxy settings
            # left over from a previous request.
            curl.setopt(pycurl.PROXY, '')
            curl.unsetopt(pycurl.PROXYUSERPWD)
        if request.validate_cert:
            curl.setopt(pycurl.SSL_VERIFYPEER, 1)
            curl.setopt(pycurl.SSL_VERIFYHOST, 2)
        else:
            curl.setopt(pycurl.SSL_VERIFYPEER, 0)
            curl.setopt(pycurl.SSL_VERIFYHOST, 0)
        if request.ca_certs is not None:
            curl.setopt(pycurl.CAINFO, request.ca_certs)
        else:
            # There is no way to restore pycurl.CAINFO to its default value
            # (Using unsetopt makes it reject all certificates).
            # I don't see any way to read the default value from python so it
            # can be restored later.  We'll have to just leave CAINFO untouched
            # if no ca_certs file was specified, and require that if any
            # request uses a custom ca_certs file, they all must.
            pass

        if request.allow_ipv6 is False:
            # Curl behaves reasonably when DNS resolution gives an ipv6 address
            # that we can't reach, so allow ipv6 unless the user asks to disable.
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_V4)
        else:
            curl.setopt(pycurl.IPRESOLVE, pycurl.IPRESOLVE_WHATEVER)

        # Set the request method through curl's irritating interface which makes
        # up names for almost every single method
        curl_options = {
            "GET": pycurl.HTTPGET,
            "POST": pycurl.POST,
            "PUT": pycurl.UPLOAD,
            "HEAD": pycurl.NOBODY,
        }
        custom_methods = set(["DELETE", "OPTIONS", "PATCH"])
        for o in curl_options.values():
            curl.setopt(o, False)
        if request.method in curl_options:
            curl.unsetopt(pycurl.CUSTOMREQUEST)
            curl.setopt(curl_options[request.method], True)
        elif request.allow_nonstandard_methods or request.method in custom_methods:
            curl.setopt(pycurl.CUSTOMREQUEST, request.method)
        else:
            raise KeyError('unknown method ' + request.method)

        body_expected = request.method in ("POST", "PATCH", "PUT")
        body_present = request.body is not None
        if not request.allow_nonstandard_methods:
            # Some HTTP methods nearly always have bodies while others
            # almost never do. Fail in this case unless the user has
            # opted out of sanity checks with allow_nonstandard_methods.
            if ((body_expected and not body_present) or
                    (body_present and not body_expected)):
                raise ValueError(
                    'Body must %sbe None for method %s (unless '
                    'allow_nonstandard_methods is true)' %
                    ('not ' if body_expected else '', request.method))

        if body_expected or body_present:
            if request.method == "GET":
                # Even with `allow_nonstandard_methods` we disallow
                # GET with a body (because libcurl doesn't allow it
                # unless we use CUSTOMREQUEST). While the spec doesn't
                # forbid clients from sending a body, it arguably
                # disallows the server from doing anything with them.
                raise ValueError('Body must be None for GET request')
            request_buffer = BytesIO(utf8(request.body or ''))

            def ioctl(cmd):
                # Rewind the body buffer when libcurl needs to resend
                # it (e.g. after a redirect or auth challenge).
                if cmd == curl.IOCMD_RESTARTREAD:
                    request_buffer.seek(0)
            curl.setopt(pycurl.READFUNCTION, request_buffer.read)
            curl.setopt(pycurl.IOCTLFUNCTION, ioctl)
            if request.method == "POST":
                curl.setopt(pycurl.POSTFIELDSIZE, len(request.body or ''))
            else:
                curl.setopt(pycurl.UPLOAD, True)
                curl.setopt(pycurl.INFILESIZE, len(request.body or ''))

        if request.auth_username is not None:
            if request.auth_mode is None or request.auth_mode == "basic":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_BASIC)
            elif request.auth_mode == "digest":
                curl.setopt(pycurl.HTTPAUTH, pycurl.HTTPAUTH_DIGEST)
            else:
                raise ValueError("Unsupported auth_mode %s" % request.auth_mode)

            userpwd = httputil.encode_username_password(request.auth_username,
                                                        request.auth_password)
            curl.setopt(pycurl.USERPWD, userpwd)
            curl_log.debug("%s %s (username: %r)", request.method, request.url,
                           request.auth_username)
        else:
            curl.unsetopt(pycurl.USERPWD)
            curl_log.debug("%s %s", request.method, request.url)

        if request.client_cert is not None:
            curl.setopt(pycurl.SSLCERT, request.client_cert)

        if request.client_key is not None:
            curl.setopt(pycurl.SSLKEY, request.client_key)

        if request.ssl_options is not None:
            raise ValueError("ssl_options not supported in curl_httpclient")

        if threading.activeCount() > 1:
            # libcurl/pycurl is not thread-safe by default.  When multiple threads
            # are used, signals should be disabled.  This has the side effect
            # of disabling DNS timeouts in some environments (when libcurl is
            # not linked against ares), so we don't do it when there is only one
            # thread.  Applications that use many short-lived threads may need
            # to set NOSIGNAL manually in a prepare_curl_callback since
            # there may not be any other threads running at the time we call
            # threading.activeCount.
            curl.setopt(pycurl.NOSIGNAL, 1)
        if request.prepare_curl_callback is not None:
            request.prepare_curl_callback(curl)

    def _curl_header_callback(self, headers, header_callback, header_line):
        """Called by libcurl once per raw response header line; parses
        the line into ``headers`` and forwards it to ``header_callback``.
        """
        header_line = native_str(header_line.decode('latin1'))
        if header_callback is not None:
            self.io_loop.add_callback(header_callback, header_line)
        # header_line as returned by curl includes the end-of-line characters.
        # whitespace at the start should be preserved to allow multi-line headers
        header_line = header_line.rstrip()
        if header_line.startswith("HTTP/"):
            # New status line: discard headers from any intermediate
            # response (e.g. a redirect) and smuggle the reason phrase
            # through as a pseudo-header for _finish to read.
            headers.clear()
            try:
                (__, __, reason) = httputil.parse_response_start_line(header_line)
                header_line = "X-Http-Reason: %s" % reason
            except httputil.HTTPInputError:
                return
        if not header_line:
            return
        headers.parse_line(header_line)

    def _curl_debug(self, debug_type, debug_msg):
        """Called by libcurl (DEBUGFUNCTION) to log verbose trace output."""
        # Direction markers: info, header-in, header-out, data-in, data-out.
        debug_types = ('I', '<', '>', '<', '>')
        if debug_type == 0:
            debug_msg = native_str(debug_msg)
            curl_log.debug('%s', debug_msg.strip())
        elif debug_type in (1, 2):
            debug_msg = native_str(debug_msg)
            for line in debug_msg.splitlines():
                curl_log.debug('%s %s', debug_types[debug_type], line)
        elif debug_type == 4:
            curl_log.debug('%s %r', debug_types[debug_type], debug_msg)
  460. class CurlError(HTTPError):
  461. def __init__(self, errno, message):
  462. HTTPError.__init__(self, 599, message)
  463. self.errno = errno
  464. if __name__ == "__main__":
  465. AsyncHTTPClient.configure(CurlAsyncHTTPClient)
  466. main()