123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206 |
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- from __future__ import absolute_import, division, print_function
- from concurrent.futures import ThreadPoolExecutor
- from tornado import gen
- from tornado.ioloop import IOLoop
- from tornado.testing import AsyncTestCase, gen_test
- from tornado.test.util import unittest, skipBefore33, skipBefore35, exec_test
- try:
- from tornado.platform.asyncio import asyncio
- except ImportError:
- asyncio = None
- else:
- from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future, AnyThreadEventLoopPolicy
- # This is used in dynamically-evaluated code, so silence pyflakes.
- to_asyncio_future
- @unittest.skipIf(asyncio is None, "asyncio module not present")
- class AsyncIOLoopTest(AsyncTestCase):
- def get_new_ioloop(self):
- io_loop = AsyncIOLoop()
- return io_loop
- def test_asyncio_callback(self):
- # Basic test that the asyncio loop is set up correctly.
- asyncio.get_event_loop().call_soon(self.stop)
- self.wait()
- @gen_test
- def test_asyncio_future(self):
- # Test that we can yield an asyncio future from a tornado coroutine.
- # Without 'yield from', we must wrap coroutines in ensure_future,
- # which was introduced during Python 3.4, deprecating the prior "async".
- if hasattr(asyncio, 'ensure_future'):
- ensure_future = asyncio.ensure_future
- else:
- # async is a reserved word in Python 3.7
- ensure_future = getattr(asyncio, 'async')
- x = yield ensure_future(
- asyncio.get_event_loop().run_in_executor(None, lambda: 42))
- self.assertEqual(x, 42)
- @skipBefore33
- @gen_test
- def test_asyncio_yield_from(self):
- # Test that we can use asyncio coroutines with 'yield from'
- # instead of asyncio.async(). This requires python 3.3 syntax.
- namespace = exec_test(globals(), locals(), """
- @gen.coroutine
- def f():
- event_loop = asyncio.get_event_loop()
- x = yield from event_loop.run_in_executor(None, lambda: 42)
- return x
- """)
- result = yield namespace['f']()
- self.assertEqual(result, 42)
- @skipBefore35
- def test_asyncio_adapter(self):
- # This test demonstrates that when using the asyncio coroutine
- # runner (i.e. run_until_complete), the to_asyncio_future
- # adapter is needed. No adapter is needed in the other direction,
- # as demonstrated by other tests in the package.
- @gen.coroutine
- def tornado_coroutine():
- yield gen.moment
- raise gen.Return(42)
- native_coroutine_without_adapter = exec_test(globals(), locals(), """
- async def native_coroutine_without_adapter():
- return await tornado_coroutine()
- """)["native_coroutine_without_adapter"]
- native_coroutine_with_adapter = exec_test(globals(), locals(), """
- async def native_coroutine_with_adapter():
- return await to_asyncio_future(tornado_coroutine())
- """)["native_coroutine_with_adapter"]
- # Use the adapter, but two degrees from the tornado coroutine.
- native_coroutine_with_adapter2 = exec_test(globals(), locals(), """
- async def native_coroutine_with_adapter2():
- return await to_asyncio_future(native_coroutine_without_adapter())
- """)["native_coroutine_with_adapter2"]
- # Tornado supports native coroutines both with and without adapters
- self.assertEqual(
- self.io_loop.run_sync(native_coroutine_without_adapter),
- 42)
- self.assertEqual(
- self.io_loop.run_sync(native_coroutine_with_adapter),
- 42)
- self.assertEqual(
- self.io_loop.run_sync(native_coroutine_with_adapter2),
- 42)
- # Asyncio only supports coroutines that yield asyncio-compatible
- # Futures (which our Future is since 5.0).
- self.assertEqual(
- asyncio.get_event_loop().run_until_complete(
- native_coroutine_without_adapter()),
- 42)
- self.assertEqual(
- asyncio.get_event_loop().run_until_complete(
- native_coroutine_with_adapter()),
- 42)
- self.assertEqual(
- asyncio.get_event_loop().run_until_complete(
- native_coroutine_with_adapter2()),
- 42)
- @unittest.skipIf(asyncio is None, "asyncio module not present")
- class LeakTest(unittest.TestCase):
- def setUp(self):
- # Trigger a cleanup of the mapping so we start with a clean slate.
- AsyncIOLoop().close()
- # If we don't clean up after ourselves other tests may fail on
- # py34.
- self.orig_policy = asyncio.get_event_loop_policy()
- asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
- def tearDown(self):
- asyncio.get_event_loop().close()
- asyncio.set_event_loop_policy(self.orig_policy)
- def test_ioloop_close_leak(self):
- orig_count = len(IOLoop._ioloop_for_asyncio)
- for i in range(10):
- # Create and close an AsyncIOLoop using Tornado interfaces.
- loop = AsyncIOLoop()
- loop.close()
- new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
- self.assertEqual(new_count, 0)
- def test_asyncio_close_leak(self):
- orig_count = len(IOLoop._ioloop_for_asyncio)
- for i in range(10):
- # Create and close an AsyncIOMainLoop using asyncio interfaces.
- loop = asyncio.new_event_loop()
- loop.call_soon(IOLoop.current)
- loop.call_soon(loop.stop)
- loop.run_forever()
- loop.close()
- new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
- # Because the cleanup is run on new loop creation, we have one
- # dangling entry in the map (but only one).
- self.assertEqual(new_count, 1)
- @unittest.skipIf(asyncio is None, "asyncio module not present")
- class AnyThreadEventLoopPolicyTest(unittest.TestCase):
- def setUp(self):
- self.orig_policy = asyncio.get_event_loop_policy()
- self.executor = ThreadPoolExecutor(1)
- def tearDown(self):
- asyncio.set_event_loop_policy(self.orig_policy)
- self.executor.shutdown()
- def get_event_loop_on_thread(self):
- def get_and_close_event_loop():
- """Get the event loop. Close it if one is returned.
- Returns the (closed) event loop. This is a silly thing
- to do and leaves the thread in a broken state, but it's
- enough for this test. Closing the loop avoids resource
- leak warnings.
- """
- loop = asyncio.get_event_loop()
- loop.close()
- return loop
- future = self.executor.submit(get_and_close_event_loop)
- return future.result()
- def run_policy_test(self, accessor, expected_type):
- # With the default policy, non-main threads don't get an event
- # loop.
- self.assertRaises((RuntimeError, AssertionError),
- self.executor.submit(accessor).result)
- # Set the policy and we can get a loop.
- asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
- self.assertIsInstance(
- self.executor.submit(accessor).result(),
- expected_type)
- # Clean up to silence leak warnings. Always use asyncio since
- # IOLoop doesn't (currently) close the underlying loop.
- self.executor.submit(lambda: asyncio.get_event_loop().close()).result()
- def test_asyncio_accessor(self):
- self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
- def test_tornado_accessor(self):
- self.run_policy_test(IOLoop.current, IOLoop)
|