asyncio_test.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206
  1. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  2. # not use this file except in compliance with the License. You may obtain
  3. # a copy of the License at
  4. #
  5. # http://www.apache.org/licenses/LICENSE-2.0
  6. #
  7. # Unless required by applicable law or agreed to in writing, software
  8. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  9. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  10. # License for the specific language governing permissions and limitations
  11. # under the License.
  12. from __future__ import absolute_import, division, print_function
  13. from concurrent.futures import ThreadPoolExecutor
  14. from tornado import gen
  15. from tornado.ioloop import IOLoop
  16. from tornado.testing import AsyncTestCase, gen_test
  17. from tornado.test.util import unittest, skipBefore33, skipBefore35, exec_test
  18. try:
  19. from tornado.platform.asyncio import asyncio
  20. except ImportError:
  21. asyncio = None
  22. else:
  23. from tornado.platform.asyncio import AsyncIOLoop, to_asyncio_future, AnyThreadEventLoopPolicy
  24. # This is used in dynamically-evaluated code, so silence pyflakes.
  25. to_asyncio_future
  26. @unittest.skipIf(asyncio is None, "asyncio module not present")
  27. class AsyncIOLoopTest(AsyncTestCase):
  28. def get_new_ioloop(self):
  29. io_loop = AsyncIOLoop()
  30. return io_loop
  31. def test_asyncio_callback(self):
  32. # Basic test that the asyncio loop is set up correctly.
  33. asyncio.get_event_loop().call_soon(self.stop)
  34. self.wait()
  35. @gen_test
  36. def test_asyncio_future(self):
  37. # Test that we can yield an asyncio future from a tornado coroutine.
  38. # Without 'yield from', we must wrap coroutines in ensure_future,
  39. # which was introduced during Python 3.4, deprecating the prior "async".
  40. if hasattr(asyncio, 'ensure_future'):
  41. ensure_future = asyncio.ensure_future
  42. else:
  43. # async is a reserved word in Python 3.7
  44. ensure_future = getattr(asyncio, 'async')
  45. x = yield ensure_future(
  46. asyncio.get_event_loop().run_in_executor(None, lambda: 42))
  47. self.assertEqual(x, 42)
  48. @skipBefore33
  49. @gen_test
  50. def test_asyncio_yield_from(self):
  51. # Test that we can use asyncio coroutines with 'yield from'
  52. # instead of asyncio.async(). This requires python 3.3 syntax.
  53. namespace = exec_test(globals(), locals(), """
  54. @gen.coroutine
  55. def f():
  56. event_loop = asyncio.get_event_loop()
  57. x = yield from event_loop.run_in_executor(None, lambda: 42)
  58. return x
  59. """)
  60. result = yield namespace['f']()
  61. self.assertEqual(result, 42)
  62. @skipBefore35
  63. def test_asyncio_adapter(self):
  64. # This test demonstrates that when using the asyncio coroutine
  65. # runner (i.e. run_until_complete), the to_asyncio_future
  66. # adapter is needed. No adapter is needed in the other direction,
  67. # as demonstrated by other tests in the package.
  68. @gen.coroutine
  69. def tornado_coroutine():
  70. yield gen.moment
  71. raise gen.Return(42)
  72. native_coroutine_without_adapter = exec_test(globals(), locals(), """
  73. async def native_coroutine_without_adapter():
  74. return await tornado_coroutine()
  75. """)["native_coroutine_without_adapter"]
  76. native_coroutine_with_adapter = exec_test(globals(), locals(), """
  77. async def native_coroutine_with_adapter():
  78. return await to_asyncio_future(tornado_coroutine())
  79. """)["native_coroutine_with_adapter"]
  80. # Use the adapter, but two degrees from the tornado coroutine.
  81. native_coroutine_with_adapter2 = exec_test(globals(), locals(), """
  82. async def native_coroutine_with_adapter2():
  83. return await to_asyncio_future(native_coroutine_without_adapter())
  84. """)["native_coroutine_with_adapter2"]
  85. # Tornado supports native coroutines both with and without adapters
  86. self.assertEqual(
  87. self.io_loop.run_sync(native_coroutine_without_adapter),
  88. 42)
  89. self.assertEqual(
  90. self.io_loop.run_sync(native_coroutine_with_adapter),
  91. 42)
  92. self.assertEqual(
  93. self.io_loop.run_sync(native_coroutine_with_adapter2),
  94. 42)
  95. # Asyncio only supports coroutines that yield asyncio-compatible
  96. # Futures (which our Future is since 5.0).
  97. self.assertEqual(
  98. asyncio.get_event_loop().run_until_complete(
  99. native_coroutine_without_adapter()),
  100. 42)
  101. self.assertEqual(
  102. asyncio.get_event_loop().run_until_complete(
  103. native_coroutine_with_adapter()),
  104. 42)
  105. self.assertEqual(
  106. asyncio.get_event_loop().run_until_complete(
  107. native_coroutine_with_adapter2()),
  108. 42)
  109. @unittest.skipIf(asyncio is None, "asyncio module not present")
  110. class LeakTest(unittest.TestCase):
  111. def setUp(self):
  112. # Trigger a cleanup of the mapping so we start with a clean slate.
  113. AsyncIOLoop().close()
  114. # If we don't clean up after ourselves other tests may fail on
  115. # py34.
  116. self.orig_policy = asyncio.get_event_loop_policy()
  117. asyncio.set_event_loop_policy(asyncio.DefaultEventLoopPolicy())
  118. def tearDown(self):
  119. asyncio.get_event_loop().close()
  120. asyncio.set_event_loop_policy(self.orig_policy)
  121. def test_ioloop_close_leak(self):
  122. orig_count = len(IOLoop._ioloop_for_asyncio)
  123. for i in range(10):
  124. # Create and close an AsyncIOLoop using Tornado interfaces.
  125. loop = AsyncIOLoop()
  126. loop.close()
  127. new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
  128. self.assertEqual(new_count, 0)
  129. def test_asyncio_close_leak(self):
  130. orig_count = len(IOLoop._ioloop_for_asyncio)
  131. for i in range(10):
  132. # Create and close an AsyncIOMainLoop using asyncio interfaces.
  133. loop = asyncio.new_event_loop()
  134. loop.call_soon(IOLoop.current)
  135. loop.call_soon(loop.stop)
  136. loop.run_forever()
  137. loop.close()
  138. new_count = len(IOLoop._ioloop_for_asyncio) - orig_count
  139. # Because the cleanup is run on new loop creation, we have one
  140. # dangling entry in the map (but only one).
  141. self.assertEqual(new_count, 1)
  142. @unittest.skipIf(asyncio is None, "asyncio module not present")
  143. class AnyThreadEventLoopPolicyTest(unittest.TestCase):
  144. def setUp(self):
  145. self.orig_policy = asyncio.get_event_loop_policy()
  146. self.executor = ThreadPoolExecutor(1)
  147. def tearDown(self):
  148. asyncio.set_event_loop_policy(self.orig_policy)
  149. self.executor.shutdown()
  150. def get_event_loop_on_thread(self):
  151. def get_and_close_event_loop():
  152. """Get the event loop. Close it if one is returned.
  153. Returns the (closed) event loop. This is a silly thing
  154. to do and leaves the thread in a broken state, but it's
  155. enough for this test. Closing the loop avoids resource
  156. leak warnings.
  157. """
  158. loop = asyncio.get_event_loop()
  159. loop.close()
  160. return loop
  161. future = self.executor.submit(get_and_close_event_loop)
  162. return future.result()
  163. def run_policy_test(self, accessor, expected_type):
  164. # With the default policy, non-main threads don't get an event
  165. # loop.
  166. self.assertRaises((RuntimeError, AssertionError),
  167. self.executor.submit(accessor).result)
  168. # Set the policy and we can get a loop.
  169. asyncio.set_event_loop_policy(AnyThreadEventLoopPolicy())
  170. self.assertIsInstance(
  171. self.executor.submit(accessor).result(),
  172. expected_type)
  173. # Clean up to silence leak warnings. Always use asyncio since
  174. # IOLoop doesn't (currently) close the underlying loop.
  175. self.executor.submit(lambda: asyncio.get_event_loop().close()).result()
  176. def test_asyncio_accessor(self):
  177. self.run_policy_test(asyncio.get_event_loop, asyncio.AbstractEventLoop)
  178. def test_tornado_accessor(self):
  179. self.run_policy_test(IOLoop.current, IOLoop)