gen.py 48 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367
  1. """``tornado.gen`` implements generator-based coroutines.
  2. .. note::
  3. The "decorator and generator" approach in this module is a
  4. precursor to native coroutines (using ``async def`` and ``await``)
  5. which were introduced in Python 3.5. Applications that do not
  6. require compatibility with older versions of Python should use
  7. native coroutines instead. Some parts of this module are still
  8. useful with native coroutines, notably `multi`, `sleep`,
  9. `WaitIterator`, and `with_timeout`. Some of these functions have
  10. counterparts in the `asyncio` module which may be used as well,
  11. although the two may not necessarily be 100% compatible.
  12. Coroutines provide an easier way to work in an asynchronous
  13. environment than chaining callbacks. Code using coroutines is
  14. technically asynchronous, but it is written as a single generator
  15. instead of a collection of separate functions.
  16. For example, the following callback-based asynchronous handler:
  17. .. testcode::
  18. class AsyncHandler(RequestHandler):
  19. @asynchronous
  20. def get(self):
  21. http_client = AsyncHTTPClient()
  22. http_client.fetch("http://example.com",
  23. callback=self.on_fetch)
  24. def on_fetch(self, response):
  25. do_something_with_response(response)
  26. self.render("template.html")
  27. .. testoutput::
  28. :hide:
  29. could be written with ``gen`` as:
  30. .. testcode::
  31. class GenAsyncHandler(RequestHandler):
  32. @gen.coroutine
  33. def get(self):
  34. http_client = AsyncHTTPClient()
  35. response = yield http_client.fetch("http://example.com")
  36. do_something_with_response(response)
  37. self.render("template.html")
  38. .. testoutput::
  39. :hide:
  40. Most asynchronous functions in Tornado return a `.Future`;
  41. yielding this object returns its ``Future.result``.
  42. You can also yield a list or dict of ``Futures``, which will be
  43. started at the same time and run in parallel; a list or dict of results will
  44. be returned when they are all finished:
  45. .. testcode::
  46. @gen.coroutine
  47. def get(self):
  48. http_client = AsyncHTTPClient()
  49. response1, response2 = yield [http_client.fetch(url1),
  50. http_client.fetch(url2)]
  51. response_dict = yield dict(response3=http_client.fetch(url3),
  52. response4=http_client.fetch(url4))
  53. response3 = response_dict['response3']
  54. response4 = response_dict['response4']
  55. .. testoutput::
  56. :hide:
  57. If the `~functools.singledispatch` library is available (standard in
  58. Python 3.4, available via the `singledispatch
  59. <https://pypi.python.org/pypi/singledispatch>`_ package on older
  60. versions), additional types of objects may be yielded. Tornado includes
  61. support for ``asyncio.Future`` and Twisted's ``Deferred`` class when
  62. ``tornado.platform.asyncio`` and ``tornado.platform.twisted`` are imported.
  63. See the `convert_yielded` function to extend this mechanism.
  64. .. versionchanged:: 3.2
  65. Dict support added.
  66. .. versionchanged:: 4.1
  67. Support added for yielding ``asyncio`` Futures and Twisted Deferreds
  68. via ``singledispatch``.
  69. """
  70. from __future__ import absolute_import, division, print_function
  71. import collections
  72. import functools
  73. import itertools
  74. import os
  75. import sys
  76. import types
  77. import warnings
  78. from tornado.concurrent import (Future, is_future, chain_future, future_set_exc_info,
  79. future_add_done_callback, future_set_result_unless_cancelled)
  80. from tornado.ioloop import IOLoop
  81. from tornado.log import app_log
  82. from tornado import stack_context
  83. from tornado.util import PY3, raise_exc_info, TimeoutError
  84. try:
  85. try:
  86. # py34+
  87. from functools import singledispatch # type: ignore
  88. except ImportError:
  89. from singledispatch import singledispatch # backport
  90. except ImportError:
  91. # In most cases, singledispatch is required (to avoid
  92. # difficult-to-diagnose problems in which the functionality
  93. # available differs depending on which invisible packages are
  94. # installed). However, in Google App Engine third-party
  95. # dependencies are more trouble so we allow this module to be
  96. # imported without it.
  97. if 'APPENGINE_RUNTIME' not in os.environ:
  98. raise
  99. singledispatch = None
  100. try:
  101. try:
  102. # py35+
  103. from collections.abc import Generator as GeneratorType # type: ignore
  104. except ImportError:
  105. from backports_abc import Generator as GeneratorType # type: ignore
  106. try:
  107. # py35+
  108. from inspect import isawaitable # type: ignore
  109. except ImportError:
  110. from backports_abc import isawaitable
  111. except ImportError:
  112. if 'APPENGINE_RUNTIME' not in os.environ:
  113. raise
  114. from types import GeneratorType
  115. def isawaitable(x): # type: ignore
  116. return False
  117. if PY3:
  118. import builtins
  119. else:
  120. import __builtin__ as builtins
  121. class KeyReuseError(Exception):
  122. pass
  123. class UnknownKeyError(Exception):
  124. pass
  125. class LeakedCallbackError(Exception):
  126. pass
  127. class BadYieldError(Exception):
  128. pass
  129. class ReturnValueIgnoredError(Exception):
  130. pass
  131. def _value_from_stopiteration(e):
  132. try:
  133. # StopIteration has a value attribute beginning in py33.
  134. # So does our Return class.
  135. return e.value
  136. except AttributeError:
  137. pass
  138. try:
  139. # Cython backports coroutine functionality by putting the value in
  140. # e.args[0].
  141. return e.args[0]
  142. except (AttributeError, IndexError):
  143. return None
  144. def _create_future():
  145. future = Future()
  146. # Fixup asyncio debug info by removing extraneous stack entries
  147. source_traceback = getattr(future, "_source_traceback", ())
  148. while source_traceback:
  149. # Each traceback entry is equivalent to a
  150. # (filename, self.lineno, self.name, self.line) tuple
  151. filename = source_traceback[-1][0]
  152. if filename == __file__:
  153. del source_traceback[-1]
  154. else:
  155. break
  156. return future
  157. def engine(func):
  158. """Callback-oriented decorator for asynchronous generators.
  159. This is an older interface; for new code that does not need to be
  160. compatible with versions of Tornado older than 3.0 the
  161. `coroutine` decorator is recommended instead.
  162. This decorator is similar to `coroutine`, except it does not
  163. return a `.Future` and the ``callback`` argument is not treated
  164. specially.
  165. In most cases, functions decorated with `engine` should take
  166. a ``callback`` argument and invoke it with their result when
  167. they are finished. One notable exception is the
  168. `~tornado.web.RequestHandler` :ref:`HTTP verb methods <verbs>`,
  169. which use ``self.finish()`` in place of a callback argument.
  170. .. deprecated:: 5.1
  171. This decorator will be removed in 6.0. Use `coroutine` or
  172. ``async def`` instead.
  173. """
  174. warnings.warn("gen.engine is deprecated, use gen.coroutine or async def instead",
  175. DeprecationWarning)
  176. func = _make_coroutine_wrapper(func, replace_callback=False)
  177. @functools.wraps(func)
  178. def wrapper(*args, **kwargs):
  179. future = func(*args, **kwargs)
  180. def final_callback(future):
  181. if future.result() is not None:
  182. raise ReturnValueIgnoredError(
  183. "@gen.engine functions cannot return values: %r" %
  184. (future.result(),))
  185. # The engine interface doesn't give us any way to return
  186. # errors but to raise them into the stack context.
  187. # Save the stack context here to use when the Future has resolved.
  188. future_add_done_callback(future, stack_context.wrap(final_callback))
  189. return wrapper
  190. def coroutine(func):
  191. """Decorator for asynchronous generators.
  192. Any generator that yields objects from this module must be wrapped
  193. in either this decorator or `engine`.
  194. Coroutines may "return" by raising the special exception
  195. `Return(value) <Return>`. In Python 3.3+, it is also possible for
  196. the function to simply use the ``return value`` statement (prior to
  197. Python 3.3 generators were not allowed to also return values).
  198. In all versions of Python a coroutine that simply wishes to exit
  199. early may use the ``return`` statement without a value.
  200. Functions with this decorator return a `.Future`. Additionally,
  201. they may be called with a ``callback`` keyword argument, which
  202. will be invoked with the future's result when it resolves. If the
  203. coroutine fails, the callback will not be run and an exception
  204. will be raised into the surrounding `.StackContext`. The
  205. ``callback`` argument is not visible inside the decorated
  206. function; it is handled by the decorator itself.
  207. .. warning::
  208. When exceptions occur inside a coroutine, the exception
  209. information will be stored in the `.Future` object. You must
  210. examine the result of the `.Future` object, or the exception
  211. may go unnoticed by your code. This means yielding the function
  212. if called from another coroutine, using something like
  213. `.IOLoop.run_sync` for top-level calls, or passing the `.Future`
  214. to `.IOLoop.add_future`.
  215. .. deprecated:: 5.1
  216. The ``callback`` argument is deprecated and will be removed in 6.0.
  217. Use the returned awaitable object instead.
  218. """
  219. return _make_coroutine_wrapper(func, replace_callback=True)
def _make_coroutine_wrapper(func, replace_callback):
    """The inner workings of ``@gen.coroutine`` and ``@gen.engine``.

    The two decorators differ in their treatment of the ``callback``
    argument, so we cannot simply implement ``@engine`` in terms of
    ``@coroutine``.

    :arg func: the function being decorated.
    :arg replace_callback: if true, a ``callback`` keyword argument passed
        to the wrapper is removed before ``func`` is called and is instead
        invoked with the future's result through `.IOLoop.add_future`.
    :returns: a wrapper that returns a `.Future`; the wrapper carries
        ``__wrapped__`` (the undecorated function) and
        ``__tornado_coroutine__ = True``.
    """
    # On Python 3.5, set the coroutine flag on our generator, to allow it
    # to be used with 'await'.
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = _create_future()

        if replace_callback and 'callback' in kwargs:
            warnings.warn("callback arguments are deprecated, use the returned Future instead",
                          DeprecationWarning, stacklevel=2)
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            # The function "returned" by raising before producing a
            # generator; fall through to resolve the future with the value.
            result = _value_from_stopiteration(e)
        except Exception:
            future_set_exc_info(future, sys.exc_info())
            try:
                return future
            finally:
                # Avoid circular references
                future = None
        else:
            if isinstance(result, GeneratorType):
                # Inline the first iteration of Runner.run.  This lets us
                # avoid the cost of creating a Runner when the coroutine
                # never actually yields, which in turn allows us to
                # use "optional" coroutines in critical path code without
                # performance penalty for the synchronous case.
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    # The generator changed the active stack contexts before
                    # yielding; replace what it yielded with a failed future.
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = _create_future()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    # The generator finished without ever yielding.
                    future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
                except Exception:
                    future_set_exc_info(future, sys.exc_info())
                else:
                    # Provide strong references to Runner objects as long
                    # as their result future objects also have strong
                    # references (typically from the parent coroutine's
                    # Runner). This keeps the coroutine's Runner alive.
                    # We do this by exploiting the public API
                    # add_done_callback() instead of putting a private
                    # attribute on the Future.
                    # (Github issues #1769, #2229).
                    runner = Runner(result, future, yielded)
                    future.add_done_callback(lambda _: runner)
                yielded = None
                try:
                    return future
                finally:
                    # Subtle memory optimization: if next() raised an exception,
                    # the future's exc_info contains a traceback which
                    # includes this stack frame.  This creates a cycle,
                    # which will be collected at the next full GC but has
                    # been shown to greatly increase memory usage of
                    # benchmarks (relative to the refcount-based scheme
                    # used in the absence of cycles).  We can avoid the
                    # cycle by clearing the local variable after we return it.
                    future = None
        # Reached when func returned a non-generator value or "returned" by
        # raising Return/StopIteration: resolve the future right away.
        future_set_result_unless_cancelled(future, result)
        return future

    wrapper.__wrapped__ = wrapped
    wrapper.__tornado_coroutine__ = True
    return wrapper
  300. def is_coroutine_function(func):
  301. """Return whether *func* is a coroutine function, i.e. a function
  302. wrapped with `~.gen.coroutine`.
  303. .. versionadded:: 4.5
  304. """
  305. return getattr(func, '__tornado_coroutine__', False)
  306. class Return(Exception):
  307. """Special exception to return a value from a `coroutine`.
  308. If this exception is raised, its value argument is used as the
  309. result of the coroutine::
  310. @gen.coroutine
  311. def fetch_json(url):
  312. response = yield AsyncHTTPClient().fetch(url)
  313. raise gen.Return(json_decode(response.body))
  314. In Python 3.3, this exception is no longer necessary: the ``return``
  315. statement can be used directly to return a value (previously
  316. ``yield`` and ``return`` with a value could not be combined in the
  317. same function).
  318. By analogy with the return statement, the value argument is optional,
  319. but it is never necessary to ``raise gen.Return()``. The ``return``
  320. statement can be used with no arguments instead.
  321. """
  322. def __init__(self, value=None):
  323. super(Return, self).__init__()
  324. self.value = value
  325. # Cython recognizes subclasses of StopIteration with a .args tuple.
  326. self.args = (value,)
  327. class WaitIterator(object):
  328. """Provides an iterator to yield the results of futures as they finish.
  329. Yielding a set of futures like this:
  330. ``results = yield [future1, future2]``
  331. pauses the coroutine until both ``future1`` and ``future2``
  332. return, and then restarts the coroutine with the results of both
  333. futures. If either future is an exception, the expression will
  334. raise that exception and all the results will be lost.
  335. If you need to get the result of each future as soon as possible,
  336. or if you need the result of some futures even if others produce
  337. errors, you can use ``WaitIterator``::
  338. wait_iterator = gen.WaitIterator(future1, future2)
  339. while not wait_iterator.done():
  340. try:
  341. result = yield wait_iterator.next()
  342. except Exception as e:
  343. print("Error {} from {}".format(e, wait_iterator.current_future))
  344. else:
  345. print("Result {} received from {} at {}".format(
  346. result, wait_iterator.current_future,
  347. wait_iterator.current_index))
  348. Because results are returned as soon as they are available the
  349. output from the iterator *will not be in the same order as the
  350. input arguments*. If you need to know which future produced the
  351. current result, you can use the attributes
  352. ``WaitIterator.current_future``, or ``WaitIterator.current_index``
  353. to get the index of the future from the input list. (if keyword
  354. arguments were used in the construction of the `WaitIterator`,
  355. ``current_index`` will use the corresponding keyword).
  356. On Python 3.5, `WaitIterator` implements the async iterator
  357. protocol, so it can be used with the ``async for`` statement (note
  358. that in this version the entire iteration is aborted if any value
  359. raises an exception, while the previous example can continue past
  360. individual errors)::
  361. async for result in gen.WaitIterator(future1, future2):
  362. print("Result {} received from {} at {}".format(
  363. result, wait_iterator.current_future,
  364. wait_iterator.current_index))
  365. .. versionadded:: 4.1
  366. .. versionchanged:: 4.3
  367. Added ``async for`` support in Python 3.5.
  368. """
  369. def __init__(self, *args, **kwargs):
  370. if args and kwargs:
  371. raise ValueError(
  372. "You must provide args or kwargs, not both")
  373. if kwargs:
  374. self._unfinished = dict((f, k) for (k, f) in kwargs.items())
  375. futures = list(kwargs.values())
  376. else:
  377. self._unfinished = dict((f, i) for (i, f) in enumerate(args))
  378. futures = args
  379. self._finished = collections.deque()
  380. self.current_index = self.current_future = None
  381. self._running_future = None
  382. for future in futures:
  383. future_add_done_callback(future, self._done_callback)
  384. def done(self):
  385. """Returns True if this iterator has no more results."""
  386. if self._finished or self._unfinished:
  387. return False
  388. # Clear the 'current' values when iteration is done.
  389. self.current_index = self.current_future = None
  390. return True
  391. def next(self):
  392. """Returns a `.Future` that will yield the next available result.
  393. Note that this `.Future` will not be the same object as any of
  394. the inputs.
  395. """
  396. self._running_future = Future()
  397. if self._finished:
  398. self._return_result(self._finished.popleft())
  399. return self._running_future
  400. def _done_callback(self, done):
  401. if self._running_future and not self._running_future.done():
  402. self._return_result(done)
  403. else:
  404. self._finished.append(done)
  405. def _return_result(self, done):
  406. """Called set the returned future's state that of the future
  407. we yielded, and set the current future for the iterator.
  408. """
  409. chain_future(done, self._running_future)
  410. self.current_future = done
  411. self.current_index = self._unfinished.pop(done)
  412. def __aiter__(self):
  413. return self
  414. def __anext__(self):
  415. if self.done():
  416. # Lookup by name to silence pyflakes on older versions.
  417. raise getattr(builtins, 'StopAsyncIteration')()
  418. return self.next()
  419. class YieldPoint(object):
  420. """Base class for objects that may be yielded from the generator.
  421. .. deprecated:: 4.0
  422. Use `Futures <.Future>` instead. This class and all its subclasses
  423. will be removed in 6.0
  424. """
  425. def __init__(self):
  426. warnings.warn("YieldPoint is deprecated, use Futures instead",
  427. DeprecationWarning)
  428. def start(self, runner):
  429. """Called by the runner after the generator has yielded.
  430. No other methods will be called on this object before ``start``.
  431. """
  432. raise NotImplementedError()
  433. def is_ready(self):
  434. """Called by the runner to determine whether to resume the generator.
  435. Returns a boolean; may be called more than once.
  436. """
  437. raise NotImplementedError()
  438. def get_result(self):
  439. """Returns the value to use as the result of the yield expression.
  440. This method will only be called once, and only after `is_ready`
  441. has returned true.
  442. """
  443. raise NotImplementedError()
  444. class Callback(YieldPoint):
  445. """Returns a callable object that will allow a matching `Wait` to proceed.
  446. The key may be any value suitable for use as a dictionary key, and is
  447. used to match ``Callbacks`` to their corresponding ``Waits``. The key
  448. must be unique among outstanding callbacks within a single run of the
  449. generator function, but may be reused across different runs of the same
  450. function (so constants generally work fine).
  451. The callback may be called with zero or one arguments; if an argument
  452. is given it will be returned by `Wait`.
  453. .. deprecated:: 4.0
  454. Use `Futures <.Future>` instead. This class will be removed in 6.0.
  455. """
  456. def __init__(self, key):
  457. warnings.warn("gen.Callback is deprecated, use Futures instead",
  458. DeprecationWarning)
  459. self.key = key
  460. def start(self, runner):
  461. self.runner = runner
  462. runner.register_callback(self.key)
  463. def is_ready(self):
  464. return True
  465. def get_result(self):
  466. return self.runner.result_callback(self.key)
  467. class Wait(YieldPoint):
  468. """Returns the argument passed to the result of a previous `Callback`.
  469. .. deprecated:: 4.0
  470. Use `Futures <.Future>` instead. This class will be removed in 6.0.
  471. """
  472. def __init__(self, key):
  473. warnings.warn("gen.Wait is deprecated, use Futures instead",
  474. DeprecationWarning)
  475. self.key = key
  476. def start(self, runner):
  477. self.runner = runner
  478. def is_ready(self):
  479. return self.runner.is_ready(self.key)
  480. def get_result(self):
  481. return self.runner.pop_result(self.key)
  482. class WaitAll(YieldPoint):
  483. """Returns the results of multiple previous `Callbacks <Callback>`.
  484. The argument is a sequence of `Callback` keys, and the result is
  485. a list of results in the same order.
  486. `WaitAll` is equivalent to yielding a list of `Wait` objects.
  487. .. deprecated:: 4.0
  488. Use `Futures <.Future>` instead. This class will be removed in 6.0.
  489. """
  490. def __init__(self, keys):
  491. warnings.warn("gen.WaitAll is deprecated, use gen.multi instead",
  492. DeprecationWarning)
  493. self.keys = keys
  494. def start(self, runner):
  495. self.runner = runner
  496. def is_ready(self):
  497. return all(self.runner.is_ready(key) for key in self.keys)
  498. def get_result(self):
  499. return [self.runner.pop_result(key) for key in self.keys]
  500. def Task(func, *args, **kwargs):
  501. """Adapts a callback-based asynchronous function for use in coroutines.
  502. Takes a function (and optional additional arguments) and runs it with
  503. those arguments plus a ``callback`` keyword argument. The argument passed
  504. to the callback is returned as the result of the yield expression.
  505. .. versionchanged:: 4.0
  506. ``gen.Task`` is now a function that returns a `.Future`, instead of
  507. a subclass of `YieldPoint`. It still behaves the same way when
  508. yielded.
  509. .. deprecated:: 5.1
  510. This function is deprecated and will be removed in 6.0.
  511. """
  512. warnings.warn("gen.Task is deprecated, use Futures instead",
  513. DeprecationWarning)
  514. future = _create_future()
  515. def handle_exception(typ, value, tb):
  516. if future.done():
  517. return False
  518. future_set_exc_info(future, (typ, value, tb))
  519. return True
  520. def set_result(result):
  521. if future.done():
  522. return
  523. future_set_result_unless_cancelled(future, result)
  524. with stack_context.ExceptionStackContext(handle_exception):
  525. func(*args, callback=_argument_adapter(set_result), **kwargs)
  526. return future
  527. class YieldFuture(YieldPoint):
  528. def __init__(self, future):
  529. """Adapts a `.Future` to the `YieldPoint` interface.
  530. .. versionchanged:: 5.0
  531. The ``io_loop`` argument (deprecated since version 4.1) has been removed.
  532. .. deprecated:: 5.1
  533. This class will be removed in 6.0.
  534. """
  535. warnings.warn("YieldFuture is deprecated, use Futures instead",
  536. DeprecationWarning)
  537. self.future = future
  538. self.io_loop = IOLoop.current()
  539. def start(self, runner):
  540. if not self.future.done():
  541. self.runner = runner
  542. self.key = object()
  543. runner.register_callback(self.key)
  544. self.io_loop.add_future(self.future, runner.result_callback(self.key))
  545. else:
  546. self.runner = None
  547. self.result_fn = self.future.result
  548. def is_ready(self):
  549. if self.runner is not None:
  550. return self.runner.is_ready(self.key)
  551. else:
  552. return True
  553. def get_result(self):
  554. if self.runner is not None:
  555. return self.runner.pop_result(self.key).result()
  556. else:
  557. return self.result_fn()
  558. def _contains_yieldpoint(children):
  559. """Returns True if ``children`` contains any YieldPoints.
  560. ``children`` may be a dict or a list, as used by `MultiYieldPoint`
  561. and `multi_future`.
  562. """
  563. if isinstance(children, dict):
  564. return any(isinstance(i, YieldPoint) for i in children.values())
  565. if isinstance(children, list):
  566. return any(isinstance(i, YieldPoint) for i in children)
  567. return False
  568. def multi(children, quiet_exceptions=()):
  569. """Runs multiple asynchronous operations in parallel.
  570. ``children`` may either be a list or a dict whose values are
  571. yieldable objects. ``multi()`` returns a new yieldable
  572. object that resolves to a parallel structure containing their
  573. results. If ``children`` is a list, the result is a list of
  574. results in the same order; if it is a dict, the result is a dict
  575. with the same keys.
  576. That is, ``results = yield multi(list_of_futures)`` is equivalent
  577. to::
  578. results = []
  579. for future in list_of_futures:
  580. results.append(yield future)
  581. If any children raise exceptions, ``multi()`` will raise the first
  582. one. All others will be logged, unless they are of types
  583. contained in the ``quiet_exceptions`` argument.
  584. If any of the inputs are `YieldPoints <YieldPoint>`, the returned
  585. yieldable object is a `YieldPoint`. Otherwise, returns a `.Future`.
  586. This means that the result of `multi` can be used in a native
  587. coroutine if and only if all of its children can be.
  588. In a ``yield``-based coroutine, it is not normally necessary to
  589. call this function directly, since the coroutine runner will
  590. do it automatically when a list or dict is yielded. However,
  591. it is necessary in ``await``-based coroutines, or to pass
  592. the ``quiet_exceptions`` argument.
  593. This function is available under the names ``multi()`` and ``Multi()``
  594. for historical reasons.
  595. Cancelling a `.Future` returned by ``multi()`` does not cancel its
  596. children. `asyncio.gather` is similar to ``multi()``, but it does
  597. cancel its children.
  598. .. versionchanged:: 4.2
  599. If multiple yieldables fail, any exceptions after the first
  600. (which is raised) will be logged. Added the ``quiet_exceptions``
  601. argument to suppress this logging for selected exception types.
  602. .. versionchanged:: 4.3
  603. Replaced the class ``Multi`` and the function ``multi_future``
  604. with a unified function ``multi``. Added support for yieldables
  605. other than `YieldPoint` and `.Future`.
  606. """
  607. if _contains_yieldpoint(children):
  608. return MultiYieldPoint(children, quiet_exceptions=quiet_exceptions)
  609. else:
  610. return multi_future(children, quiet_exceptions=quiet_exceptions)
  611. Multi = multi
  class MultiYieldPoint(YieldPoint):
      """Runs multiple asynchronous operations in parallel.

      ``children`` is either a list or a dict of yieldables.  The combined
      result is a list of the children's results, or a dict with the same
      keys when a dict was given.

      This class is similar to `multi`, but it always creates a stack
      context even when no children require it. It is not compatible with
      native coroutines.

      .. versionchanged:: 4.2
         If multiple ``YieldPoints`` fail, any exceptions after the first
         (which is raised) will be logged. Added the ``quiet_exceptions``
         argument to suppress this logging for selected exception types.

      .. versionchanged:: 4.3
         Renamed from ``Multi`` to ``MultiYieldPoint``. The name ``Multi``
         remains as an alias for the equivalent `multi` function.

      .. deprecated:: 4.3
         Use `multi` instead. This class will be removed in 6.0.
      """
      def __init__(self, children, quiet_exceptions=()):
          warnings.warn("MultiYieldPoint is deprecated, use Futures instead",
                        DeprecationWarning)
          if isinstance(children, dict):
              # Remember the keys so get_result() can rebuild a dict.
              self.keys = list(children.keys())
              children = children.values()
          else:
              self.keys = None

          self.children = []
          for child in children:
              # Anything that is not already a YieldPoint is converted to a
              # future first, and futures are wrapped in YieldFuture.
              if not isinstance(child, YieldPoint):
                  child = convert_yielded(child)
              if is_future(child):
                  child = YieldFuture(child)
              self.children.append(child)
          assert all(isinstance(child, YieldPoint) for child in self.children)

          self.unfinished_children = set(self.children)
          self.quiet_exceptions = quiet_exceptions

      def start(self, runner):
          """Start every child with the given ``runner``."""
          for child in self.children:
              child.start(runner)

      def is_ready(self):
          """Return True once no unfinished children remain.

          Children are polled in the set's iteration order and polling
          stops at the first child that is not ready; the ready ones seen
          before that point are dropped from the unfinished set.
          """
          ready = []
          for child in self.unfinished_children:
              if not child.is_ready():
                  break
              ready.append(child)
          self.unfinished_children.difference_update(ready)
          return not self.unfinished_children

      def get_result(self):
          """Collect the children's results, in order.

          The first exception raised by a child is re-raised after all
          children have been visited; later ones are logged unless they are
          instances of ``quiet_exceptions``.
          """
          results = []
          first_error = None
          for child in self.children:
              try:
                  results.append(child.get_result())
              except Exception as e:
                  if first_error is None:
                      first_error = sys.exc_info()
                  elif not isinstance(e, self.quiet_exceptions):
                      app_log.error("Multiple exceptions in yield list",
                                    exc_info=True)
          if first_error is not None:
              raise_exc_info(first_error)
          if self.keys is None:
              return list(results)
          return dict(zip(self.keys, results))
  def multi_future(children, quiet_exceptions=()):
      """Wait for multiple asynchronous futures in parallel.

      ``children`` is a list or dict whose items are passed through
      `convert_yielded`.  The returned `.Future` resolves to a list of the
      children's results, or to a dict with the same keys when a dict was
      given.

      This function is similar to `multi`, but does not support
      `YieldPoints <YieldPoint>`.

      .. versionadded:: 4.0

      .. versionchanged:: 4.2
         If multiple ``Futures`` fail, any exceptions after the first (which is
         raised) will be logged. Added the ``quiet_exceptions``
         argument to suppress this logging for selected exception types.

      .. deprecated:: 4.3
         Use `multi` instead.
      """
      keys = None
      if isinstance(children, dict):
          keys = list(children.keys())
          children = children.values()
      children = [convert_yielded(child) for child in children]
      assert all(is_future(i) or isinstance(i, _NullFuture) for i in children)
      unfinished_children = set(children)

      future = _create_future()
      if not children:
          # Nothing to wait for: resolve immediately with an empty container.
          empty = [] if keys is None else {}
          future_set_result_unless_cancelled(future, empty)

      def callback(finished):
          unfinished_children.remove(finished)
          if unfinished_children:
              return
          # Last child finished: gather every result in the original order.
          result_list = []
          for child in children:
              try:
                  result_list.append(child.result())
              except Exception as e:
                  if not future.done():
                      # The first failure becomes the outcome of ``future``.
                      future_set_exc_info(future, sys.exc_info())
                  elif not isinstance(e, quiet_exceptions):
                      app_log.error("Multiple exceptions in yield list",
                                    exc_info=True)
          if future.done():
              return
          if keys is None:
              future_set_result_unless_cancelled(future, result_list)
          else:
              future_set_result_unless_cancelled(future,
                                                 dict(zip(keys, result_list)))

      # The same future may appear more than once; attach the callback to
      # each distinct future only once.
      registered = set()
      for child in children:
          if child in registered:
              continue
          registered.add(child)
          future_add_done_callback(child, callback)
      return future
  def maybe_future(x):
      """Converts ``x`` into a `.Future`.

      A value that is already a `.Future` is returned unchanged; any other
      value is placed in a newly created, already-resolved `.Future`.  This
      is suitable for use as ``result = yield gen.maybe_future(f())`` when
      you don't know whether ``f()`` returns a `.Future` or not.

      .. deprecated:: 4.3
         This function only handles ``Futures``, not other yieldable objects.
         Instead of `maybe_future`, check for the non-future result types
         you expect (often just ``None``), and ``yield`` anything unknown.
      """
      if is_future(x):
          return x
      wrapped = _create_future()
      wrapped.set_result(x)
      return wrapped
  def with_timeout(timeout, future, quiet_exceptions=()):
      """Wraps a `.Future` (or other yieldable object) in a timeout.

      Raises `tornado.util.TimeoutError` if the input future does not
      complete before ``timeout``, which may be specified in any form
      allowed by `.IOLoop.add_timeout` (i.e. a `datetime.timedelta` or
      an absolute time relative to `.IOLoop.time`)

      If the wrapped `.Future` fails after it has timed out, the exception
      will be logged unless it is of a type contained in ``quiet_exceptions``
      (which may be an exception type or a sequence of types).

      Does not support `YieldPoint` subclasses.

      The wrapped `.Future` is not canceled when the timeout expires,
      permitting it to be reused. `asyncio.wait_for` is similar to this
      function but it does cancel the wrapped `.Future` on timeout.

      .. versionadded:: 4.0

      .. versionchanged:: 4.1
         Added the ``quiet_exceptions`` argument and the logging of unhandled
         exceptions.

      .. versionchanged:: 4.4
         Added support for yieldable objects other than `.Future`.
      """
      # TODO: allow YieldPoints in addition to other yieldables?
      # Tricky to do with stack_context semantics.
      #
      # It's tempting to optimize this by cancelling the input future on timeout
      # instead of creating a new one, but A) we can't know if we are the only
      # one waiting on the input future, so cancelling it might disrupt other
      # callers and B) concurrent futures can only be cancelled while they are
      # in the queue, so cancellation cannot reliably bound our waiting time.
      future = convert_yielded(future)
      result = _create_future()
      chain_future(future, result)
      io_loop = IOLoop.current()

      def log_late_failure(fut):
          # Runs when the wrapped future completes after the timeout fired.
          try:
              fut.result()
          except Exception as exc:
              if not isinstance(exc, quiet_exceptions):
                  app_log.error("Exception in Future %r after timeout",
                                fut, exc_info=True)

      def on_timeout():
          if not result.done():
              result.set_exception(TimeoutError("Timeout"))
          # In case the wrapped future goes on to fail, log it.
          future_add_done_callback(future, log_late_failure)

      timeout_handle = io_loop.add_timeout(timeout, on_timeout)

      def cancel_timeout(_):
          # The wrapped future finished, so the pending timeout is obsolete.
          io_loop.remove_timeout(timeout_handle)

      if isinstance(future, Future):
          # We know this future will resolve on the IOLoop, so we don't
          # need the extra thread-safety of IOLoop.add_future (and we also
          # don't care about StackContext here).
          future_add_done_callback(future, cancel_timeout)
      else:
          # concurrent.futures.Futures may resolve on any thread, so we
          # need to route them back to the IOLoop.
          io_loop.add_future(future, cancel_timeout)
      return result
  def sleep(duration):
      """Return a `.Future` that resolves after the given number of seconds.

      When used with ``yield`` in a coroutine, this is a non-blocking
      analogue to `time.sleep` (which should not be used in coroutines
      because it is blocking)::

          yield gen.sleep(0.5)

      Note that calling this function on its own does nothing; you must
      wait on the `.Future` it returns (usually by yielding it).

      .. versionadded:: 4.1
      """
      wakeup = _create_future()

      def fire():
          # Resolve with None unless the future was cancelled in the meantime.
          future_set_result_unless_cancelled(wakeup, None)

      IOLoop.current().call_later(duration, fire)
      return wakeup
  810. class _NullFuture(object):
  811. """_NullFuture resembles a Future that finished with a result of None.
  812. It's not actually a `Future` to avoid depending on a particular event loop.
  813. Handled as a special case in the coroutine runner.
  814. """
  815. def result(self):
  816. return None
  817. def done(self):
  818. return True
  # _null_future is used as a dummy value in the coroutine runner. It differs
  # from moment in that moment always adds a delay of one IOLoop iteration
  # while _null_future is processed as soon as possible.
  #
  # Both are distinct _NullFuture instances; Runner.handle_yield and
  # convert_yielded tell them apart by identity (``is``), not by type.
  _null_future = _NullFuture()
  moment = _NullFuture()
  # The documentation for ``moment`` is attached to the instance itself.
  moment.__doc__ = \
      """A special object which may be yielded to allow the IOLoop to run for
  one iteration.
  This is not needed in normal use but it can be helpful in long-running
  coroutines that are likely to yield Futures that are ready instantly.
  Usage: ``yield gen.moment``
  .. versionadded:: 4.0
  .. deprecated:: 4.5
  ``yield None`` (or ``yield`` with no argument) is now equivalent to
  ``yield gen.moment``.
  """
  class Runner(object):
      """Internal implementation of `tornado.gen.engine`.

      Drives a generator: every yielded object is turned into a future
      (stored in ``self.future``) and, once that future is done, the
      generator is resumed with its result or has its exception thrown in.

      Maintains information about pending callbacks and their results.

      The results of the generator are stored in ``result_future`` (a
      `.Future`)
      """
      def __init__(self, gen, result_future, first_yielded):
          """Set up the runner and process the first yielded object.

          :arg gen: the generator to drive.
          :arg result_future: `.Future` that receives the generator's return
              value, or the exception that terminated it.
          :arg first_yielded: object passed to `handle_yield` before the
              generator is resumed for the first time.
          """
          self.gen = gen
          self.result_future = result_future
          self.future = _null_future
          self.yield_point = None
          self.pending_callbacks = None
          self.results = None
          self.running = False
          self.finished = False
          self.had_exception = False
          self.io_loop = IOLoop.current()
          # For efficiency, we do not create a stack context until we
          # reach a YieldPoint (stack contexts are required for the historical
          # semantics of YieldPoints, but not for Futures).  When we have
          # done so, this field will be set and must be called at the end
          # of the coroutine.
          self.stack_context_deactivate = None
          if self.handle_yield(first_yielded):
              # Drop the local references before entering run()
              # (presumably so this frame does not keep them alive).
              gen = result_future = first_yielded = None
              self.run()

      def register_callback(self, key):
          """Adds ``key`` to the list of callbacks.

          Raises `KeyReuseError` if ``key`` is already pending.
          """
          if self.pending_callbacks is None:
              # Lazily initialize the old-style YieldPoint data structures.
              self.pending_callbacks = set()
              self.results = {}
          if key in self.pending_callbacks:
              raise KeyReuseError("key %r is already pending" % (key,))
          self.pending_callbacks.add(key)

      def is_ready(self, key):
          """Returns true if a result is available for ``key``.

          Raises `UnknownKeyError` if ``key`` was never registered (or has
          already been popped).
          """
          if self.pending_callbacks is None or key not in self.pending_callbacks:
              raise UnknownKeyError("key %r is not pending" % (key,))
          return key in self.results

      def set_result(self, key, result):
          """Sets the result for ``key`` and attempts to resume the generator.

          If the current yield point has become ready, its result (or the
          exception raised while computing it) is stored in ``self.future``
          and the run loop is re-entered.
          """
          self.results[key] = result
          if self.yield_point is not None and self.yield_point.is_ready():
              try:
                  future_set_result_unless_cancelled(self.future,
                                                     self.yield_point.get_result())
              except Exception:
                  # Only ordinary exceptions are routed into the coroutine,
                  # matching start_yield_point() and run().  KeyboardInterrupt,
                  # SystemExit and GeneratorExit propagate to the caller
                  # instead of being captured in the future.
                  future_set_exc_info(self.future, sys.exc_info())
              self.yield_point = None
              self.run()

      def pop_result(self, key):
          """Returns the result for ``key`` and unregisters it."""
          self.pending_callbacks.remove(key)
          return self.results.pop(key)

      def run(self):
          """Starts or resumes the generator, running until it reaches a
          yield point that is not ready.
          """
          # ``running`` makes this method non-reentrant; ``finished`` makes
          # it a no-op once the generator has terminated.
          if self.running or self.finished:
              return
          try:
              self.running = True
              while True:
                  future = self.future
                  if not future.done():
                      return
                  self.future = None
                  try:
                      orig_stack_contexts = stack_context._state.contexts
                      exc_info = None

                      try:
                          value = future.result()
                      except Exception:
                          self.had_exception = True
                          exc_info = sys.exc_info()
                      future = None

                      if exc_info is not None:
                          try:
                              yielded = self.gen.throw(*exc_info)
                          finally:
                              # Break up a reference to itself
                              # for faster GC on CPython.
                              exc_info = None
                      else:
                          yielded = self.gen.send(value)

                      if stack_context._state.contexts is not orig_stack_contexts:
                          self.gen.throw(
                              stack_context.StackContextInconsistentError(
                                  'stack_context inconsistency (probably caused '
                                  'by yield within a "with StackContext" block)'))
                  except (StopIteration, Return) as e:
                      # The generator returned: publish its value.
                      self.finished = True
                      self.future = _null_future
                      if self.pending_callbacks and not self.had_exception:
                          # If we ran cleanly without waiting on all callbacks
                          # raise an error (really more of a warning).  If we
                          # had an exception then some callbacks may have been
                          # orphaned, so skip the check in that case.
                          raise LeakedCallbackError(
                              "finished without waiting for callbacks %r" %
                              self.pending_callbacks)
                      future_set_result_unless_cancelled(self.result_future,
                                                         _value_from_stopiteration(e))
                      self.result_future = None
                      self._deactivate_stack_context()
                      return
                  except Exception:
                      # The generator raised: publish the exception.
                      self.finished = True
                      self.future = _null_future
                      future_set_exc_info(self.result_future, sys.exc_info())
                      self.result_future = None
                      self._deactivate_stack_context()
                      return
                  if not self.handle_yield(yielded):
                      return
                  yielded = None
          finally:
              self.running = False

      def handle_yield(self, yielded):
          """Store the future for ``yielded`` in ``self.future``.

          Returns True when that future is already done, so the caller may
          keep running the generator right away.  Returns False when
          resumption has instead been scheduled on the IOLoop.
          """
          # Lists containing YieldPoints require stack contexts;
          # other lists are handled in convert_yielded.
          if _contains_yieldpoint(yielded):
              yielded = multi(yielded)

          if isinstance(yielded, YieldPoint):
              # YieldPoints are too closely coupled to the Runner to go
              # through the generic convert_yielded mechanism.
              self.future = Future()

              def start_yield_point():
                  try:
                      yielded.start(self)
                      if yielded.is_ready():
                          future_set_result_unless_cancelled(self.future, yielded.get_result())
                      else:
                          self.yield_point = yielded
                  except Exception:
                      self.future = Future()
                      future_set_exc_info(self.future, sys.exc_info())

              if self.stack_context_deactivate is None:
                  # Start a stack context if this is the first
                  # YieldPoint we've seen.
                  with stack_context.ExceptionStackContext(
                          self.handle_exception) as deactivate:
                      self.stack_context_deactivate = deactivate

                      def cb():
                          start_yield_point()
                          self.run()
                      self.io_loop.add_callback(cb)
                      return False
              else:
                  start_yield_point()
          else:
              try:
                  self.future = convert_yielded(yielded)
              except BadYieldError:
                  # Deliver the error to the generator through a failed future.
                  self.future = Future()
                  future_set_exc_info(self.future, sys.exc_info())

          if self.future is moment:
              self.io_loop.add_callback(self.run)
              return False
          elif not self.future.done():
              def inner(f):
                  # Break a reference cycle to speed GC.
                  f = None  # noqa
                  self.run()
              self.io_loop.add_future(
                  self.future, inner)
              return False
          return True

      def result_callback(self, key):
          """Return a stack-context-wrapped callback that stores its
          argument(s) as the result for ``key`` via `set_result`.
          """
          return stack_context.wrap(_argument_adapter(
              functools.partial(self.set_result, key)))

      def handle_exception(self, typ, value, tb):
          """Exception handler installed with the ``ExceptionStackContext``.

          When the runner is idle and not finished, the exception is placed
          in a new failed future, the generator is resumed with it, and True
          is returned; otherwise False is returned.
          """
          if not self.running and not self.finished:
              self.future = Future()
              future_set_exc_info(self.future, (typ, value, tb))
              self.run()
              return True
          else:
              return False

      def _deactivate_stack_context(self):
          """Call and clear the stack-context deactivation hook, if one is set."""
          if self.stack_context_deactivate is not None:
              self.stack_context_deactivate()
              self.stack_context_deactivate = None
  # Bundle of positional and keyword arguments; _argument_adapter passes one
  # of these to its callback when invoked with anything other than a single
  # positional argument or no arguments.
  Arguments = collections.namedtuple('Arguments', ['args', 'kwargs'])
  1020. def _argument_adapter(callback):
  1021. """Returns a function that when invoked runs ``callback`` with one arg.
  1022. If the function returned by this function is called with exactly
  1023. one argument, that argument is passed to ``callback``. Otherwise
  1024. the args tuple and kwargs dict are wrapped in an `Arguments` object.
  1025. """
  1026. def wrapper(*args, **kwargs):
  1027. if kwargs or len(args) > 1:
  1028. callback(Arguments(args, kwargs))
  1029. elif args:
  1030. callback(args[0])
  1031. else:
  1032. callback(None)
  1033. return wrapper
  # Convert Awaitables into Futures.
  #
  # ``_wrap_awaitable`` is bound to asyncio's own converter when asyncio can
  # be imported, and to a hand-written coroutine otherwise.
  try:
      import asyncio
  except ImportError:
      # Py2-compatible version for use with Cython.
      # Copied from PEP 380.
      @coroutine
      def _wrap_awaitable(x):
          """Drive the awaitable ``x`` to completion from a coroutine.

          Iterates ``x.__await__()`` when available, else ``iter(x)``,
          re-yielding each item and forwarding sent values and thrown
          exceptions to the inner iterator.  The inner iterator's final
          value is delivered by raising `Return`.
          """
          if hasattr(x, '__await__'):
              _i = x.__await__()
          else:
              _i = iter(x)
          try:
              _y = next(_i)
          except StopIteration as _e:
              # The iterator finished without yielding anything.
              _r = _value_from_stopiteration(_e)
          else:
              while 1:
                  try:
                      _s = yield _y
                  except GeneratorExit as _e:
                      # We are being closed: close the inner iterator too
                      # (when it supports it), then re-raise.
                      try:
                          _m = _i.close
                      except AttributeError:
                          pass
                      else:
                          _m()
                      raise _e
                  except BaseException as _e:
                      # An exception was thrown into us: forward it to the
                      # inner iterator, or re-raise if it has no throw().
                      _x = sys.exc_info()
                      try:
                          _m = _i.throw
                      except AttributeError:
                          raise _e
                      else:
                          try:
                              _y = _m(*_x)
                          except StopIteration as _e:
                              _r = _value_from_stopiteration(_e)
                              break
                  else:
                      # A value was sent in: pass it on (next() for None).
                      try:
                          if _s is None:
                              _y = next(_i)
                          else:
                              _y = _i.send(_s)
                      except StopIteration as _e:
                          _r = _value_from_stopiteration(_e)
                          break
          raise Return(_r)
  else:
      try:
          _wrap_awaitable = asyncio.ensure_future
      except AttributeError:
          # asyncio.ensure_future was introduced in Python 3.4.4, but
          # Debian jessie still ships with 3.4.2 so try the old name.
          # (getattr is used for the old name ``async``.)
          _wrap_awaitable = getattr(asyncio, 'async')
  def convert_yielded(yielded):
      """Convert a yielded object into a `.Future`.

      The default implementation accepts lists, dictionaries, Futures and
      awaitables; ``None`` is treated like `moment`.  Any other object
      raises `BadYieldError`.

      If the `~functools.singledispatch` library is available, this function
      may be extended to support additional types. For example::

          @convert_yielded.register(asyncio.Future)
          def _(asyncio_future):
              return tornado.platform.asyncio.to_tornado_future(asyncio_future)

      .. versionadded:: 4.1
      """
      # Lists and dicts containing YieldPoints were handled earlier.
      if yielded is None or yielded is moment:
          return moment
      if yielded is _null_future:
          return _null_future
      if isinstance(yielded, (list, dict)):
          return multi(yielded)
      if is_future(yielded):
          return yielded
      if isawaitable(yielded):
          return _wrap_awaitable(yielded)
      raise BadYieldError("yielded unknown object %r" % (yielded,))
  # When a ``singledispatch`` implementation is available, wrap
  # ``convert_yielded`` with it so additional types can be registered via
  # ``convert_yielded.register(...)``.
  if singledispatch is not None:
      convert_yielded = singledispatch(convert_yielded)