12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729 |
- # Author: Ovidiu Predescu
- # Date: July 2011
- #
- # Licensed under the Apache License, Version 2.0 (the "License"); you may
- # not use this file except in compliance with the License. You may obtain
- # a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- # License for the specific language governing permissions and limitations
- # under the License.
- """
- Unittest for the twisted-style reactor.
- """
- from __future__ import absolute_import, division, print_function
- import logging
- import os
- import shutil
- import signal
- import sys
- import tempfile
- import threading
- import warnings
- from tornado.escape import utf8
- from tornado import gen
- from tornado.httpclient import AsyncHTTPClient
- from tornado.httpserver import HTTPServer
- from tornado.ioloop import IOLoop, PollIOLoop
- from tornado.platform.auto import set_close_exec
- from tornado.testing import bind_unused_port
- from tornado.test.util import unittest
- from tornado.util import import_object, PY3
- from tornado.web import RequestHandler, Application
- try:
- import fcntl
- from twisted.internet.defer import Deferred, inlineCallbacks, returnValue # type: ignore
- from twisted.internet.interfaces import IReadDescriptor, IWriteDescriptor # type: ignore
- from twisted.internet.protocol import Protocol # type: ignore
- from twisted.python import log # type: ignore
- from tornado.platform.twisted import TornadoReactor, TwistedIOLoop
- from zope.interface import implementer # type: ignore
- have_twisted = True
- except ImportError:
- have_twisted = False
- # The core of Twisted 12.3.0 is available on python 3, but twisted.web is not
- # so test for it separately.
- try:
- from twisted.web.client import Agent, readBody # type: ignore
- from twisted.web.resource import Resource # type: ignore
- from twisted.web.server import Site # type: ignore
- # As of Twisted 15.0.0, twisted.web is present but fails our
- # tests due to internal str/bytes errors.
- have_twisted_web = sys.version_info < (3,)
- except ImportError:
- have_twisted_web = False
- if PY3:
- import _thread as thread
- else:
- import thread
- ResourceWarning = None
- try:
- import asyncio
- except ImportError:
- asyncio = None
# Decorator applied to tests (and test classes) that cannot run unless the
# Twisted imports above succeeded.
skipIfNoTwisted = unittest.skipUnless(
    have_twisted, "twisted module not present")
def save_signal_handlers():
    """Snapshot the handlers currently installed for SIGINT/SIGTERM/SIGCHLD.

    Returns a dict mapping each signal number to whatever
    ``signal.getsignal`` reports for it.

    Raises ``Exception`` if the repr of any saved handler mentions
    "twisted" while the configured IOLoop class is not a ``TwistedIOLoop``
    subclass.
    """
    watched = (signal.SIGINT, signal.SIGTERM, signal.SIGCHLD)
    saved = {signum: signal.getsignal(signum) for signum in watched}
    # When the global ioloop is twisted, we expect the signal handlers
    # to be installed.  Otherwise, finding them here means an earlier
    # test did not clean up after twisted properly.
    twisted_present = "twisted" in repr(saved)
    if twisted_present and not issubclass(IOLoop.configured_class(),
                                          TwistedIOLoop):
        raise Exception("twisted signal handlers already installed")
    return saved
def restore_signal_handlers(saved):
    """Reinstall every handler recorded in *saved*.

    *saved* maps signal numbers to handlers, in the shape returned by
    ``save_signal_handlers``.
    """
    for signum in saved:
        signal.signal(signum, saved[signum])
class ReactorTestCase(unittest.TestCase):
    """Base fixture for the reactor tests.

    Each test gets its own ``IOLoop`` (``self._io_loop``) and a
    ``TornadoReactor`` (``self._reactor``); the process signal handlers are
    saved before the test and restored afterwards.
    """

    def setUp(self):
        self._saved_signals = save_signal_handlers()
        # The loop is current only while the reactor is constructed;
        # the current-loop slot is cleared again before the test body runs.
        IOLoop.clear_current()
        self._io_loop = IOLoop(make_current=True)
        self._reactor = TornadoReactor()
        IOLoop.clear_current()

    def tearDown(self):
        # Restore the signal handlers even if closing the loop raises, so
        # that one failing test cannot leave handlers behind for (and trip
        # the check in save_signal_handlers of) every following test.
        try:
            self._io_loop.close(all_fds=True)
        finally:
            restore_signal_handlers(self._saved_signals)
@skipIfNoTwisted
class ReactorWhenRunningTest(ReactorTestCase):
    """Checks ``callWhenRunning``, including a hook registered by a hook."""

    def test_whenRunning(self):
        # Both flags start out False and are flipped by the callbacks below.
        self._anotherWhenRunningCalled = False
        self._whenRunningCalled = False
        reactor = self._reactor
        reactor.callWhenRunning(self.whenRunningCallback)
        reactor.run()
        self.assertTrue(self._whenRunningCalled)
        self.assertTrue(self._anotherWhenRunningCalled)

    def whenRunningCallback(self):
        """First hook: record the call, chain a second hook, request stop."""
        self._whenRunningCalled = True
        reactor = self._reactor
        reactor.callWhenRunning(self.anotherWhenRunningCallback)
        reactor.stop()

    def anotherWhenRunningCallback(self):
        """Second hook, registered from inside the first one."""
        self._anotherWhenRunningCalled = True
@skipIfNoTwisted
class ReactorCallLaterTest(ReactorTestCase):
    """Checks a single ``callLater`` timer."""

    def test_callLater(self):
        reactor = self._reactor
        self._laterCalled = False
        self._now = reactor.seconds()
        self._timeout = 0.001
        delayed = reactor.callLater(self._timeout, self.callLaterCallback)
        # The pending call is visible until it has fired.
        self.assertEqual(reactor.getDelayedCalls(), [delayed])
        reactor.run()
        self.assertTrue(self._laterCalled)
        elapsed = self._called - self._now
        self.assertTrue(elapsed > self._timeout)
        self.assertEqual(reactor.getDelayedCalls(), [])

    def callLaterCallback(self):
        """Timer target: record that and when it ran, then stop the reactor."""
        reactor = self._reactor
        self._laterCalled = True
        self._called = reactor.seconds()
        reactor.stop()
@skipIfNoTwisted
class ReactorTwoCallLaterTest(ReactorTestCase):
    """Checks two ``callLater`` timers with different delays."""

    def test_callLater(self):
        reactor = self._reactor
        self._later1Called = False
        self._later2Called = False
        self._now = reactor.seconds()
        self._timeout1 = 0.0005
        dc1 = reactor.callLater(self._timeout1, self.callLaterCallback1)
        self._timeout2 = 0.001
        dc2 = reactor.callLater(self._timeout2, self.callLaterCallback2)
        # Either ordering of the pending calls is accepted.
        self.assertTrue(reactor.getDelayedCalls() == [dc1, dc2] or
                        reactor.getDelayedCalls() == [dc2, dc1])
        reactor.run()
        self.assertTrue(self._later1Called)
        self.assertTrue(self._later2Called)
        elapsed1 = self._called1 - self._now
        elapsed2 = self._called2 - self._now
        self.assertTrue(elapsed1 > self._timeout1)
        self.assertTrue(elapsed2 > self._timeout2)
        self.assertEqual(reactor.getDelayedCalls(), [])

    def callLaterCallback1(self):
        """Target of the shorter timer; does not stop the reactor."""
        self._later1Called = True
        self._called1 = self._reactor.seconds()

    def callLaterCallback2(self):
        """Target of the longer timer; stops the reactor."""
        reactor = self._reactor
        self._later2Called = True
        self._called2 = reactor.seconds()
        reactor.stop()
@skipIfNoTwisted
class ReactorCallFromThreadTest(ReactorTestCase):
    """Checks that ``callFromThread`` runs the callable on the main thread."""

    def setUp(self):
        super(ReactorCallFromThreadTest, self).setUp()
        self._mainThread = thread.get_ident()
        # Created in _whenRunningCallback; stays None if the reactor never
        # got far enough to start the worker.
        self._thread = None

    def tearDown(self):
        # Only join a worker that was actually started; otherwise an
        # AttributeError here would mask the real test failure.
        if self._thread is not None:
            self._thread.join()
        super(ReactorCallFromThreadTest, self).tearDown()

    def _newThreadRun(self):
        """Worker-thread body: hand a callback back to the reactor."""
        self.assertNotEqual(self._mainThread, thread.get_ident())
        self.assertEqual(self._thread.ident, thread.get_ident())
        self._reactor.callFromThread(self._fnCalledFromThread)

    def _fnCalledFromThread(self):
        """Scheduled via callFromThread; must run on the main thread."""
        self.assertEqual(self._mainThread, thread.get_ident())
        self._reactor.stop()

    def _whenRunningCallback(self):
        """Start the worker thread once the reactor is running."""
        self._thread = threading.Thread(target=self._newThreadRun)
        self._thread.start()

    def testCallFromThread(self):
        self._reactor.callWhenRunning(self._whenRunningCallback)
        self._reactor.run()
@skipIfNoTwisted
class ReactorCallInThread(ReactorTestCase):
    """Checks that ``callInThread`` runs the callable off the main thread."""

    def setUp(self):
        super(ReactorCallInThread, self).setUp()
        # Identity of the thread running the test, for later comparison.
        self._mainThread = thread.get_ident()

    def _fnCalledInThread(self, *args, **kwargs):
        """Runs via callInThread; asks the reactor to stop via callFromThread."""
        current = thread.get_ident()
        self.assertNotEqual(current, self._mainThread)

        def stop_reactor():
            self._reactor.stop()
        self._reactor.callFromThread(stop_reactor)

    def _whenRunningCallback(self):
        """Dispatch the worker function once the reactor is running."""
        self._reactor.callInThread(self._fnCalledInThread)

    def testCallInThread(self):
        reactor = self._reactor
        reactor.callWhenRunning(self._whenRunningCallback)
        reactor.run()
if have_twisted:
    class _PipeEnd(object):
        """Plumbing shared by Reader and Writer.

        Wraps a file object and a callback; the subclass decides when the
        callback is invoked.
        """

        def __init__(self, fd, callback):
            self._fd = fd
            self._callback = callback

        def close(self):
            self._fd.close()

        def fileno(self):
            return self._fd.fileno()

        def connectionLost(self, reason):
            # *reason* is ignored; losing the connection just closes the file.
            self.close()

    @implementer(IReadDescriptor)
    class Reader(_PipeEnd):
        """Read descriptor whose ``doRead`` passes the file to the callback."""

        def logPrefix(self):
            return "Reader"

        def readConnectionLost(self, reason):
            self.close()

        def doRead(self):
            self._callback(self._fd)

    @implementer(IWriteDescriptor)
    class Writer(_PipeEnd):
        """Write descriptor whose ``doWrite`` passes the file to the callback."""

        def logPrefix(self):
            return "Writer"

        def doWrite(self):
            self._callback(self._fd)
@skipIfNoTwisted
class ReactorReaderWriterTest(ReactorTestCase):
    """Exercises addReader/addWriter/removeWriter on the two ends of a pipe."""

    def _set_nonblocking(self, fd):
        """Add O_NONBLOCK to the status flags of the raw descriptor *fd*."""
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def setUp(self):
        super(ReactorReaderWriterTest, self).setUp()
        r, w = os.pipe()
        self._set_nonblocking(r)
        self._set_nonblocking(w)
        set_close_exec(r)
        set_close_exec(w)
        # Unbuffered binary file objects around the two pipe ends.
        self._p1 = os.fdopen(r, "rb", 0)
        self._p2 = os.fdopen(w, "wb", 0)

    def tearDown(self):
        super(ReactorReaderWriterTest, self).tearDown()
        self._p1.close()
        self._p2.close()

    def _testReadWrite(self):
        """
        In this test the writer writes an 'x' to its fd. The reader
        reads it, checks the value and ends the test.
        """
        self.shouldWrite = True

        def checkReadInput(fd):
            # assertEqual, not the deprecated assertEquals alias (removed
            # from unittest in Python 3.12).
            self.assertEqual(fd.read(1), b'x')
            self._reactor.stop()

        def writeOnce(fd):
            # Write exactly one byte no matter how often we are called.
            if self.shouldWrite:
                self.shouldWrite = False
                fd.write(b'x')
        self._reader = Reader(self._p1, checkReadInput)
        self._writer = Writer(self._p2, writeOnce)

        self._reactor.addWriter(self._writer)

        # Test that adding the reader twice adds it only once to
        # IOLoop.
        self._reactor.addReader(self._reader)
        self._reactor.addReader(self._reader)

    def testReadWrite(self):
        self._reactor.callWhenRunning(self._testReadWrite)
        self._reactor.run()

    def _testNoWriter(self):
        """
        In this test we have no writer. Make sure the reader doesn't
        read anything.
        """
        def checkReadInput(fd):
            self.fail("Must not be called.")

        def stopTest():
            # Close the writer here since the IOLoop doesn't know
            # about it.
            self._writer.close()
            self._reactor.stop()
        self._reader = Reader(self._p1, checkReadInput)

        # We create a writer, but it should never be invoked.  The payload
        # is bytes because the pipe end is opened in binary mode.
        self._writer = Writer(self._p2, lambda fd: fd.write(b'x'))

        # Test that adding and removing the writer leaves us with no writer.
        self._reactor.addWriter(self._writer)
        self._reactor.removeWriter(self._writer)

        # Test that adding the reader doesn't cause unintended effects.
        self._reactor.addReader(self._reader)

        # Wake up after a moment and stop the test
        self._reactor.callLater(0.001, stopTest)

    def testNoWriter(self):
        self._reactor.callWhenRunning(self._testNoWriter)
        self._reactor.run()
# Test various combinations of twisted and tornado http servers,
# http clients, and event loop interfaces.
@skipIfNoTwisted
@unittest.skipIf(not have_twisted_web, 'twisted web not present')
class CompatibilityTests(unittest.TestCase):
    """Cross-checks Twisted and Tornado HTTP servers and clients.

    Every test starts one server, fetches "/" from it with one of the
    client helpers, and drives the event loop through a *runner*: either
    ``run_ioloop`` or ``run_reactor``.  HTTP bodies are written and compared
    as bytes literals, which is equivalent to the plain-str form on
    Python 2 and is the type both libraries use for bodies on Python 3.
    """

    def setUp(self):
        self.saved_signals = save_signal_handlers()
        self.io_loop = IOLoop()
        self.io_loop.make_current()
        self.reactor = TornadoReactor()

    def tearDown(self):
        self.reactor.disconnectAll()
        self.io_loop.clear_current()
        self.io_loop.close(all_fds=True)
        restore_signal_handlers(self.saved_signals)

    def start_twisted_server(self):
        """Serve a fixed greeting from twisted.web on a free local port.

        The chosen port is stored in ``self.twisted_port``.
        """
        class HelloResource(Resource):
            isLeaf = True

            def render_GET(self, request):
                return b"Hello from twisted!"
        site = Site(HelloResource())
        port = self.reactor.listenTCP(0, site, interface='127.0.0.1')
        self.twisted_port = port.getHost().port

    def start_tornado_server(self):
        """Serve a fixed greeting from a Tornado HTTPServer.

        The chosen port is stored in ``self.tornado_port``.
        """
        class HelloHandler(RequestHandler):
            def get(self):
                self.write("Hello from tornado!")
        # log_function is a no-op so requests are not logged during tests.
        app = Application([('/', HelloHandler)],
                          log_function=lambda x: None)
        server = HTTPServer(app)
        sock, self.tornado_port = bind_unused_port()
        server.add_sockets([sock])

    def run_ioloop(self):
        """Runner: drive the test through the IOLoop interface."""
        self.stop_loop = self.io_loop.stop
        self.io_loop.start()
        self.reactor.fireSystemEvent('shutdown')

    def run_reactor(self):
        """Runner: drive the test through the reactor interface."""
        self.stop_loop = self.reactor.stop
        self.stop = self.reactor.stop
        self.reactor.run()

    def tornado_fetch(self, url, runner):
        """Fetch *url* with AsyncHTTPClient and return the response.

        *runner* is called to run the loop; the fetch callback stops it.
        Any error carried by the response is re-raised.
        """
        responses = []
        client = AsyncHTTPClient()

        def callback(response):
            responses.append(response)
            self.stop_loop()
        client.fetch(url, callback=callback)
        runner()
        self.assertEqual(len(responses), 1)
        responses[0].rethrow()
        return responses[0]

    def twisted_fetch(self, url, runner):
        """Fetch *url* with twisted's Agent and return the body as bytes.

        The body is streamed through a Protocol and the received chunks
        are joined once *runner* returns.
        """
        # http://twistedmatrix.com/documents/current/web/howto/client.html
        chunks = []
        client = Agent(self.reactor)
        d = client.request(b'GET', utf8(url))

        class Accumulator(Protocol):
            def __init__(self, finished):
                self.finished = finished

            def dataReceived(self, data):
                chunks.append(data)

            def connectionLost(self, reason):
                self.finished.callback(None)

        def callback(response):
            finished = Deferred()
            response.deliverBody(Accumulator(finished))
            return finished
        d.addCallback(callback)

        def shutdown(failure):
            if hasattr(self, 'stop_loop'):
                self.stop_loop()
            elif failure is not None:
                # loop hasn't been initialized yet; try our best to
                # get an error message out. (the runner() interaction
                # should probably be refactored).
                try:
                    failure.raiseException()
                except Exception:
                    # Not a bare except: KeyboardInterrupt and SystemExit
                    # must not be swallowed into a log line.
                    logging.error('exception before starting loop', exc_info=True)
        d.addBoth(shutdown)
        runner()
        self.assertTrue(chunks)
        return b''.join(chunks)

    def twisted_coroutine_fetch(self, url, runner):
        """Fetch *url* with twisted's Agent from inside a Tornado coroutine.

        Returns whatever ``readBody`` produced for the response.
        """
        body = [None]

        @gen.coroutine
        def f():
            # This is simpler than the non-coroutine version, but it cheats
            # by reading the body in one blob instead of streaming it with
            # a Protocol.
            client = Agent(self.reactor)
            response = yield client.request(b'GET', utf8(url))
            with warnings.catch_warnings():
                # readBody has a buggy DeprecationWarning in Twisted 15.0:
                # https://twistedmatrix.com/trac/changeset/43379
                warnings.simplefilter('ignore', category=DeprecationWarning)
                body[0] = yield readBody(response)
            self.stop_loop()
        self.io_loop.add_callback(f)
        runner()
        return body[0]

    def testTwistedServerTornadoClientIOLoop(self):
        self.start_twisted_server()
        response = self.tornado_fetch(
            'http://127.0.0.1:%d' % self.twisted_port, self.run_ioloop)
        self.assertEqual(response.body, b'Hello from twisted!')

    def testTwistedServerTornadoClientReactor(self):
        self.start_twisted_server()
        response = self.tornado_fetch(
            'http://127.0.0.1:%d' % self.twisted_port, self.run_reactor)
        self.assertEqual(response.body, b'Hello from twisted!')

    def testTornadoServerTwistedClientIOLoop(self):
        self.start_tornado_server()
        response = self.twisted_fetch(
            'http://127.0.0.1:%d' % self.tornado_port, self.run_ioloop)
        self.assertEqual(response, b'Hello from tornado!')

    def testTornadoServerTwistedClientReactor(self):
        self.start_tornado_server()
        response = self.twisted_fetch(
            'http://127.0.0.1:%d' % self.tornado_port, self.run_reactor)
        self.assertEqual(response, b'Hello from tornado!')

    def testTornadoServerTwistedCoroutineClientIOLoop(self):
        self.start_tornado_server()
        response = self.twisted_coroutine_fetch(
            'http://127.0.0.1:%d' % self.tornado_port, self.run_ioloop)
        self.assertEqual(response, b'Hello from tornado!')
@skipIfNoTwisted
class ConvertDeferredTest(unittest.TestCase):
    """Checks ``gen.convert_yielded`` on the result of inlineCallbacks functions."""

    def test_success(self):
        @inlineCallbacks
        def fn():
            # inlineCallbacks doesn't work with regular functions; the
            # body must contain a yield even if it is unreachable.
            if False:
                yield
            returnValue(42)
        converted = gen.convert_yielded(fn())
        self.assertEqual(converted.result(), 42)

    def test_failure(self):
        @inlineCallbacks
        def fn():
            # Unreachable yield, needed for the same reason as above.
            if False:
                yield
            1 / 0
        converted = gen.convert_yielded(fn())
        with self.assertRaises(ZeroDivisionError):
            converted.result()
if have_twisted:
    # Import and run as much of twisted's test suite as possible.
    # This is unfortunately rather dependent on implementation details,
    # but there doesn't appear to be a clean all-in-one conformance test
    # suite for reactors.
    #
    # Keys are dotted names of test suites using the ReactorBuilder
    # (as available in Twisted 11.0.0 and 11.1.0); each value is the
    # blacklist of test methods to disable in that suite.
    twisted_tests = {
        'twisted.internet.test.test_core.ObjectModelIntegrationTest': [],
        'twisted.internet.test.test_core.SystemEventTestsBuilder': [
            'test_iterate',  # deliberately not supported
            # Fails on TwistedIOLoop and AsyncIOLoop.
            'test_runAfterCrash',
        ],
        'twisted.internet.test.test_fdset.ReactorFDSetTestsBuilder': [
            "test_lostFileDescriptor",  # incompatible with epoll and kqueue
        ],
        'twisted.internet.test.test_process.ProcessTestsBuilder': [
            # Only work as root.  Twisted's "skip" functionality works
            # with py27+, but not unittest2 on py26.
            'test_changeGID',
            'test_changeUID',
            # This test sometimes fails with EPIPE on a call to
            # kqueue.control. Happens consistently for me with
            # trollius but not asyncio or other IOLoops.
            'test_childConnectionLost',
        ],
        # Process tests appear to work on OSX 10.7, but not 10.6
        # 'twisted.internet.test.test_process.PTYProcessTestsBuilder': [
        #    'test_systemCallUninterruptedByChildExit',
        #    ],
        'twisted.internet.test.test_tcp.TCPClientTestsBuilder': [
            'test_badContext',  # ssl-related; see also SSLClientTestsMixin
        ],
        'twisted.internet.test.test_tcp.TCPPortTestsBuilder': [
            # These use link-local addresses and cause firewall prompts on mac
            'test_buildProtocolIPv6AddressScopeID',
            'test_portGetHostOnIPv6ScopeID',
            'test_serverGetHostOnIPv6ScopeID',
            'test_serverGetPeerOnIPv6ScopeID',
        ],
        'twisted.internet.test.test_tcp.TCPConnectionTestsBuilder': [],
        'twisted.internet.test.test_tcp.WriteSequenceTests': [],
        'twisted.internet.test.test_tcp.AbortConnectionTestCase': [],
        'twisted.internet.test.test_threads.ThreadTestsBuilder': [],
        'twisted.internet.test.test_time.TimeTestsBuilder': [],
        # Extra third-party dependencies (pyOpenSSL)
        # 'twisted.internet.test.test_tls.SSLClientTestsMixin': [],
        'twisted.internet.test.test_udp.UDPServerTestsBuilder': [],
        'twisted.internet.test.test_unix.UNIXTestsBuilder': [
            # Platform-specific.  These tests would be skipped automatically
            # if we were running twisted's own test runner.
            'test_connectToLinuxAbstractNamespace',
            'test_listenOnLinuxAbstractNamespace',
            # These tests use twisted's sendmsg.c extension and sometimes
            # fail with what looks like uninitialized memory errors
            # (more common on pypy than cpython, but I've seen it on both)
            'test_sendFileDescriptor',
            'test_sendFileDescriptorTriggersPauseProducing',
            'test_descriptorDeliveredBeforeBytes',
            'test_avoidLeakingFileDescriptors',
        ],
        'twisted.internet.test.test_unix.UNIXDatagramTestsBuilder': [
            'test_listenOnLinuxAbstractNamespace',
        ],
        'twisted.internet.test.test_unix.UNIXPortTestsBuilder': [],
    }
    if sys.version_info >= (3,):
        # In Twisted 15.2.0 on Python 3.4, the process tests will try to run
        # but fail, due in part to interactions between Tornado's strict
        # warnings-as-errors policy and Twisted's own warning handling
        # (it was not obvious how to configure the warnings module to
        # reconcile the two), and partly due to what looks like a packaging
        # error (process_cli.py missing). For now, just skip it.
        del twisted_tests['twisted.internet.test.test_process.ProcessTestsBuilder']
    for test_name, blacklist in twisted_tests.items():
        try:
            test_class = import_object(test_name)
        except (ImportError, AttributeError):
            # Suite not available in the installed Twisted; nothing to wrap.
            continue
        for test_func in blacklist:  # type: ignore
            if not hasattr(test_class, test_func):
                continue
            # The test_func may be defined in a mixin, so overwrite it
            # with a no-op instead of using delattr().
            setattr(test_class, test_func, lambda self: None)

        def make_test_subclass(test_class):
            """Return a subclass of *test_class* bound to the Tornado reactor."""
            class TornadoTest(test_class):  # type: ignore
                _reactors = ["tornado.platform.twisted._TestReactor"]

                def setUp(self):
                    # Twisted's tests expect to be run from a temporary
                    # directory; they create files in their working
                    # directory and don't always clean up after themselves.
                    self.__curdir = os.getcwd()
                    self.__tempdir = tempfile.mkdtemp()
                    os.chdir(self.__tempdir)
                    super(TornadoTest, self).setUp()  # type: ignore

                def tearDown(self):
                    super(TornadoTest, self).tearDown()  # type: ignore
                    # Leave the scratch directory before deleting it.
                    os.chdir(self.__curdir)
                    shutil.rmtree(self.__tempdir)

                def flushWarnings(self, *args, **kwargs):
                    # This is a hack because Twisted and Tornado have
                    # differing approaches to warnings in tests.
                    # Tornado sets up a global set of warnings filters
                    # in runtests.py, while Twisted patches the filter
                    # list in each test. The net effect is that
                    # Twisted's tests run with Tornado's increased
                    # strictness (BytesWarning and ResourceWarning are
                    # enabled) but without our filter rules to ignore those
                    # warnings from Twisted code.
                    ignored = (BytesWarning, ResourceWarning)
                    flushed = super(TornadoTest, self).flushWarnings(  # type: ignore
                        *args, **kwargs)
                    return [w for w in flushed
                            if w['category'] not in ignored]

                def buildReactor(self):
                    # Remember the handlers so unbuildReactor can put
                    # them back.
                    self.__saved_signals = save_signal_handlers()
                    return test_class.buildReactor(self)

                def unbuildReactor(self, reactor):
                    test_class.unbuildReactor(self, reactor)
                    # Clean up file descriptors (especially epoll/kqueue
                    # objects) eagerly instead of leaving them for the
                    # GC.  Unfortunately we can't do this in reactor.stop
                    # since twisted expects to be able to unregister
                    # connections in a post-shutdown hook.
                    reactor._io_loop.close(all_fds=True)
                    restore_signal_handlers(self.__saved_signals)

            # Make the wrapper carry the wrapped suite's name.
            TornadoTest.__name__ = test_class.__name__
            return TornadoTest
        test_subclass = make_test_subclass(test_class)
        # Publish the generated test case classes as module attributes.
        globals().update(test_subclass.makeTestCaseClasses())

    # Since we're not using twisted's test runner, it's tricky to get
    # logging set up well.  Most of the time it's easiest to just
    # leave it turned off, but while working on these tests you may want
    # to uncomment one of the other lines instead.
    log.defaultObserver.stop()
    # import sys; log.startLogging(sys.stderr, setStdout=0)
    # log.startLoggingWithObserver(log.PythonLoggingObserver().emit, setStdout=0)
    # import logging; logging.getLogger('twisted').setLevel(logging.WARNING)

    # Twisted recently introduced a new logger; disable that one too.
    try:
        from twisted.logger import globalLogBeginner  # type: ignore
    except ImportError:
        pass
    else:
        globalLogBeginner.beginLoggingTo([], redirectStandardIO=False)
if have_twisted:
    class LayeredTwistedIOLoop(TwistedIOLoop):
        """Layers a TwistedIOLoop on top of a TornadoReactor on a PollIOLoop.

        This is of course silly, but is useful for testing purposes to make
        sure we're implementing both sides of the various interfaces
        correctly.  In some tests another TornadoReactor is layered on top
        of the whole stack.
        """

        def initialize(self, **kwargs):
            self.real_io_loop = PollIOLoop(make_current=False)  # type: ignore
            # The reactor is constructed from inside a run of the real loop.
            build_reactor = gen.coroutine(TornadoReactor)
            reactor = self.real_io_loop.run_sync(build_reactor)
            super(LayeredTwistedIOLoop, self).initialize(reactor=reactor, **kwargs)
            self.add_callback(self.make_current)

        def close(self, all_fds=False):
            super(LayeredTwistedIOLoop, self).close(all_fds=all_fds)
            # HACK: This is the same thing that test_class.unbuildReactor does.
            reactor = self.reactor
            for reader in reactor._internalReaders:
                reactor.removeReader(reader)
                reader.connectionLost(None)
            self.real_io_loop.close(all_fds=all_fds)

        def stop(self):
            # One of twisted's tests fails if I don't delay crash()
            # until the reactor has started, but if I move this to
            # TwistedIOLoop then the tests fail when I'm *not* running
            # tornado-on-twisted-on-tornado.  I'm clearly missing something
            # about the startup/crash semantics, but since stop and crash
            # are really only used in tests it doesn't really matter.
            def crash_then_reclaim():
                self.reactor.crash()
                # Become current again on restart.  This is needed to
                # override real_io_loop's claim to being the current loop.
                self.add_callback(self.make_current)
            self.reactor.callWhenRunning(crash_then_reclaim)
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|