123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for implementations of L{IReactorThreads}.
- """
- from __future__ import division, absolute_import
- __metaclass__ = type
- from weakref import ref
- import gc, threading
- from twisted.python.threadable import isInIOThread
- from twisted.internet.test.reactormixins import ReactorBuilder
- from twisted.python.threadpool import ThreadPool
- from twisted.internet.interfaces import IReactorThreads
- class ThreadTestsBuilder(ReactorBuilder):
- """
- Builder for defining tests relating to L{IReactorThreads}.
- """
- requiredInterfaces = (IReactorThreads,)
- def test_getThreadPool(self):
- """
- C{reactor.getThreadPool()} returns an instance of L{ThreadPool} which
- starts when C{reactor.run()} is called and stops before it returns.
- """
- state = []
- reactor = self.buildReactor()
- pool = reactor.getThreadPool()
- self.assertIsInstance(pool, ThreadPool)
- self.assertFalse(
- pool.started, "Pool should not start before reactor.run")
- def f():
- # Record the state for later assertions
- state.append(pool.started)
- state.append(pool.joined)
- reactor.stop()
- reactor.callWhenRunning(f)
- self.runReactor(reactor, 2)
- self.assertTrue(
- state[0], "Pool should start after reactor.run")
- self.assertFalse(
- state[1], "Pool should not be joined before reactor.stop")
- self.assertTrue(
- pool.joined,
- "Pool should be stopped after reactor.run returns")
- def test_suggestThreadPoolSize(self):
- """
- C{reactor.suggestThreadPoolSize()} sets the maximum size of the reactor
- threadpool.
- """
- reactor = self.buildReactor()
- reactor.suggestThreadPoolSize(17)
- pool = reactor.getThreadPool()
- self.assertEqual(pool.max, 17)
- def test_delayedCallFromThread(self):
- """
- A function scheduled with L{IReactorThreads.callFromThread} invoked
- from a delayed call is run immediately in the next reactor iteration.
- When invoked from the reactor thread, previous implementations of
- L{IReactorThreads.callFromThread} would skip the pipe/socket based wake
- up step, assuming the reactor would wake up on its own. However, this
- resulted in the reactor not noticing an insert into the thread queue at
- the right time (in this case, after the thread queue has been processed
- for that reactor iteration).
- """
- reactor = self.buildReactor()
- def threadCall():
- reactor.stop()
- # Set up the use of callFromThread being tested.
- reactor.callLater(0, reactor.callFromThread, threadCall)
- before = reactor.seconds()
- self.runReactor(reactor, 60)
- after = reactor.seconds()
- # We specified a timeout of 60 seconds. The timeout code in runReactor
- # probably won't actually work, though. If the reactor comes out of
- # the event notification API just a little bit early, say after 59.9999
- # seconds instead of after 60 seconds, then the queued thread call will
- # get processed but the timeout delayed call runReactor sets up won't!
- # Then the reactor will stop and runReactor will return without the
- # timeout firing. As it turns out, select() and poll() are quite
- # likely to return *slightly* earlier than we ask them to, so the
- # timeout will rarely happen, even if callFromThread is broken. So,
- # instead we'll measure the elapsed time and make sure it's something
- # less than about half of the timeout we specified. This is heuristic.
- # It assumes that select() won't ever return after 30 seconds when we
- # asked it to timeout after 60 seconds. And of course like all
- # time-based tests, it's slightly non-deterministic. If the OS doesn't
- # schedule this process for 30 seconds, then the test might fail even
- # if callFromThread is working.
- self.assertTrue(after - before < 30)
- def test_callFromThread(self):
- """
- A function scheduled with L{IReactorThreads.callFromThread} invoked
- from another thread is run in the reactor thread.
- """
- reactor = self.buildReactor()
- result = []
- def threadCall():
- result.append(threading.currentThread())
- reactor.stop()
- reactor.callLater(0, reactor.callInThread,
- reactor.callFromThread, threadCall)
- self.runReactor(reactor, 5)
- self.assertEqual(result, [threading.currentThread()])
- def test_stopThreadPool(self):
- """
- When the reactor stops, L{ReactorBase._stopThreadPool} drops the
- reactor's direct reference to its internal threadpool and removes
- the associated startup and shutdown triggers.
- This is the case of the thread pool being created before the reactor
- is run.
- """
- reactor = self.buildReactor()
- threadpool = ref(reactor.getThreadPool())
- reactor.callWhenRunning(reactor.stop)
- self.runReactor(reactor)
- gc.collect()
- self.assertIsNone(threadpool())
- def test_stopThreadPoolWhenStartedAfterReactorRan(self):
- """
- We must handle the case of shutting down the thread pool when it was
- started after the reactor was run in a special way.
- Some implementation background: The thread pool is started with
- callWhenRunning, which only returns a system trigger ID when it is
- invoked before the reactor is started.
- This is the case of the thread pool being created after the reactor
- is started.
- """
- reactor = self.buildReactor()
- threadPoolRefs = []
- def acquireThreadPool():
- threadPoolRefs.append(ref(reactor.getThreadPool()))
- reactor.stop()
- reactor.callWhenRunning(acquireThreadPool)
- self.runReactor(reactor)
- gc.collect()
- self.assertIsNone(threadPoolRefs[0]())
- def test_cleanUpThreadPoolEvenBeforeReactorIsRun(self):
- """
- When the reactor has its shutdown event fired before it is run, the
- thread pool is completely destroyed.
- For what it's worth, the reason we support this behavior at all is
- because Trial does this.
- This is the case of the thread pool being created without the reactor
- being started at al.
- """
- reactor = self.buildReactor()
- threadPoolRef = ref(reactor.getThreadPool())
- reactor.fireSystemEvent("shutdown")
- if reactor.__class__.__name__ == "AsyncioSelectorReactor":
- self.assertIsNone(reactor.threadpool)
- else:
- gc.collect()
- self.assertIsNone(threadPoolRef())
- def test_isInIOThread(self):
- """
- The reactor registers itself as the I/O thread when it runs so that
- L{twisted.python.threadable.isInIOThread} returns C{True} if it is
- called in the thread the reactor is running in.
- """
- results = []
- reactor = self.buildReactor()
- def check():
- results.append(isInIOThread())
- reactor.stop()
- reactor.callWhenRunning(check)
- self.runReactor(reactor)
- self.assertEqual([True], results)
- def test_isNotInIOThread(self):
- """
- The reactor registers itself as the I/O thread when it runs so that
- L{twisted.python.threadable.isInIOThread} returns C{False} if it is
- called in a different thread than the reactor is running in.
- """
- results = []
- reactor = self.buildReactor()
- def check():
- results.append(isInIOThread())
- reactor.callFromThread(reactor.stop)
- reactor.callInThread(check)
- self.runReactor(reactor)
- self.assertEqual([False], results)
- globals().update(ThreadTestsBuilder.makeTestCaseClasses())
|