12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862 |
- from __future__ import absolute_import, division, print_function
- import gc
- import contextlib
- import datetime
- import functools
- import platform
- import sys
- import textwrap
- import time
- import weakref
- import warnings
- from tornado.concurrent import return_future, Future
- from tornado.escape import url_escape
- from tornado.httpclient import AsyncHTTPClient
- from tornado.ioloop import IOLoop
- from tornado.log import app_log
- from tornado import stack_context
- from tornado.testing import AsyncHTTPTestCase, AsyncTestCase, ExpectLog, gen_test
- from tornado.test.util import unittest, skipOnTravis, skipBefore33, skipBefore35, skipNotCPython, exec_test, ignore_deprecation # noqa: E501
- from tornado.web import Application, RequestHandler, asynchronous, HTTPError
- from tornado import gen
- try:
- from concurrent import futures
- except ImportError:
- futures = None
- try:
- import asyncio
- except ImportError:
- asyncio = None
- class GenEngineTest(AsyncTestCase):
- def setUp(self):
- self.warning_catcher = warnings.catch_warnings()
- self.warning_catcher.__enter__()
- warnings.simplefilter('ignore', DeprecationWarning)
- super(GenEngineTest, self).setUp()
- self.named_contexts = []
- def tearDown(self):
- super(GenEngineTest, self).tearDown()
- self.warning_catcher.__exit__(None, None, None)
- def named_context(self, name):
- @contextlib.contextmanager
- def context():
- self.named_contexts.append(name)
- try:
- yield
- finally:
- self.assertEqual(self.named_contexts.pop(), name)
- return context
- def run_gen(self, f):
- f()
- return self.wait()
- def delay_callback(self, iterations, callback, arg):
- """Runs callback(arg) after a number of IOLoop iterations."""
- if iterations == 0:
- callback(arg)
- else:
- self.io_loop.add_callback(functools.partial(
- self.delay_callback, iterations - 1, callback, arg))
- with ignore_deprecation():
- @return_future
- def async_future(self, result, callback):
- self.io_loop.add_callback(callback, result)
- @gen.coroutine
- def async_exception(self, e):
- yield gen.moment
- raise e
- def test_no_yield(self):
- @gen.engine
- def f():
- self.stop()
- self.run_gen(f)
- def test_inline_cb(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))()
- res = yield gen.Wait("k1")
- self.assertTrue(res is None)
- self.stop()
- self.run_gen(f)
- def test_ioloop_cb(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.stop()
- self.run_gen(f)
- def test_exception_phase1(self):
- @gen.engine
- def f():
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.run_gen, f)
- def test_exception_phase2(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.run_gen, f)
- def test_exception_in_task_phase1(self):
- def fail_task(callback):
- 1 / 0
- @gen.engine
- def f():
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.stop()
- self.run_gen(f)
- def test_exception_in_task_phase2(self):
- # This is the case that requires the use of stack_context in gen.engine
- def fail_task(callback):
- self.io_loop.add_callback(lambda: 1 / 0)
- @gen.engine
- def f():
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.stop()
- self.run_gen(f)
- def test_with_arg(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))(42)
- res = yield gen.Wait("k1")
- self.assertEqual(42, res)
- self.stop()
- self.run_gen(f)
- def test_with_arg_tuple(self):
- @gen.engine
- def f():
- (yield gen.Callback((1, 2)))((3, 4))
- res = yield gen.Wait((1, 2))
- self.assertEqual((3, 4), res)
- self.stop()
- self.run_gen(f)
- def test_key_reuse(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- yield gen.Callback("k1")
- self.stop()
- self.assertRaises(gen.KeyReuseError, self.run_gen, f)
- def test_key_reuse_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- yield gen.Callback((1, 2))
- self.stop()
- self.assertRaises(gen.KeyReuseError, self.run_gen, f)
- def test_key_mismatch(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- yield gen.Wait("k2")
- self.stop()
- self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
- def test_key_mismatch_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- yield gen.Wait((2, 3))
- self.stop()
- self.assertRaises(gen.UnknownKeyError, self.run_gen, f)
- def test_leaked_callback(self):
- @gen.engine
- def f():
- yield gen.Callback("k1")
- self.stop()
- self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
- def test_leaked_callback_tuple(self):
- @gen.engine
- def f():
- yield gen.Callback((1, 2))
- self.stop()
- self.assertRaises(gen.LeakedCallbackError, self.run_gen, f)
- def test_parallel_callback(self):
- @gen.engine
- def f():
- for k in range(3):
- self.io_loop.add_callback((yield gen.Callback(k)))
- yield gen.Wait(1)
- self.io_loop.add_callback((yield gen.Callback(3)))
- yield gen.Wait(0)
- yield gen.Wait(3)
- yield gen.Wait(2)
- self.stop()
- self.run_gen(f)
- def test_bogus_yield(self):
- @gen.engine
- def f():
- yield 42
- self.assertRaises(gen.BadYieldError, self.run_gen, f)
- def test_bogus_yield_tuple(self):
- @gen.engine
- def f():
- yield (1, 2)
- self.assertRaises(gen.BadYieldError, self.run_gen, f)
- def test_reuse(self):
- @gen.engine
- def f():
- self.io_loop.add_callback((yield gen.Callback(0)))
- yield gen.Wait(0)
- self.stop()
- self.run_gen(f)
- self.run_gen(f)
- def test_task(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- self.stop()
- self.run_gen(f)
- def test_wait_all(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield gen.WaitAll(["k1", "k2"])
- self.assertEqual(results, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
- def test_exception_in_yield(self):
- @gen.engine
- def f():
- try:
- yield gen.Wait("k1")
- raise Exception("did not get expected exception")
- except gen.UnknownKeyError:
- pass
- self.stop()
- self.run_gen(f)
- def test_resume_after_exception_in_yield(self):
- @gen.engine
- def f():
- try:
- yield gen.Wait("k1")
- raise Exception("did not get expected exception")
- except gen.UnknownKeyError:
- pass
- (yield gen.Callback("k2"))("v2")
- self.assertEqual((yield gen.Wait("k2")), "v2")
- self.stop()
- self.run_gen(f)
- def test_orphaned_callback(self):
- @gen.engine
- def f():
- self.orphaned_callback = yield gen.Callback(1)
- try:
- self.run_gen(f)
- raise Exception("did not get expected exception")
- except gen.LeakedCallbackError:
- pass
- self.orphaned_callback()
- def test_none(self):
- @gen.engine
- def f():
- yield None
- self.stop()
- self.run_gen(f)
- def test_multi(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield [gen.Wait("k1"), gen.Wait("k2")]
- self.assertEqual(results, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
- def test_multi_dict(self):
- @gen.engine
- def f():
- (yield gen.Callback("k1"))("v1")
- (yield gen.Callback("k2"))("v2")
- results = yield dict(foo=gen.Wait("k1"), bar=gen.Wait("k2"))
- self.assertEqual(results, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
- # The following tests explicitly run with both gen.Multi
- # and gen.multi_future (Task returns a Future, so it can be used
- # with either).
- def test_multi_yieldpoint_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.Multi([
- gen.Task(self.delay_callback, 3, arg="v1"),
- gen.Task(self.delay_callback, 1, arg="v2"),
- ])
- self.assertEqual(responses, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
- def test_multi_yieldpoint_dict_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.Multi(dict(
- foo=gen.Task(self.delay_callback, 3, arg="v1"),
- bar=gen.Task(self.delay_callback, 1, arg="v2"),
- ))
- self.assertEqual(responses, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
- def test_multi_future_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future([
- gen.Task(self.delay_callback, 3, arg="v1"),
- gen.Task(self.delay_callback, 1, arg="v2"),
- ])
- self.assertEqual(responses, ["v1", "v2"])
- self.stop()
- self.run_gen(f)
- def test_multi_future_dict_delayed(self):
- @gen.engine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future(dict(
- foo=gen.Task(self.delay_callback, 3, arg="v1"),
- bar=gen.Task(self.delay_callback, 1, arg="v2"),
- ))
- self.assertEqual(responses, dict(foo="v1", bar="v2"))
- self.stop()
- self.run_gen(f)
- @skipOnTravis
- @gen_test
- def test_multi_performance(self):
- # Yielding a list used to have quadratic performance; make
- # sure a large list stays reasonable. On my laptop a list of
- # 2000 used to take 1.8s, now it takes 0.12.
- start = time.time()
- yield [gen.Task(self.io_loop.add_callback) for i in range(2000)]
- end = time.time()
- self.assertLess(end - start, 1.0)
- @gen_test
- def test_multi_empty(self):
- # Empty lists or dicts should return the same type.
- x = yield []
- self.assertTrue(isinstance(x, list))
- y = yield {}
- self.assertTrue(isinstance(y, dict))
- @gen_test
- def test_multi_mixed_types(self):
- # A YieldPoint (Wait) and Future (Task) can be combined
- # (and use the YieldPoint codepath)
- (yield gen.Callback("k1"))("v1")
- responses = yield [gen.Wait("k1"),
- gen.Task(self.delay_callback, 3, arg="v2")]
- self.assertEqual(responses, ["v1", "v2"])
- @gen_test
- def test_future(self):
- result = yield self.async_future(1)
- self.assertEqual(result, 1)
- @gen_test
- def test_multi_future(self):
- results = yield [self.async_future(1), self.async_future(2)]
- self.assertEqual(results, [1, 2])
- @gen_test
- def test_multi_future_duplicate(self):
- f = self.async_future(2)
- results = yield [self.async_future(1), f, self.async_future(3), f]
- self.assertEqual(results, [1, 2, 3, 2])
- @gen_test
- def test_multi_dict_future(self):
- results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
- self.assertEqual(results, dict(foo=1, bar=2))
- @gen_test
- def test_multi_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))])
- self.assertEqual(str(cm.exception), "error 1")
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_future(2)])
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
- @gen_test
- def test_multi_future_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))]
- self.assertEqual(str(cm.exception), "error 1")
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_future(2)]
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.multi_future(
- [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
- def test_arguments(self):
- @gen.engine
- def f():
- (yield gen.Callback("noargs"))()
- self.assertEqual((yield gen.Wait("noargs")), None)
- (yield gen.Callback("1arg"))(42)
- self.assertEqual((yield gen.Wait("1arg")), 42)
- (yield gen.Callback("kwargs"))(value=42)
- result = yield gen.Wait("kwargs")
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((), dict(value=42)), result)
- self.assertEqual(dict(value=42), result.kwargs)
- (yield gen.Callback("2args"))(42, 43)
- result = yield gen.Wait("2args")
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((42, 43), {}), result)
- self.assertEqual((42, 43), result.args)
- def task_func(callback):
- callback(None, error="foo")
- result = yield gen.Task(task_func)
- self.assertTrue(isinstance(result, gen.Arguments))
- self.assertEqual(((None,), dict(error="foo")), result)
- self.stop()
- self.run_gen(f)
- def test_stack_context_leak(self):
- # regression test: repeated invocations of a gen-based
- # function should not result in accumulated stack_contexts
- def _stack_depth():
- head = stack_context._state.contexts[1]
- length = 0
- while head is not None:
- length += 1
- head = head.old_contexts[1]
- return length
- @gen.engine
- def inner(callback):
- yield gen.Task(self.io_loop.add_callback)
- callback()
- @gen.engine
- def outer():
- for i in range(10):
- yield gen.Task(inner)
- stack_increase = _stack_depth() - initial_stack_depth
- self.assertTrue(stack_increase <= 2)
- self.stop()
- initial_stack_depth = _stack_depth()
- self.run_gen(outer)
- def test_stack_context_leak_exception(self):
- # same as previous, but with a function that exits with an exception
- @gen.engine
- def inner(callback):
- yield gen.Task(self.io_loop.add_callback)
- 1 / 0
- @gen.engine
- def outer():
- for i in range(10):
- try:
- yield gen.Task(inner)
- except ZeroDivisionError:
- pass
- stack_increase = len(stack_context._state.contexts) - initial_stack_depth
- self.assertTrue(stack_increase <= 2)
- self.stop()
- initial_stack_depth = len(stack_context._state.contexts)
- self.run_gen(outer)
- def function_with_stack_context(self, callback):
- # Technically this function should stack_context.wrap its callback
- # upon entry. However, it is very common for this step to be
- # omitted.
- def step2():
- self.assertEqual(self.named_contexts, ['a'])
- self.io_loop.add_callback(callback)
- with stack_context.StackContext(self.named_context('a')):
- self.io_loop.add_callback(step2)
- @gen_test
- def test_wait_transfer_stack_context(self):
- # Wait should not pick up contexts from where callback was invoked,
- # even if that function improperly fails to wrap its callback.
- cb = yield gen.Callback('k1')
- self.function_with_stack_context(cb)
- self.assertEqual(self.named_contexts, [])
- yield gen.Wait('k1')
- self.assertEqual(self.named_contexts, [])
- @gen_test
- def test_task_transfer_stack_context(self):
- yield gen.Task(self.function_with_stack_context)
- self.assertEqual(self.named_contexts, [])
- def test_raise_after_stop(self):
- # This pattern will be used in the following tests so make sure
- # the exception propagates as expected.
- @gen.engine
- def f():
- self.stop()
- 1 / 0
- with self.assertRaises(ZeroDivisionError):
- self.run_gen(f)
- def test_sync_raise_return(self):
- # gen.Return is allowed in @gen.engine, but it may not be used
- # to return a value.
- @gen.engine
- def f():
- self.stop(42)
- raise gen.Return()
- result = self.run_gen(f)
- self.assertEqual(result, 42)
- def test_async_raise_return(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- self.stop(42)
- raise gen.Return()
- result = self.run_gen(f)
- self.assertEqual(result, 42)
- def test_sync_raise_return_value(self):
- @gen.engine
- def f():
- raise gen.Return(42)
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- def test_sync_raise_return_value_tuple(self):
- @gen.engine
- def f():
- raise gen.Return((1, 2))
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- def test_async_raise_return_value(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return(42)
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- def test_async_raise_return_value_tuple(self):
- @gen.engine
- def f():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return((1, 2))
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- def test_return_value(self):
- # It is an error to apply @gen.engine to a function that returns
- # a value.
- @gen.engine
- def f():
- return 42
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- def test_return_value_tuple(self):
- # It is an error to apply @gen.engine to a function that returns
- # a value.
- @gen.engine
- def f():
- return (1, 2)
- with self.assertRaises(gen.ReturnValueIgnoredError):
- self.run_gen(f)
- @skipNotCPython
- def test_task_refcounting(self):
- # On CPython, tasks and their arguments should be released immediately
- # without waiting for garbage collection.
- @gen.engine
- def f():
- class Foo(object):
- pass
- arg = Foo()
- self.arg_ref = weakref.ref(arg)
- task = gen.Task(self.io_loop.add_callback, arg=arg)
- self.task_ref = weakref.ref(task)
- yield task
- self.stop()
- self.run_gen(f)
- self.assertIs(self.arg_ref(), None)
- self.assertIs(self.task_ref(), None)
- # GenBasicTest duplicates the non-deprecated portions of GenEngineTest
- # with gen.coroutine to ensure we don't lose coverage when gen.engine
- # goes away.
- class GenBasicTest(AsyncTestCase):
- @gen.coroutine
- def delay(self, iterations, arg):
- """Returns arg after a number of IOLoop iterations."""
- for i in range(iterations):
- yield gen.moment
- raise gen.Return(arg)
- with ignore_deprecation():
- @return_future
- def async_future(self, result, callback):
- self.io_loop.add_callback(callback, result)
- @gen.coroutine
- def async_exception(self, e):
- yield gen.moment
- raise e
- @gen.coroutine
- def add_one_async(self, x):
- yield gen.moment
- raise gen.Return(x + 1)
- def test_no_yield(self):
- @gen.coroutine
- def f():
- pass
- self.io_loop.run_sync(f)
- def test_exception_phase1(self):
- @gen.coroutine
- def f():
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
- def test_exception_phase2(self):
- @gen.coroutine
- def f():
- yield gen.moment
- 1 / 0
- self.assertRaises(ZeroDivisionError, self.io_loop.run_sync, f)
- def test_bogus_yield(self):
- @gen.coroutine
- def f():
- yield 42
- self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
- def test_bogus_yield_tuple(self):
- @gen.coroutine
- def f():
- yield (1, 2)
- self.assertRaises(gen.BadYieldError, self.io_loop.run_sync, f)
- def test_reuse(self):
- @gen.coroutine
- def f():
- yield gen.moment
- self.io_loop.run_sync(f)
- self.io_loop.run_sync(f)
- def test_none(self):
- @gen.coroutine
- def f():
- yield None
- self.io_loop.run_sync(f)
- def test_multi(self):
- @gen.coroutine
- def f():
- results = yield [self.add_one_async(1), self.add_one_async(2)]
- self.assertEqual(results, [2, 3])
- self.io_loop.run_sync(f)
- def test_multi_dict(self):
- @gen.coroutine
- def f():
- results = yield dict(foo=self.add_one_async(1), bar=self.add_one_async(2))
- self.assertEqual(results, dict(foo=2, bar=3))
- self.io_loop.run_sync(f)
- def test_multi_delayed(self):
- @gen.coroutine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future([
- self.delay(3, "v1"),
- self.delay(1, "v2"),
- ])
- self.assertEqual(responses, ["v1", "v2"])
- self.io_loop.run_sync(f)
- def test_multi_dict_delayed(self):
- @gen.coroutine
- def f():
- # callbacks run at different times
- responses = yield gen.multi_future(dict(
- foo=self.delay(3, "v1"),
- bar=self.delay(1, "v2"),
- ))
- self.assertEqual(responses, dict(foo="v1", bar="v2"))
- self.io_loop.run_sync(f)
- @skipOnTravis
- @gen_test
- def test_multi_performance(self):
- # Yielding a list used to have quadratic performance; make
- # sure a large list stays reasonable. On my laptop a list of
- # 2000 used to take 1.8s, now it takes 0.12.
- start = time.time()
- yield [gen.moment for i in range(2000)]
- end = time.time()
- self.assertLess(end - start, 1.0)
- @gen_test
- def test_multi_empty(self):
- # Empty lists or dicts should return the same type.
- x = yield []
- self.assertTrue(isinstance(x, list))
- y = yield {}
- self.assertTrue(isinstance(y, dict))
- @gen_test
- def test_future(self):
- result = yield self.async_future(1)
- self.assertEqual(result, 1)
- @gen_test
- def test_multi_future(self):
- results = yield [self.async_future(1), self.async_future(2)]
- self.assertEqual(results, [1, 2])
- @gen_test
- def test_multi_future_duplicate(self):
- f = self.async_future(2)
- results = yield [self.async_future(1), f, self.async_future(3), f]
- self.assertEqual(results, [1, 2, 3, 2])
- @gen_test
- def test_multi_dict_future(self):
- results = yield dict(foo=self.async_future(1), bar=self.async_future(2))
- self.assertEqual(results, dict(foo=1, bar=2))
- @gen_test
- def test_multi_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))])
- self.assertEqual(str(cm.exception), "error 1")
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_future(2)])
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.Multi([self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
- @gen_test
- def test_multi_future_exceptions(self):
- with ExpectLog(app_log, "Multiple exceptions in yield list"):
- with self.assertRaises(RuntimeError) as cm:
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))]
- self.assertEqual(str(cm.exception), "error 1")
- # With only one exception, no error is logged.
- with self.assertRaises(RuntimeError):
- yield [self.async_exception(RuntimeError("error 1")),
- self.async_future(2)]
- # Exception logging may be explicitly quieted.
- with self.assertRaises(RuntimeError):
- yield gen.multi_future(
- [self.async_exception(RuntimeError("error 1")),
- self.async_exception(RuntimeError("error 2"))],
- quiet_exceptions=RuntimeError)
- def test_sync_raise_return(self):
- @gen.coroutine
- def f():
- raise gen.Return()
- self.io_loop.run_sync(f)
- def test_async_raise_return(self):
- @gen.coroutine
- def f():
- yield gen.moment
- raise gen.Return()
- self.io_loop.run_sync(f)
- def test_sync_raise_return_value(self):
- @gen.coroutine
- def f():
- raise gen.Return(42)
- self.assertEqual(42, self.io_loop.run_sync(f))
- def test_sync_raise_return_value_tuple(self):
- @gen.coroutine
- def f():
- raise gen.Return((1, 2))
- self.assertEqual((1, 2), self.io_loop.run_sync(f))
- def test_async_raise_return_value(self):
- @gen.coroutine
- def f():
- yield gen.moment
- raise gen.Return(42)
- self.assertEqual(42, self.io_loop.run_sync(f))
- def test_async_raise_return_value_tuple(self):
- @gen.coroutine
- def f():
- yield gen.moment
- raise gen.Return((1, 2))
- self.assertEqual((1, 2), self.io_loop.run_sync(f))
- class GenCoroutineTest(AsyncTestCase):
- def setUp(self):
- # Stray StopIteration exceptions can lead to tests exiting prematurely,
- # so we need explicit checks here to make sure the tests run all
- # the way through.
- self.finished = False
- super(GenCoroutineTest, self).setUp()
- def tearDown(self):
- super(GenCoroutineTest, self).tearDown()
- assert self.finished
- def test_attributes(self):
- self.finished = True
- def f():
- yield gen.moment
- coro = gen.coroutine(f)
- self.assertEqual(coro.__name__, f.__name__)
- self.assertEqual(coro.__module__, f.__module__)
- self.assertIs(coro.__wrapped__, f)
- def test_is_coroutine_function(self):
- self.finished = True
- def f():
- yield gen.moment
- coro = gen.coroutine(f)
- self.assertFalse(gen.is_coroutine_function(f))
- self.assertTrue(gen.is_coroutine_function(coro))
- self.assertFalse(gen.is_coroutine_function(coro()))
- @gen_test
- def test_sync_gen_return(self):
- @gen.coroutine
- def f():
- raise gen.Return(42)
- result = yield f()
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_async_gen_return(self):
- @gen.coroutine
- def f():
- yield gen.moment
- raise gen.Return(42)
- result = yield f()
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_sync_return(self):
- @gen.coroutine
- def f():
- return 42
- result = yield f()
- self.assertEqual(result, 42)
- self.finished = True
- @skipBefore33
- @gen_test
- def test_async_return(self):
- namespace = exec_test(globals(), locals(), """
- @gen.coroutine
- def f():
- yield gen.moment
- return 42
- """)
- result = yield namespace['f']()
- self.assertEqual(result, 42)
- self.finished = True
- @skipBefore33
- @gen_test
- def test_async_early_return(self):
- # A yield statement exists but is not executed, which means
- # this function "returns" via an exception. This exception
- # doesn't happen before the exception handling is set up.
- namespace = exec_test(globals(), locals(), """
- @gen.coroutine
- def f():
- if True:
- return 42
- yield gen.Task(self.io_loop.add_callback)
- """)
- result = yield namespace['f']()
- self.assertEqual(result, 42)
- self.finished = True
- @skipBefore35
- @gen_test
- def test_async_await(self):
- @gen.coroutine
- def f1():
- yield gen.moment
- raise gen.Return(42)
- # This test verifies that an async function can await a
- # yield-based gen.coroutine, and that a gen.coroutine
- # (the test method itself) can yield an async function.
- namespace = exec_test(globals(), locals(), """
- async def f2():
- result = await f1()
- return result
- """)
- result = yield namespace['f2']()
- self.assertEqual(result, 42)
- self.finished = True
- @skipBefore35
- @gen_test
- def test_asyncio_sleep_zero(self):
- # asyncio.sleep(0) turns into a special case (equivalent to
- # `yield None`)
- namespace = exec_test(globals(), locals(), """
- async def f():
- import asyncio
- await asyncio.sleep(0)
- return 42
- """)
- result = yield namespace['f']()
- self.assertEqual(result, 42)
- self.finished = True
- @skipBefore35
- @gen_test
- def test_async_await_mixed_multi_native_future(self):
- @gen.coroutine
- def f1():
- yield gen.moment
- namespace = exec_test(globals(), locals(), """
- async def f2():
- await f1()
- return 42
- """)
- @gen.coroutine
- def f3():
- yield gen.moment
- raise gen.Return(43)
- results = yield [namespace['f2'](), f3()]
- self.assertEqual(results, [42, 43])
- self.finished = True
- @skipBefore35
- @gen_test
- def test_async_await_mixed_multi_native_yieldpoint(self):
- namespace = exec_test(globals(), locals(), """
- async def f1():
- await gen.Task(self.io_loop.add_callback)
- return 42
- """)
- @gen.coroutine
- def f2():
- yield gen.Task(self.io_loop.add_callback)
- raise gen.Return(43)
- with ignore_deprecation():
- f2(callback=(yield gen.Callback('cb')))
- results = yield [namespace['f1'](), gen.Wait('cb')]
- self.assertEqual(results, [42, 43])
- self.finished = True
- @skipBefore35
- @gen_test
- def test_async_with_timeout(self):
- namespace = exec_test(globals(), locals(), """
- async def f1():
- return 42
- """)
- result = yield gen.with_timeout(datetime.timedelta(hours=1),
- namespace['f1']())
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_sync_return_no_value(self):
- @gen.coroutine
- def f():
- return
- result = yield f()
- self.assertEqual(result, None)
- self.finished = True
- @gen_test
- def test_async_return_no_value(self):
- # Without a return value we don't need python 3.3.
- @gen.coroutine
- def f():
- yield gen.moment
- return
- result = yield f()
- self.assertEqual(result, None)
- self.finished = True
- @gen_test
- def test_sync_raise(self):
- @gen.coroutine
- def f():
- 1 / 0
- # The exception is raised when the future is yielded
- # (or equivalently when its result method is called),
- # not when the function itself is called).
- future = f()
- with self.assertRaises(ZeroDivisionError):
- yield future
- self.finished = True
- @gen_test
- def test_async_raise(self):
- @gen.coroutine
- def f():
- yield gen.moment
- 1 / 0
- future = f()
- with self.assertRaises(ZeroDivisionError):
- yield future
- self.finished = True
- @gen_test
- def test_pass_callback(self):
- with ignore_deprecation():
- @gen.coroutine
- def f():
- raise gen.Return(42)
- result = yield gen.Task(f)
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_replace_yieldpoint_exception(self):
- # Test exception handling: a coroutine can catch one exception
- # raised by a yield point and raise a different one.
- @gen.coroutine
- def f1():
- 1 / 0
- @gen.coroutine
- def f2():
- try:
- yield f1()
- except ZeroDivisionError:
- raise KeyError()
- future = f2()
- with self.assertRaises(KeyError):
- yield future
- self.finished = True
- @gen_test
- def test_swallow_yieldpoint_exception(self):
- # Test exception handling: a coroutine can catch an exception
- # raised by a yield point and not raise a different one.
- @gen.coroutine
- def f1():
- 1 / 0
- @gen.coroutine
- def f2():
- try:
- yield f1()
- except ZeroDivisionError:
- raise gen.Return(42)
- result = yield f2()
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_replace_context_exception(self):
- with ignore_deprecation():
- # Test exception handling: exceptions thrown into the stack context
- # can be caught and replaced.
- # Note that this test and the following are for behavior that is
- # not really supported any more: coroutines no longer create a
- # stack context automatically; but one is created after the first
- # YieldPoint (i.e. not a Future).
- @gen.coroutine
- def f2():
- (yield gen.Callback(1))()
- yield gen.Wait(1)
- self.io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(self.io_loop.add_timeout,
- self.io_loop.time() + 10)
- except ZeroDivisionError:
- raise KeyError()
- future = f2()
- with self.assertRaises(KeyError):
- yield future
- self.finished = True
- @gen_test
- def test_swallow_context_exception(self):
- with ignore_deprecation():
- # Test exception handling: exceptions thrown into the stack context
- # can be caught and ignored.
- @gen.coroutine
- def f2():
- (yield gen.Callback(1))()
- yield gen.Wait(1)
- self.io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(self.io_loop.add_timeout,
- self.io_loop.time() + 10)
- except ZeroDivisionError:
- raise gen.Return(42)
- result = yield f2()
- self.assertEqual(result, 42)
- self.finished = True
- @gen_test
- def test_moment(self):
- calls = []
- @gen.coroutine
- def f(name, yieldable):
- for i in range(5):
- calls.append(name)
- yield yieldable
- # First, confirm the behavior without moment: each coroutine
- # monopolizes the event loop until it finishes.
- immediate = Future()
- immediate.set_result(None)
- yield [f('a', immediate), f('b', immediate)]
- self.assertEqual(''.join(calls), 'aaaaabbbbb')
- # With moment, they take turns.
- calls = []
- yield [f('a', gen.moment), f('b', gen.moment)]
- self.assertEqual(''.join(calls), 'ababababab')
- self.finished = True
- calls = []
- yield [f('a', gen.moment), f('b', immediate)]
- self.assertEqual(''.join(calls), 'abbbbbaaaa')
- @gen_test
- def test_sleep(self):
- yield gen.sleep(0.01)
- self.finished = True
- @skipBefore33
- @gen_test
- def test_py3_leak_exception_context(self):
- class LeakedException(Exception):
- pass
- @gen.coroutine
- def inner(iteration):
- raise LeakedException(iteration)
- try:
- yield inner(1)
- except LeakedException as e:
- self.assertEqual(str(e), "1")
- self.assertIsNone(e.__context__)
- try:
- yield inner(2)
- except LeakedException as e:
- self.assertEqual(str(e), "2")
- self.assertIsNone(e.__context__)
- self.finished = True
- @skipNotCPython
- @unittest.skipIf((3,) < sys.version_info < (3, 6),
- "asyncio.Future has reference cycles")
- def test_coroutine_refcounting(self):
- # On CPython, tasks and their arguments should be released immediately
- # without waiting for garbage collection.
- @gen.coroutine
- def inner():
- class Foo(object):
- pass
- local_var = Foo()
- self.local_ref = weakref.ref(local_var)
- yield gen.coroutine(lambda: None)()
- raise ValueError('Some error')
- @gen.coroutine
- def inner2():
- try:
- yield inner()
- except ValueError:
- pass
- self.io_loop.run_sync(inner2, timeout=3)
- self.assertIs(self.local_ref(), None)
- self.finished = True
- @unittest.skipIf(sys.version_info < (3,),
- "test only relevant with asyncio Futures")
- def test_asyncio_future_debug_info(self):
- self.finished = True
- # Enable debug mode
- asyncio_loop = asyncio.get_event_loop()
- self.addCleanup(asyncio_loop.set_debug, asyncio_loop.get_debug())
- asyncio_loop.set_debug(True)
- def f():
- yield gen.moment
- coro = gen.coroutine(f)()
- self.assertIsInstance(coro, asyncio.Future)
- # We expect the coroutine repr() to show the place where
- # it was instantiated
- expected = ("created at %s:%d"
- % (__file__, f.__code__.co_firstlineno + 3))
- actual = repr(coro)
- self.assertIn(expected, actual)
- @unittest.skipIf(asyncio is None, "asyncio module not present")
- @gen_test
- def test_asyncio_gather(self):
- # This demonstrates that tornado coroutines can be understood
- # by asyncio (This failed prior to Tornado 5.0).
- @gen.coroutine
- def f():
- yield gen.moment
- raise gen.Return(1)
- ret = yield asyncio.gather(f(), f())
- self.assertEqual(ret, [1, 1])
- self.finished = True
- class GenSequenceHandler(RequestHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- # The outer ignore_deprecation applies at definition time.
- # We need another for serving time.
- with ignore_deprecation():
- self.io_loop = self.request.connection.stream.io_loop
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.write("1")
- self.io_loop.add_callback((yield gen.Callback("k2")))
- yield gen.Wait("k2")
- self.write("2")
- # reuse an old key
- self.io_loop.add_callback((yield gen.Callback("k1")))
- yield gen.Wait("k1")
- self.finish("3")
- class GenCoroutineSequenceHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- yield gen.moment
- self.write("1")
- yield gen.moment
- self.write("2")
- yield gen.moment
- self.finish("3")
- class GenCoroutineUnfinishedSequenceHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- yield gen.moment
- self.write("1")
- yield gen.moment
- self.write("2")
- yield gen.moment
- # just write, don't finish
- self.write("3")
- class GenTaskHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- client = AsyncHTTPClient()
- with ignore_deprecation():
- response = yield gen.Task(client.fetch, self.get_argument('url'))
- response.rethrow()
- self.finish(b"got response: " + response.body)
- class GenExceptionHandler(RequestHandler):
- with ignore_deprecation():
- @asynchronous
- @gen.engine
- def get(self):
- # This test depends on the order of the two decorators.
- io_loop = self.request.connection.stream.io_loop
- yield gen.Task(io_loop.add_callback)
- raise Exception("oops")
- class GenCoroutineExceptionHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- # This test depends on the order of the two decorators.
- io_loop = self.request.connection.stream.io_loop
- yield gen.Task(io_loop.add_callback)
- raise Exception("oops")
- class GenYieldExceptionHandler(RequestHandler):
- @gen.coroutine
- def get(self):
- io_loop = self.request.connection.stream.io_loop
- # Test the interaction of the two stack_contexts.
- with ignore_deprecation():
- def fail_task(callback):
- io_loop.add_callback(lambda: 1 / 0)
- try:
- yield gen.Task(fail_task)
- raise Exception("did not get expected exception")
- except ZeroDivisionError:
- self.finish('ok')
- # "Undecorated" here refers to the absence of @asynchronous.
- class UndecoratedCoroutinesHandler(RequestHandler):
- @gen.coroutine
- def prepare(self):
- self.chunks = []
- yield gen.moment
- self.chunks.append('1')
- @gen.coroutine
- def get(self):
- self.chunks.append('2')
- yield gen.moment
- self.chunks.append('3')
- yield gen.moment
- self.write(''.join(self.chunks))
- class AsyncPrepareErrorHandler(RequestHandler):
- @gen.coroutine
- def prepare(self):
- yield gen.moment
- raise HTTPError(403)
- def get(self):
- self.finish('ok')
- class NativeCoroutineHandler(RequestHandler):
- if sys.version_info > (3, 5):
- exec(textwrap.dedent("""
- async def get(self):
- import asyncio
- await asyncio.sleep(0)
- self.write("ok")
- """))
- class GenWebTest(AsyncHTTPTestCase):
- def get_app(self):
- return Application([
- ('/sequence', GenSequenceHandler),
- ('/coroutine_sequence', GenCoroutineSequenceHandler),
- ('/coroutine_unfinished_sequence',
- GenCoroutineUnfinishedSequenceHandler),
- ('/task', GenTaskHandler),
- ('/exception', GenExceptionHandler),
- ('/coroutine_exception', GenCoroutineExceptionHandler),
- ('/yield_exception', GenYieldExceptionHandler),
- ('/undecorated_coroutine', UndecoratedCoroutinesHandler),
- ('/async_prepare_error', AsyncPrepareErrorHandler),
- ('/native_coroutine', NativeCoroutineHandler),
- ])
- def test_sequence_handler(self):
- response = self.fetch('/sequence')
- self.assertEqual(response.body, b"123")
- def test_coroutine_sequence_handler(self):
- response = self.fetch('/coroutine_sequence')
- self.assertEqual(response.body, b"123")
- def test_coroutine_unfinished_sequence_handler(self):
- response = self.fetch('/coroutine_unfinished_sequence')
- self.assertEqual(response.body, b"123")
- def test_task_handler(self):
- response = self.fetch('/task?url=%s' % url_escape(self.get_url('/sequence')))
- self.assertEqual(response.body, b"got response: 123")
- def test_exception_handler(self):
- # Make sure we get an error and not a timeout
- with ExpectLog(app_log, "Uncaught exception GET /exception"):
- response = self.fetch('/exception')
- self.assertEqual(500, response.code)
- def test_coroutine_exception_handler(self):
- # Make sure we get an error and not a timeout
- with ExpectLog(app_log, "Uncaught exception GET /coroutine_exception"):
- response = self.fetch('/coroutine_exception')
- self.assertEqual(500, response.code)
- def test_yield_exception_handler(self):
- response = self.fetch('/yield_exception')
- self.assertEqual(response.body, b'ok')
- def test_undecorated_coroutines(self):
- response = self.fetch('/undecorated_coroutine')
- self.assertEqual(response.body, b'123')
- def test_async_prepare_error_handler(self):
- response = self.fetch('/async_prepare_error')
- self.assertEqual(response.code, 403)
- @skipBefore35
- def test_native_coroutine_handler(self):
- response = self.fetch('/native_coroutine')
- self.assertEqual(response.code, 200)
- self.assertEqual(response.body, b'ok')
- class WithTimeoutTest(AsyncTestCase):
- @gen_test
- def test_timeout(self):
- with self.assertRaises(gen.TimeoutError):
- yield gen.with_timeout(datetime.timedelta(seconds=0.1),
- Future())
- @gen_test
- def test_completes_before_timeout(self):
- future = Future()
- self.io_loop.add_timeout(datetime.timedelta(seconds=0.1),
- lambda: future.set_result('asdf'))
- result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
- future)
- self.assertEqual(result, 'asdf')
- @gen_test
- def test_fails_before_timeout(self):
- future = Future()
- self.io_loop.add_timeout(
- datetime.timedelta(seconds=0.1),
- lambda: future.set_exception(ZeroDivisionError()))
- with self.assertRaises(ZeroDivisionError):
- yield gen.with_timeout(datetime.timedelta(seconds=3600),
- future)
- @gen_test
- def test_already_resolved(self):
- future = Future()
- future.set_result('asdf')
- result = yield gen.with_timeout(datetime.timedelta(seconds=3600),
- future)
- self.assertEqual(result, 'asdf')
- @unittest.skipIf(futures is None, 'futures module not present')
- @gen_test
- def test_timeout_concurrent_future(self):
- # A concurrent future that does not resolve before the timeout.
- with futures.ThreadPoolExecutor(1) as executor:
- with self.assertRaises(gen.TimeoutError):
- yield gen.with_timeout(self.io_loop.time(),
- executor.submit(time.sleep, 0.1))
- @unittest.skipIf(futures is None, 'futures module not present')
- @gen_test
- def test_completed_concurrent_future(self):
- # A concurrent future that is resolved before we even submit it
- # to with_timeout.
- with futures.ThreadPoolExecutor(1) as executor:
- f = executor.submit(lambda: None)
- f.result() # wait for completion
- yield gen.with_timeout(datetime.timedelta(seconds=3600), f)
- @unittest.skipIf(futures is None, 'futures module not present')
- @gen_test
- def test_normal_concurrent_future(self):
- # A conccurrent future that resolves while waiting for the timeout.
- with futures.ThreadPoolExecutor(1) as executor:
- yield gen.with_timeout(datetime.timedelta(seconds=3600),
- executor.submit(lambda: time.sleep(0.01)))
- class WaitIteratorTest(AsyncTestCase):
- @gen_test
- def test_empty_iterator(self):
- g = gen.WaitIterator()
- self.assertTrue(g.done(), 'empty generator iterated')
- with self.assertRaises(ValueError):
- g = gen.WaitIterator(False, bar=False)
- self.assertEqual(g.current_index, None, "bad nil current index")
- self.assertEqual(g.current_future, None, "bad nil current future")
- @gen_test
- def test_already_done(self):
- f1 = Future()
- f2 = Future()
- f3 = Future()
- f1.set_result(24)
- f2.set_result(42)
- f3.set_result(84)
- g = gen.WaitIterator(f1, f2, f3)
- i = 0
- while not g.done():
- r = yield g.next()
- # Order is not guaranteed, but the current implementation
- # preserves ordering of already-done Futures.
- if i == 0:
- self.assertEqual(g.current_index, 0)
- self.assertIs(g.current_future, f1)
- self.assertEqual(r, 24)
- elif i == 1:
- self.assertEqual(g.current_index, 1)
- self.assertIs(g.current_future, f2)
- self.assertEqual(r, 42)
- elif i == 2:
- self.assertEqual(g.current_index, 2)
- self.assertIs(g.current_future, f3)
- self.assertEqual(r, 84)
- i += 1
- self.assertEqual(g.current_index, None, "bad nil current index")
- self.assertEqual(g.current_future, None, "bad nil current future")
- dg = gen.WaitIterator(f1=f1, f2=f2)
- while not dg.done():
- dr = yield dg.next()
- if dg.current_index == "f1":
- self.assertTrue(dg.current_future == f1 and dr == 24,
- "WaitIterator dict status incorrect")
- elif dg.current_index == "f2":
- self.assertTrue(dg.current_future == f2 and dr == 42,
- "WaitIterator dict status incorrect")
- else:
- self.fail("got bad WaitIterator index {}".format(
- dg.current_index))
- i += 1
- self.assertEqual(dg.current_index, None, "bad nil current index")
- self.assertEqual(dg.current_future, None, "bad nil current future")
- def finish_coroutines(self, iteration, futures):
- if iteration == 3:
- futures[2].set_result(24)
- elif iteration == 5:
- futures[0].set_exception(ZeroDivisionError())
- elif iteration == 8:
- futures[1].set_result(42)
- futures[3].set_result(84)
- if iteration < 8:
- self.io_loop.add_callback(self.finish_coroutines, iteration + 1, futures)
- @gen_test
- def test_iterator(self):
- futures = [Future(), Future(), Future(), Future()]
- self.finish_coroutines(0, futures)
- g = gen.WaitIterator(*futures)
- i = 0
- while not g.done():
- try:
- r = yield g.next()
- except ZeroDivisionError:
- self.assertIs(g.current_future, futures[0],
- 'exception future invalid')
- else:
- if i == 0:
- self.assertEqual(r, 24, 'iterator value incorrect')
- self.assertEqual(g.current_index, 2, 'wrong index')
- elif i == 2:
- self.assertEqual(r, 42, 'iterator value incorrect')
- self.assertEqual(g.current_index, 1, 'wrong index')
- elif i == 3:
- self.assertEqual(r, 84, 'iterator value incorrect')
- self.assertEqual(g.current_index, 3, 'wrong index')
- i += 1
- @skipBefore35
- @gen_test
- def test_iterator_async_await(self):
- # Recreate the previous test with py35 syntax. It's a little clunky
- # because of the way the previous test handles an exception on
- # a single iteration.
- futures = [Future(), Future(), Future(), Future()]
- self.finish_coroutines(0, futures)
- self.finished = False
- namespace = exec_test(globals(), locals(), """
- async def f():
- i = 0
- g = gen.WaitIterator(*futures)
- try:
- async for r in g:
- if i == 0:
- self.assertEqual(r, 24, 'iterator value incorrect')
- self.assertEqual(g.current_index, 2, 'wrong index')
- else:
- raise Exception("expected exception on iteration 1")
- i += 1
- except ZeroDivisionError:
- i += 1
- async for r in g:
- if i == 2:
- self.assertEqual(r, 42, 'iterator value incorrect')
- self.assertEqual(g.current_index, 1, 'wrong index')
- elif i == 3:
- self.assertEqual(r, 84, 'iterator value incorrect')
- self.assertEqual(g.current_index, 3, 'wrong index')
- else:
- raise Exception("didn't expect iteration %d" % i)
- i += 1
- self.finished = True
- """)
- yield namespace['f']()
- self.assertTrue(self.finished)
- @gen_test
- def test_no_ref(self):
- # In this usage, there is no direct hard reference to the
- # WaitIterator itself, only the Future it returns. Since
- # WaitIterator uses weak references internally to improve GC
- # performance, this used to cause problems.
- yield gen.with_timeout(datetime.timedelta(seconds=0.1),
- gen.WaitIterator(gen.sleep(0)).next())
- class RunnerGCTest(AsyncTestCase):
- def is_pypy3(self):
- return (platform.python_implementation() == 'PyPy' and
- sys.version_info > (3,))
- @gen_test
- def test_gc(self):
- # Github issue 1769: Runner objects can get GCed unexpectedly
- # while their future is alive.
- weakref_scope = [None]
- def callback():
- gc.collect(2)
- weakref_scope[0]().set_result(123)
- @gen.coroutine
- def tester():
- fut = Future()
- weakref_scope[0] = weakref.ref(fut)
- self.io_loop.add_callback(callback)
- yield fut
- yield gen.with_timeout(
- datetime.timedelta(seconds=0.2),
- tester()
- )
- def test_gc_infinite_coro(self):
- # Github issue 2229: suspended coroutines should be GCed when
- # their loop is closed, even if they're involved in a reference
- # cycle.
- if IOLoop.configured_class().__name__.endswith('TwistedIOLoop'):
- raise unittest.SkipTest("Test may fail on TwistedIOLoop")
- loop = self.get_new_ioloop()
- result = []
- wfut = []
- @gen.coroutine
- def infinite_coro():
- try:
- while True:
- yield gen.sleep(1e-3)
- result.append(True)
- finally:
- # coroutine finalizer
- result.append(None)
- @gen.coroutine
- def do_something():
- fut = infinite_coro()
- fut._refcycle = fut
- wfut.append(weakref.ref(fut))
- yield gen.sleep(0.2)
- loop.run_sync(do_something)
- loop.close()
- gc.collect()
- # Future was collected
- self.assertIs(wfut[0](), None)
- # At least one wakeup
- self.assertGreaterEqual(len(result), 2)
- if not self.is_pypy3():
- # coroutine finalizer was called (not on PyPy3 apparently)
- self.assertIs(result[-1], None)
- @skipBefore35
- def test_gc_infinite_async_await(self):
- # Same as test_gc_infinite_coro, but with a `async def` function
- import asyncio
- namespace = exec_test(globals(), locals(), """
- async def infinite_coro(result):
- try:
- while True:
- await gen.sleep(1e-3)
- result.append(True)
- finally:
- # coroutine finalizer
- result.append(None)
- """)
- infinite_coro = namespace['infinite_coro']
- loop = self.get_new_ioloop()
- result = []
- wfut = []
- @gen.coroutine
- def do_something():
- fut = asyncio.get_event_loop().create_task(infinite_coro(result))
- fut._refcycle = fut
- wfut.append(weakref.ref(fut))
- yield gen.sleep(0.2)
- loop.run_sync(do_something)
- with ExpectLog('asyncio', "Task was destroyed but it is pending"):
- loop.close()
- gc.collect()
- # Future was collected
- self.assertIs(wfut[0](), None)
- # At least one wakeup and one finally
- self.assertGreaterEqual(len(result), 2)
- if not self.is_pypy3():
- # coroutine finalizer was called (not on PyPy3 apparently)
- self.assertIs(result[-1], None)
- def test_multi_moment(self):
- # Test gen.multi with moment
- # now that it's not a real Future
- @gen.coroutine
- def wait_a_moment():
- result = yield gen.multi([gen.moment, gen.moment])
- raise gen.Return(result)
- loop = self.get_new_ioloop()
- result = loop.run_sync(wait_a_moment)
- self.assertEqual(result, [None, None])
- if __name__ == '__main__':
- unittest.main()
|