concurrent.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660
  1. #
  2. # Copyright 2012 Facebook
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  5. # not use this file except in compliance with the License. You may obtain
  6. # a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  12. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  13. # License for the specific language governing permissions and limitations
  14. # under the License.
  15. """Utilities for working with ``Future`` objects.
  16. ``Futures`` are a pattern for concurrent programming introduced in
  17. Python 3.2 in the `concurrent.futures` package, and also adopted (in a
  18. slightly different form) in Python 3.4's `asyncio` package. This
  19. package defines a ``Future`` class that is an alias for `asyncio.Future`
  20. when available, and a compatible implementation for older versions of
  21. Python. It also includes some utility functions for interacting with
  22. ``Future`` objects.
  23. While this package is an important part of Tornado's internal
  24. implementation, applications rarely need to interact with it
  25. directly.
  26. """
  27. from __future__ import absolute_import, division, print_function
  28. import functools
  29. import platform
  30. import textwrap
  31. import traceback
  32. import sys
  33. import warnings
  34. from tornado.log import app_log
  35. from tornado.stack_context import ExceptionStackContext, wrap
  36. from tornado.util import raise_exc_info, ArgReplacer, is_finalizing
  37. try:
  38. from concurrent import futures
  39. except ImportError:
  40. futures = None
  41. try:
  42. import asyncio
  43. except ImportError:
  44. asyncio = None
  45. try:
  46. import typing
  47. except ImportError:
  48. typing = None
  49. # Can the garbage collector handle cycles that include __del__ methods?
  50. # This is true in cpython beginning with version 3.4 (PEP 442).
  51. _GC_CYCLE_FINALIZERS = (platform.python_implementation() == 'CPython' and
  52. sys.version_info >= (3, 4))
  53. class ReturnValueIgnoredError(Exception):
  54. pass
  55. # This class and associated code in the future object is derived
  56. # from the Trollius project, a backport of asyncio to Python 2.x - 3.x
  57. class _TracebackLogger(object):
  58. """Helper to log a traceback upon destruction if not cleared.
  59. This solves a nasty problem with Futures and Tasks that have an
  60. exception set: if nobody asks for the exception, the exception is
  61. never logged. This violates the Zen of Python: 'Errors should
  62. never pass silently. Unless explicitly silenced.'
  63. However, we don't want to log the exception as soon as
  64. set_exception() is called: if the calling code is written
  65. properly, it will get the exception and handle it properly. But
  66. we *do* want to log it if result() or exception() was never called
  67. -- otherwise developers waste a lot of time wondering why their
  68. buggy code fails silently.
  69. An earlier attempt added a __del__() method to the Future class
  70. itself, but this backfired because the presence of __del__()
  71. prevents garbage collection from breaking cycles. A way out of
  72. this catch-22 is to avoid having a __del__() method on the Future
  73. class itself, but instead to have a reference to a helper object
  74. with a __del__() method that logs the traceback, where we ensure
  75. that the helper object doesn't participate in cycles, and only the
  76. Future has a reference to it.
  77. The helper object is added when set_exception() is called. When
  78. the Future is collected, and the helper is present, the helper
  79. object is also collected, and its __del__() method will log the
  80. traceback. When the Future's result() or exception() method is
  81. called (and a helper object is present), it removes the the helper
  82. object, after calling its clear() method to prevent it from
  83. logging.
  84. One downside is that we do a fair amount of work to extract the
  85. traceback from the exception, even when it is never logged. It
  86. would seem cheaper to just store the exception object, but that
  87. references the traceback, which references stack frames, which may
  88. reference the Future, which references the _TracebackLogger, and
  89. then the _TracebackLogger would be included in a cycle, which is
  90. what we're trying to avoid! As an optimization, we don't
  91. immediately format the exception; we only do the work when
  92. activate() is called, which call is delayed until after all the
  93. Future's callbacks have run. Since usually a Future has at least
  94. one callback (typically set by 'yield From') and usually that
  95. callback extracts the callback, thereby removing the need to
  96. format the exception.
  97. PS. I don't claim credit for this solution. I first heard of it
  98. in a discussion about closing files when they are collected.
  99. """
  100. __slots__ = ('exc_info', 'formatted_tb')
  101. def __init__(self, exc_info):
  102. self.exc_info = exc_info
  103. self.formatted_tb = None
  104. def activate(self):
  105. exc_info = self.exc_info
  106. if exc_info is not None:
  107. self.exc_info = None
  108. self.formatted_tb = traceback.format_exception(*exc_info)
  109. def clear(self):
  110. self.exc_info = None
  111. self.formatted_tb = None
  112. def __del__(self, is_finalizing=is_finalizing):
  113. if not is_finalizing() and self.formatted_tb:
  114. app_log.error('Future exception was never retrieved: %s',
  115. ''.join(self.formatted_tb).rstrip())
  116. class Future(object):
  117. """Placeholder for an asynchronous result.
  118. A ``Future`` encapsulates the result of an asynchronous
  119. operation. In synchronous applications ``Futures`` are used
  120. to wait for the result from a thread or process pool; in
  121. Tornado they are normally used with `.IOLoop.add_future` or by
  122. yielding them in a `.gen.coroutine`.
  123. `tornado.concurrent.Future` is an alias for `asyncio.Future` when
  124. that package is available (Python 3.4+). Unlike
  125. `concurrent.futures.Future`, the ``Futures`` used by Tornado and
  126. `asyncio` are not thread-safe (and therefore faster for use with
  127. single-threaded event loops).
  128. In addition to ``exception`` and ``set_exception``, Tornado's
  129. ``Future`` implementation supports storing an ``exc_info`` triple
  130. to support better tracebacks on Python 2. To set an ``exc_info``
  131. triple, use `future_set_exc_info`, and to retrieve one, call
  132. `result()` (which will raise it).
  133. .. versionchanged:: 4.0
  134. `tornado.concurrent.Future` is always a thread-unsafe ``Future``
  135. with support for the ``exc_info`` methods. Previously it would
  136. be an alias for the thread-safe `concurrent.futures.Future`
  137. if that package was available and fall back to the thread-unsafe
  138. implementation if it was not.
  139. .. versionchanged:: 4.1
  140. If a `.Future` contains an error but that error is never observed
  141. (by calling ``result()``, ``exception()``, or ``exc_info()``),
  142. a stack trace will be logged when the `.Future` is garbage collected.
  143. This normally indicates an error in the application, but in cases
  144. where it results in undesired logging it may be necessary to
  145. suppress the logging by ensuring that the exception is observed:
  146. ``f.add_done_callback(lambda f: f.exception())``.
  147. .. versionchanged:: 5.0
  148. This class was previoiusly available under the name
  149. ``TracebackFuture``. This name, which was deprecated since
  150. version 4.0, has been removed. When `asyncio` is available
  151. ``tornado.concurrent.Future`` is now an alias for
  152. `asyncio.Future`. Like `asyncio.Future`, callbacks are now
  153. always scheduled on the `.IOLoop` and are never run
  154. synchronously.
  155. """
  156. def __init__(self):
  157. self._done = False
  158. self._result = None
  159. self._exc_info = None
  160. self._log_traceback = False # Used for Python >= 3.4
  161. self._tb_logger = None # Used for Python <= 3.3
  162. self._callbacks = []
  163. # Implement the Python 3.5 Awaitable protocol if possible
  164. # (we can't use return and yield together until py33).
  165. if sys.version_info >= (3, 3):
  166. exec(textwrap.dedent("""
  167. def __await__(self):
  168. return (yield self)
  169. """))
  170. else:
  171. # Py2-compatible version for use with cython.
  172. def __await__(self):
  173. result = yield self
  174. # StopIteration doesn't take args before py33,
  175. # but Cython recognizes the args tuple.
  176. e = StopIteration()
  177. e.args = (result,)
  178. raise e
  179. def cancel(self):
  180. """Cancel the operation, if possible.
  181. Tornado ``Futures`` do not support cancellation, so this method always
  182. returns False.
  183. """
  184. return False
  185. def cancelled(self):
  186. """Returns True if the operation has been cancelled.
  187. Tornado ``Futures`` do not support cancellation, so this method
  188. always returns False.
  189. """
  190. return False
  191. def running(self):
  192. """Returns True if this operation is currently running."""
  193. return not self._done
  194. def done(self):
  195. """Returns True if the future has finished running."""
  196. return self._done
  197. def _clear_tb_log(self):
  198. self._log_traceback = False
  199. if self._tb_logger is not None:
  200. self._tb_logger.clear()
  201. self._tb_logger = None
  202. def result(self, timeout=None):
  203. """If the operation succeeded, return its result. If it failed,
  204. re-raise its exception.
  205. This method takes a ``timeout`` argument for compatibility with
  206. `concurrent.futures.Future` but it is an error to call it
  207. before the `Future` is done, so the ``timeout`` is never used.
  208. """
  209. self._clear_tb_log()
  210. if self._result is not None:
  211. return self._result
  212. if self._exc_info is not None:
  213. try:
  214. raise_exc_info(self._exc_info)
  215. finally:
  216. self = None
  217. self._check_done()
  218. return self._result
  219. def exception(self, timeout=None):
  220. """If the operation raised an exception, return the `Exception`
  221. object. Otherwise returns None.
  222. This method takes a ``timeout`` argument for compatibility with
  223. `concurrent.futures.Future` but it is an error to call it
  224. before the `Future` is done, so the ``timeout`` is never used.
  225. """
  226. self._clear_tb_log()
  227. if self._exc_info is not None:
  228. return self._exc_info[1]
  229. else:
  230. self._check_done()
  231. return None
  232. def add_done_callback(self, fn):
  233. """Attaches the given callback to the `Future`.
  234. It will be invoked with the `Future` as its argument when the Future
  235. has finished running and its result is available. In Tornado
  236. consider using `.IOLoop.add_future` instead of calling
  237. `add_done_callback` directly.
  238. """
  239. if self._done:
  240. from tornado.ioloop import IOLoop
  241. IOLoop.current().add_callback(fn, self)
  242. else:
  243. self._callbacks.append(fn)
  244. def set_result(self, result):
  245. """Sets the result of a ``Future``.
  246. It is undefined to call any of the ``set`` methods more than once
  247. on the same object.
  248. """
  249. self._result = result
  250. self._set_done()
  251. def set_exception(self, exception):
  252. """Sets the exception of a ``Future.``"""
  253. self.set_exc_info(
  254. (exception.__class__,
  255. exception,
  256. getattr(exception, '__traceback__', None)))
  257. def exc_info(self):
  258. """Returns a tuple in the same format as `sys.exc_info` or None.
  259. .. versionadded:: 4.0
  260. """
  261. self._clear_tb_log()
  262. return self._exc_info
  263. def set_exc_info(self, exc_info):
  264. """Sets the exception information of a ``Future.``
  265. Preserves tracebacks on Python 2.
  266. .. versionadded:: 4.0
  267. """
  268. self._exc_info = exc_info
  269. self._log_traceback = True
  270. if not _GC_CYCLE_FINALIZERS:
  271. self._tb_logger = _TracebackLogger(exc_info)
  272. try:
  273. self._set_done()
  274. finally:
  275. # Activate the logger after all callbacks have had a
  276. # chance to call result() or exception().
  277. if self._log_traceback and self._tb_logger is not None:
  278. self._tb_logger.activate()
  279. self._exc_info = exc_info
  280. def _check_done(self):
  281. if not self._done:
  282. raise Exception("DummyFuture does not support blocking for results")
  283. def _set_done(self):
  284. self._done = True
  285. if self._callbacks:
  286. from tornado.ioloop import IOLoop
  287. loop = IOLoop.current()
  288. for cb in self._callbacks:
  289. loop.add_callback(cb, self)
  290. self._callbacks = None
  291. # On Python 3.3 or older, objects with a destructor part of a reference
  292. # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
  293. # the PEP 442.
  294. if _GC_CYCLE_FINALIZERS:
  295. def __del__(self, is_finalizing=is_finalizing):
  296. if is_finalizing() or not self._log_traceback:
  297. # set_exception() was not called, or result() or exception()
  298. # has consumed the exception
  299. return
  300. tb = traceback.format_exception(*self._exc_info)
  301. app_log.error('Future %r exception was never retrieved: %s',
  302. self, ''.join(tb).rstrip())
  303. if asyncio is not None:
  304. Future = asyncio.Future # noqa
  305. if futures is None:
  306. FUTURES = Future # type: typing.Union[type, typing.Tuple[type, ...]]
  307. else:
  308. FUTURES = (futures.Future, Future)
  309. def is_future(x):
  310. return isinstance(x, FUTURES)
  311. class DummyExecutor(object):
  312. def submit(self, fn, *args, **kwargs):
  313. future = Future()
  314. try:
  315. future_set_result_unless_cancelled(future, fn(*args, **kwargs))
  316. except Exception:
  317. future_set_exc_info(future, sys.exc_info())
  318. return future
  319. def shutdown(self, wait=True):
  320. pass
  321. dummy_executor = DummyExecutor()
  322. def run_on_executor(*args, **kwargs):
  323. """Decorator to run a synchronous method asynchronously on an executor.
  324. The decorated method may be called with a ``callback`` keyword
  325. argument and returns a future.
  326. The executor to be used is determined by the ``executor``
  327. attributes of ``self``. To use a different attribute name, pass a
  328. keyword argument to the decorator::
  329. @run_on_executor(executor='_thread_pool')
  330. def foo(self):
  331. pass
  332. This decorator should not be confused with the similarly-named
  333. `.IOLoop.run_in_executor`. In general, using ``run_in_executor``
  334. when *calling* a blocking method is recommended instead of using
  335. this decorator when *defining* a method. If compatibility with older
  336. versions of Tornado is required, consider defining an executor
  337. and using ``executor.submit()`` at the call site.
  338. .. versionchanged:: 4.2
  339. Added keyword arguments to use alternative attributes.
  340. .. versionchanged:: 5.0
  341. Always uses the current IOLoop instead of ``self.io_loop``.
  342. .. versionchanged:: 5.1
  343. Returns a `.Future` compatible with ``await`` instead of a
  344. `concurrent.futures.Future`.
  345. .. deprecated:: 5.1
  346. The ``callback`` argument is deprecated and will be removed in
  347. 6.0. The decorator itself is discouraged in new code but will
  348. not be removed in 6.0.
  349. """
  350. def run_on_executor_decorator(fn):
  351. executor = kwargs.get("executor", "executor")
  352. @functools.wraps(fn)
  353. def wrapper(self, *args, **kwargs):
  354. callback = kwargs.pop("callback", None)
  355. async_future = Future()
  356. conc_future = getattr(self, executor).submit(fn, self, *args, **kwargs)
  357. chain_future(conc_future, async_future)
  358. if callback:
  359. warnings.warn("callback arguments are deprecated, use the returned Future instead",
  360. DeprecationWarning)
  361. from tornado.ioloop import IOLoop
  362. IOLoop.current().add_future(
  363. async_future, lambda future: callback(future.result()))
  364. return async_future
  365. return wrapper
  366. if args and kwargs:
  367. raise ValueError("cannot combine positional and keyword args")
  368. if len(args) == 1:
  369. return run_on_executor_decorator(args[0])
  370. elif len(args) != 0:
  371. raise ValueError("expected 1 argument, got %d", len(args))
  372. return run_on_executor_decorator
  373. _NO_RESULT = object()
  374. def return_future(f):
  375. """Decorator to make a function that returns via callback return a
  376. `Future`.
  377. This decorator was provided to ease the transition from
  378. callback-oriented code to coroutines. It is not recommended for
  379. new code.
  380. The wrapped function should take a ``callback`` keyword argument
  381. and invoke it with one argument when it has finished. To signal failure,
  382. the function can simply raise an exception (which will be
  383. captured by the `.StackContext` and passed along to the ``Future``).
  384. From the caller's perspective, the callback argument is optional.
  385. If one is given, it will be invoked when the function is complete
  386. with ``Future.result()`` as an argument. If the function fails, the
  387. callback will not be run and an exception will be raised into the
  388. surrounding `.StackContext`.
  389. If no callback is given, the caller should use the ``Future`` to
  390. wait for the function to complete (perhaps by yielding it in a
  391. coroutine, or passing it to `.IOLoop.add_future`).
  392. Usage:
  393. .. testcode::
  394. @return_future
  395. def future_func(arg1, arg2, callback):
  396. # Do stuff (possibly asynchronous)
  397. callback(result)
  398. async def caller():
  399. await future_func(arg1, arg2)
  400. ..
  401. Note that ``@return_future`` and ``@gen.engine`` can be applied to the
  402. same function, provided ``@return_future`` appears first. However,
  403. consider using ``@gen.coroutine`` instead of this combination.
  404. .. versionchanged:: 5.1
  405. Now raises a `.DeprecationWarning` if a callback argument is passed to
  406. the decorated function and deprecation warnings are enabled.
  407. .. deprecated:: 5.1
  408. This decorator will be removed in Tornado 6.0. New code should
  409. use coroutines directly instead of wrapping callback-based code
  410. with this decorator. Interactions with non-Tornado
  411. callback-based code should be managed explicitly to avoid
  412. relying on the `.ExceptionStackContext` built into this
  413. decorator.
  414. """
  415. warnings.warn("@return_future is deprecated, use coroutines instead",
  416. DeprecationWarning)
  417. return _non_deprecated_return_future(f, warn=True)
  418. def _non_deprecated_return_future(f, warn=False):
  419. # Allow auth.py to use this decorator without triggering
  420. # deprecation warnings. This will go away once auth.py has removed
  421. # its legacy interfaces in 6.0.
  422. replacer = ArgReplacer(f, 'callback')
  423. @functools.wraps(f)
  424. def wrapper(*args, **kwargs):
  425. future = Future()
  426. callback, args, kwargs = replacer.replace(
  427. lambda value=_NO_RESULT: future_set_result_unless_cancelled(future, value),
  428. args, kwargs)
  429. def handle_error(typ, value, tb):
  430. future_set_exc_info(future, (typ, value, tb))
  431. return True
  432. exc_info = None
  433. esc = ExceptionStackContext(handle_error, delay_warning=True)
  434. with esc:
  435. if not warn:
  436. # HACK: In non-deprecated mode (only used in auth.py),
  437. # suppress the warning entirely. Since this is added
  438. # in a 5.1 patch release and already removed in 6.0
  439. # I'm prioritizing a minimial change instead of a
  440. # clean solution.
  441. esc.delay_warning = False
  442. try:
  443. result = f(*args, **kwargs)
  444. if result is not None:
  445. raise ReturnValueIgnoredError(
  446. "@return_future should not be used with functions "
  447. "that return values")
  448. except:
  449. exc_info = sys.exc_info()
  450. raise
  451. if exc_info is not None:
  452. # If the initial synchronous part of f() raised an exception,
  453. # go ahead and raise it to the caller directly without waiting
  454. # for them to inspect the Future.
  455. future.result()
  456. # If the caller passed in a callback, schedule it to be called
  457. # when the future resolves. It is important that this happens
  458. # just before we return the future, or else we risk confusing
  459. # stack contexts with multiple exceptions (one here with the
  460. # immediate exception, and again when the future resolves and
  461. # the callback triggers its exception by calling future.result()).
  462. if callback is not None:
  463. warnings.warn("callback arguments are deprecated, use the returned Future instead",
  464. DeprecationWarning)
  465. def run_callback(future):
  466. result = future.result()
  467. if result is _NO_RESULT:
  468. callback()
  469. else:
  470. callback(future.result())
  471. future_add_done_callback(future, wrap(run_callback))
  472. return future
  473. return wrapper
  474. def chain_future(a, b):
  475. """Chain two futures together so that when one completes, so does the other.
  476. The result (success or failure) of ``a`` will be copied to ``b``, unless
  477. ``b`` has already been completed or cancelled by the time ``a`` finishes.
  478. .. versionchanged:: 5.0
  479. Now accepts both Tornado/asyncio `Future` objects and
  480. `concurrent.futures.Future`.
  481. """
  482. def copy(future):
  483. assert future is a
  484. if b.done():
  485. return
  486. if (hasattr(a, 'exc_info') and
  487. a.exc_info() is not None):
  488. future_set_exc_info(b, a.exc_info())
  489. elif a.exception() is not None:
  490. b.set_exception(a.exception())
  491. else:
  492. b.set_result(a.result())
  493. if isinstance(a, Future):
  494. future_add_done_callback(a, copy)
  495. else:
  496. # concurrent.futures.Future
  497. from tornado.ioloop import IOLoop
  498. IOLoop.current().add_future(a, copy)
  499. def future_set_result_unless_cancelled(future, value):
  500. """Set the given ``value`` as the `Future`'s result, if not cancelled.
  501. Avoids asyncio.InvalidStateError when calling set_result() on
  502. a cancelled `asyncio.Future`.
  503. .. versionadded:: 5.0
  504. """
  505. if not future.cancelled():
  506. future.set_result(value)
  507. def future_set_exc_info(future, exc_info):
  508. """Set the given ``exc_info`` as the `Future`'s exception.
  509. Understands both `asyncio.Future` and Tornado's extensions to
  510. enable better tracebacks on Python 2.
  511. .. versionadded:: 5.0
  512. """
  513. if hasattr(future, 'set_exc_info'):
  514. # Tornado's Future
  515. future.set_exc_info(exc_info)
  516. else:
  517. # asyncio.Future
  518. future.set_exception(exc_info[1])
  519. def future_add_done_callback(future, callback):
  520. """Arrange to call ``callback`` when ``future`` is complete.
  521. ``callback`` is invoked with one argument, the ``future``.
  522. If ``future`` is already done, ``callback`` is invoked immediately.
  523. This may differ from the behavior of ``Future.add_done_callback``,
  524. which makes no such guarantee.
  525. .. versionadded:: 5.0
  526. """
  527. if future.done():
  528. callback(future)
  529. else:
  530. future.add_done_callback(callback)