12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.application} and its interaction with
- L{twisted.persisted.sob}.
- """
- from __future__ import absolute_import, division
- import copy
- import os
- import pickle
- try:
- import asyncio
- except ImportError:
- asyncio = None
- from twisted.application import service, internet, app, reactors
- from twisted.internet import interfaces, defer, protocol, reactor
- from twisted.persisted import sob
- from twisted.plugins import twisted_reactors
- from twisted.protocols import wire, basic
- from twisted.python import usage
- from twisted.python.compat import NativeStringIO
- from twisted.python.runtime import platformType
- from twisted.python.test.modules_helpers import TwistedModulesMixin
- from twisted.test.proto_helpers import MemoryReactor
- from twisted.trial import unittest
class Dummy:
    """
    Bare placeholder object whose only attribute is C{processName}.
    """

    processName = None
class ServiceTests(unittest.TestCase):
    """
    Tests for naming, parenting, running state and copying of
    L{service.Service} and L{service.MultiService}.
    """

    def testName(self):
        """
        C{setName} records the given name on the service.
        """
        named = service.Service()
        named.setName("hello")
        self.assertEqual(named.name, "hello")


    def testParent(self):
        """
        C{setServiceParent} lists the child in the parent and sets the
        child's C{parent} attribute.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)


    def testApplicationAsParent(self):
        """
        An application object can be used as a service parent; the child
        ends up in the application's L{service.IServiceCollection}.
        """
        child = service.Service()
        application = service.Application("")
        child.setServiceParent(application)
        children = list(service.IServiceCollection(application))
        self.assertEqual(children, [child])
        expectedParent = service.IServiceCollection(application)
        self.assertEqual(child.parent, expectedParent)


    def testNamedChild(self):
        """
        A named child can be looked up on its parent with
        C{getServiceNamed}.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setName("hello")
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)
        self.assertEqual(parent.getServiceNamed("hello"), child)


    def testDoublyNamedChild(self):
        """
        Renaming a service that already has a parent raises
        L{RuntimeError}.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setName("hello")
        child.setServiceParent(parent)
        self.assertRaises(RuntimeError, child.setName, "lala")


    def testDuplicateNamedChild(self):
        """
        Adding a second child with a name already used in the parent raises
        L{RuntimeError}.
        """
        parent = service.MultiService()
        first = service.Service()
        first.setName("hello")
        first.setServiceParent(parent)
        duplicate = service.Service()
        duplicate.setName("hello")
        self.assertRaises(RuntimeError, duplicate.setServiceParent, parent)


    def testDisowning(self):
        """
        C{disownServiceParent} removes the child from the parent and resets
        the child's C{parent} to L{None}.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertEqual(list(parent), [child])
        self.assertEqual(child.parent, parent)
        child.disownServiceParent()
        self.assertEqual(list(parent), [])
        self.assertIsNone(child.parent)


    def testRunning(self):
        """
        C{running} follows C{startService} and C{stopService}.
        """
        svc = service.Service()
        self.assertFalse(svc.running)
        svc.startService()
        self.assertTrue(svc.running)
        svc.stopService()
        self.assertFalse(svc.running)


    def testRunningChildren1(self):
        """
        Starting and stopping a parent starts and stops its child.
        """
        child = service.Service()
        parent = service.MultiService()
        child.setServiceParent(parent)
        self.assertFalse(child.running)
        self.assertFalse(parent.running)
        parent.startService()
        self.assertTrue(child.running)
        self.assertTrue(parent.running)
        parent.stopService()
        self.assertFalse(child.running)
        self.assertFalse(parent.running)


    def testRunningChildren2(self):
        """
        The first-added child is running both when the second child is
        started and when the second child is stopped.
        """
        earlier = service.Service()

        def checkRunning():
            self.assertTrue(earlier.running)

        later = service.Service()
        later.stopService = checkRunning
        later.startService = checkRunning
        parent = service.MultiService()
        earlier.setServiceParent(parent)
        later.setServiceParent(parent)
        parent.startService()
        parent.stopService()


    def testAddingIntoRunning(self):
        """
        A child added to a running parent is started; disowning it stops it.
        """
        parent = service.MultiService()
        parent.startService()
        child = service.Service()
        self.assertFalse(child.running)
        child.setServiceParent(parent)
        self.assertTrue(child.running)
        child.disownServiceParent()
        self.assertFalse(child.running)


    def testPrivileged(self):
        """
        C{privilegedStartService} on the parent invokes the child's
        C{privilegedStartService}.
        """
        hooked = service.Service()

        def markPrivilegedStart():
            hooked.privilegedStarted = 1

        hooked.privilegedStartService = markPrivilegedStart
        plain = service.Service()
        parent = service.MultiService()
        hooked.setServiceParent(parent)
        plain.setServiceParent(parent)
        parent.privilegedStartService()
        self.assertTrue(hooked.privilegedStarted)


    def testCopying(self):
        """
        A shallow copy of a running service is not running, and the original
        stays running.
        """
        original = service.Service()
        original.startService()
        duplicate = copy.copy(original)
        self.assertFalse(duplicate.running)
        self.assertTrue(original.running)
# Current user and group ids; both are 0 where os.getuid does not exist.
if not hasattr(os, "getuid"):
    curuid = curgid = 0
else:
    curuid = os.getuid()
    curgid = os.getgid()
class ProcessTests(unittest.TestCase):
    """
    Tests for the attributes exposed by L{service.Process}.
    """

    def testID(self):
        """
        Positional arguments are stored as C{uid} and C{gid}, in that order.
        """
        process = service.Process(5, 6)
        self.assertEqual(process.uid, 5)
        self.assertEqual(process.gid, 6)


    def testDefaults(self):
        """
        Any of C{uid} and C{gid} that is not supplied is L{None}.
        """
        uidOnly = service.Process(5)
        self.assertEqual(uidOnly.uid, 5)
        self.assertIsNone(uidOnly.gid)

        gidOnly = service.Process(gid=5)
        self.assertIsNone(gidOnly.uid)
        self.assertEqual(gidOnly.gid, 5)

        neither = service.Process()
        self.assertIsNone(neither.uid)
        self.assertIsNone(neither.gid)


    def testProcessName(self):
        """
        C{processName} starts as L{None} and can be assigned.
        """
        process = service.Process()
        self.assertIsNone(process.processName)
        process.processName = 'hello'
        self.assertEqual(process.processName, 'hello')
class InterfacesTests(unittest.TestCase):
    """
    Tests that the concrete service classes provide their interfaces.
    """

    def testService(self):
        """
        A L{service.Service} instance provides L{service.IService}.
        """
        provided = service.IService.providedBy(service.Service())
        self.assertTrue(provided)


    def testMultiService(self):
        """
        A L{service.MultiService} instance provides both L{service.IService}
        and L{service.IServiceCollection}.
        """
        isService = service.IService.providedBy(service.MultiService())
        self.assertTrue(isService)
        isCollection = service.IServiceCollection.providedBy(
            service.MultiService())
        self.assertTrue(isCollection)


    def testProcess(self):
        """
        A L{service.Process} instance provides L{service.IProcess}.
        """
        provided = service.IProcess.providedBy(service.Process())
        self.assertTrue(provided)
class ApplicationTests(unittest.TestCase):
    """
    Tests for the object returned by L{service.Application} and the
    components it can be adapted to.
    """

    def testConstructor(self):
        """
        L{service.Application} accepts a name, optionally followed by one or
        two more positional arguments.
        """
        for arguments in (("hello",), ("hello", 5), ("hello", 5, 6)):
            service.Application(*arguments)


    def testProcessComponent(self):
        """
        The L{service.IProcess} component carries the uid and gid passed to
        L{service.Application}, or L{None} for each one omitted.
        """
        application = service.Application("hello")
        self.assertIsNone(service.IProcess(application).uid)
        self.assertIsNone(service.IProcess(application).gid)

        application = service.Application("hello", 5)
        self.assertEqual(service.IProcess(application).uid, 5)
        self.assertIsNone(service.IProcess(application).gid)

        application = service.Application("hello", 5, 6)
        self.assertEqual(service.IProcess(application).uid, 5)
        self.assertEqual(service.IProcess(application).gid, 6)


    def testServiceComponent(self):
        """
        The L{service.IService} and L{service.IServiceCollection} components
        are the same object, named after the application and parentless.
        """
        application = service.Application("hello")
        self.assertIs(service.IService(application),
                      service.IServiceCollection(application))
        self.assertEqual(service.IService(application).name, "hello")
        self.assertIsNone(service.IService(application).parent)


    def testPersistableComponent(self):
        """
        The L{sob.IPersistable} component uses the C{'pickle'} style, the
        application's name, and refers back to the application.
        """
        application = service.Application("hello")
        persistable = sob.IPersistable(application)
        self.assertEqual(persistable.style, 'pickle')
        self.assertEqual(persistable.name, 'hello')
        self.assertIs(persistable.original, application)
class LoadingTests(unittest.TestCase):
    """
    Tests for L{service.loadApplication}.
    """

    def test_simpleStoreAndLoad(self):
        """
        An application saved in the C{source} or C{pickle} style, or written
        out by hand as a C{.tac} file, loads back with its name intact.
        """
        application = service.Application("hello")
        persistable = sob.IPersistable(application)
        for style in ('source', 'pickle'):
            persistable.setStyle(style)
            persistable.save()
            # The file loaded here is "hello.ta" plus the style's initial.
            loaded = service.loadApplication("hello.ta"+style[0], style)
            self.assertEqual(service.IService(loaded).name, "hello")

        tacLines = [
            "from twisted.application import service\n",
            "application = service.Application('hello')\n",
        ]
        with open("hello.tac", 'w') as tacFile:
            tacFile.writelines(tacLines)
        loaded = service.loadApplication("hello.tac", 'python')
        self.assertEqual(service.IService(loaded).name, "hello")
class AppSupportTests(unittest.TestCase):
    """
    Tests for the helper functions in L{twisted.application.app}.
    """

    def testPassphrase(self):
        """
        L{app.getPassphrase} returns L{None} when called with C{0}.
        """
        self.assertIsNone(app.getPassphrase(0))


    def testLoadApplication(self):
        """
        Test loading an application file in different dump format.
        """
        application = service.Application("hello")
        baseconfig = {'file': None, 'source': None, 'python':None}
        # The pickle style is selected through the 'file' config key; the
        # source style uses a key of its own name.
        configKeys = {'pickle': 'file'}
        for style in ('source', 'pickle'):
            config = baseconfig.copy()
            config[configKeys.get(style, style)] = 'helloapplication'
            persistable = sob.IPersistable(application)
            persistable.setStyle(style)
            persistable.save(filename='helloapplication')
            loaded = app.getApplication(config, None)
            self.assertEqual(service.IService(loaded).name, "hello")

        config = baseconfig.copy()
        config['python'] = 'helloapplication'
        tacLines = [
            "from twisted.application import service\n",
            "application = service.Application('hello')\n",
        ]
        with open("helloapplication", 'w') as tacFile:
            tacFile.writelines(tacLines)
        loaded = app.getApplication(config, None)
        self.assertEqual(service.IService(loaded).name, "hello")


    def test_convertStyle(self):
        """
        L{app.convertStyle} converts between every pairing of the C{source}
        and C{pickle} styles, and the converted file loads with the
        application's name intact.
        """
        styles = ('source', 'pickle')
        original = service.Application("lala")
        for instyle in styles:
            for outstyle in styles:
                persistable = sob.IPersistable(original)
                persistable.setStyle(instyle)
                persistable.save(filename="converttest")
                app.convertStyle("converttest", instyle, None,
                                 "converttest.out", outstyle, 0)
                converted = service.loadApplication("converttest.out",
                                                    outstyle)
                self.assertEqual(service.IService(converted).name, "lala")


    def test_startApplication(self):
        """
        L{app.startApplication} leaves the application's service running.
        """
        application = service.Application("lala")
        app.startApplication(application, 0)
        self.assertTrue(service.IService(application).running)
class Foo(basic.LineReceiver):
    """
    Line protocol that sends one line on connect, stores the first line it
    receives on its factory, and reports that line when the connection ends.

    The factory is expected to carry a C{line} attribute and a Deferred
    C{d}.
    """

    def connectionMade(self):
        """
        Send the fixed greeting line.
        """
        self.transport.write(b'lalala\r\n')


    def lineReceived(self, line):
        """
        Store C{line} on the factory and drop the connection.
        """
        owner = self.factory
        owner.line = line
        self.transport.loseConnection()


    def connectionLost(self, reason):
        """
        Fire the factory's Deferred with whatever line was stored.
        """
        owner = self.factory
        owner.d.callback(owner.line)
class DummyApp:
    """
    Minimal application stand-in that keeps services in a dict keyed by
    service name.

    @ivar services: mapping of service name to service object.
    @type services: C{dict}
    """

    processName = None

    def __init__(self):
        # Without this, the first addService call would raise AttributeError
        # because nothing else ever creates the mapping.
        self.services = {}


    def addService(self, service):
        """
        Register C{service} under its C{name}, replacing any previous entry
        with the same name.
        """
        self.services[service.name] = service


    def removeService(self, service):
        """
        Remove the entry registered under C{service.name}.

        @raise KeyError: if no service is registered under that name.
        """
        del self.services[service.name]
class TimerTarget:
    """
    Collector with an C{append} method that records every value it is
    given, in order, in the list C{l}.
    """

    def __init__(self):
        self.l = list()


    def append(self, what):
        """
        Add C{what} to the end of the recorded values.
        """
        recorded = self.l
        recorded.append(what)
class TestEcho(wire.Echo):
    """
    L{wire.Echo} variant that fires the Deferred stored in its C{d}
    attribute when the connection is lost.

    The tests in this file assign C{d} on the class before use.
    """

    def connectionLost(self, reason):
        """
        Fire C{self.d} with C{True}; C{reason} is not used.
        """
        finished = self.d
        finished.callback(True)
class InternetTests(unittest.TestCase):
    """
    Tests for the service wrappers defined in
    L{twisted.application.internet}.
    """

    def testTCP(self):
        """
        A L{internet.TCPClient} and L{internet.TCPServer} under the same
        parent exchange a line, after which the parent is stopped and the
        server side sees its connection lost.
        """
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.setServiceParent(s)
        # Port 0 was requested, so read back the port actually bound.
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        internet.TCPClient('127.0.0.1', num, factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d


    def test_UDP(self):
        """
        Test L{internet.UDPServer} with a random port: starting the service
        should give it valid port, and stopService should free it so that we
        can start a server on the same port again.
        """
        if not interfaces.IReactorUDP(reactor, None):
            raise unittest.SkipTest("This reactor does not support UDP sockets")
        p = protocol.DatagramProtocol()
        t = internet.UDPServer(0, p)
        t.startService()
        num = t._port.getHost().port
        self.assertNotEqual(num, 0)

        def onStop(ignored):
            # Rebind the port that has just been released.
            t = internet.UDPServer(num, p)
            t.startService()
            return t.stopService()

        return defer.maybeDeferred(t.stopService).addCallback(onStop)


    def testPrivileged(self):
        """
        A L{internet.TCPServer} marked C{privileged} is listening after
        C{privilegedStartService} alone, and a client can exchange a line
        with it.
        """
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.TCPServer(0, factory)
        t.privileged = 1
        t.privilegedStartService()
        num = t._port.getHost().port
        factory = protocol.ClientFactory()
        factory.d = defer.Deferred()
        factory.protocol = Foo
        factory.line = None
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : c.stopService())
        factory.d.addCallback(lambda x : t.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d


    def testConnectionGettingRefused(self):
        """
        Connecting to the port of a server that has been stopped calls the
        client factory's C{clientConnectionFailed}.
        """
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.TCPServer(0, factory)
        t.startService()
        num = t._port.getHost().port
        t.stopService()
        d = defer.Deferred()
        factory = protocol.ClientFactory()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        c = internet.TCPClient('127.0.0.1', num, factory)
        c.startService()
        return d


    def testUNIX(self):
        # FIXME: This test is far too dense.  It needs comments.
        #  -- spiv, 2004-11-07
        s = service.MultiService()
        s.startService()
        factory = protocol.ServerFactory()
        factory.protocol = TestEcho
        TestEcho.d = defer.Deferred()
        t = internet.UNIXServer('echo.skt', factory)
        t.setServiceParent(s)
        factory = protocol.ClientFactory()
        factory.protocol = Foo
        factory.d = defer.Deferred()
        factory.line = None
        internet.UNIXClient('echo.skt', factory).setServiceParent(s)
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        # Second round: restart the same parent and repeat the exchange.
        factory.d.addCallback(self._cbTestUnix, factory, s)
        return factory.d


    def _cbTestUnix(self, ignored, factory, s):
        """
        Restart C{s} with fresh Deferreds and expect the same line exchange
        again.

        @param ignored: result of the previous callback; not used.
        @param factory: the client factory whose C{d} and C{line} are reset.
        @param s: the parent service to start again.
        """
        TestEcho.d = defer.Deferred()
        factory.line = None
        factory.d = defer.Deferred()
        s.startService()
        factory.d.addCallback(self.assertEqual, b'lalala')
        factory.d.addCallback(lambda x : s.stopService())
        factory.d.addCallback(lambda x : TestEcho.d)
        return factory.d


    def testVolatile(self):
        """
        The C{_port} of a started UNIX server and the C{_connection} of a
        started UNIX client are set, are L{None} on a copy of the service,
        and are L{None} again once the service is stopped.
        """
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        self.assertIsNotNone(t._port)
        t1 = copy.copy(t)
        self.assertIsNone(t1._port)
        t.stopService()
        self.assertIsNone(t._port)
        self.assertFalse(t.running)

        factory = protocol.ClientFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXClient('echo.skt', factory)
        t.startService()
        self.assertIsNotNone(t._connection)
        t1 = copy.copy(t)
        self.assertIsNone(t1._connection)
        t.stopService()
        self.assertIsNone(t._connection)
        self.assertFalse(t.running)


    def testStoppingServer(self):
        """
        After a UNIX server is stopped, connecting to its socket path calls
        the client factory's C{clientConnectionFailed}.
        """
        factory = protocol.ServerFactory()
        factory.protocol = wire.Echo
        t = internet.UNIXServer('echo.skt', factory)
        t.startService()
        t.stopService()
        self.assertFalse(t.running)
        factory = protocol.ClientFactory()
        d = defer.Deferred()
        factory.clientConnectionFailed = lambda *args: d.callback(None)
        reactor.connectUNIX('echo.skt', factory)
        return d


    # The three UNIX socket tests are skipped when the reactor cannot be
    # adapted to IReactorUNIX.
    if not interfaces.IReactorUNIX(reactor, None):
        _skipMsg = "This reactor does not support UNIX domain sockets"
        testUNIX.skip = _skipMsg
        testVolatile.skip = _skipMsg
        testStoppingServer.skip = _skipMsg


    def testPickledTimer(self):
        """
        A L{internet.TimerService} pickled while running unpickles as not
        running.
        """
        target = TimerTarget()
        t0 = internet.TimerService(1, target.append, "hello")
        t0.startService()
        s = pickle.dumps(t0)
        t0.stopService()

        t = pickle.loads(s)
        self.assertFalse(t.running)


    def testBrokenTimer(self):
        """
        When the callable of a L{internet.TimerService} raises, exactly one
        L{ZeroDivisionError} is logged.
        """
        d = defer.Deferred()
        t = internet.TimerService(1, lambda: 1 // 0)
        oldFailed = t._failed

        def _failed(why):
            # Run the original handler, then let the test continue.
            oldFailed(why)
            d.callback(None)

        t._failed = _failed
        t.startService()
        # Call stopService; the original lambda returned the bound method
        # without calling it, so the service was never stopped.
        d.addCallback(lambda x : t.stopService())
        d.addCallback(lambda x : self.assertEqual(
            [ZeroDivisionError],
            [o.value.__class__ for o in self.flushLoggedErrors(ZeroDivisionError)]))
        return d


    def test_everythingThere(self):
        """
        L{twisted.application.internet} dynamically defines a set of
        L{service.Service} subclasses that in general have corresponding
        reactor.listenXXX or reactor.connectXXX calls.
        """
        trans = 'TCP UNIX SSL UDP UNIXDatagram Multicast'.split()
        # Iterate over a copy because unsupported transports are removed
        # from the list as we go.
        for tran in trans[:]:
            if not getattr(interfaces, "IReactor" + tran)(reactor, None):
                trans.remove(tran)
        for tran in trans:
            for side in 'Server Client'.split():
                if tran == "Multicast" and side == "Client":
                    continue
                if tran == "UDP" and side == "Client":
                    continue
                self.assertTrue(hasattr(internet, tran + side))
                method = getattr(internet, tran + side).method
                prefix = {'Server': 'listen', 'Client': 'connect'}[side]
                self.assertTrue(hasattr(reactor, prefix + method) or
                        (prefix == "connect" and method == "UDP"))
                o = getattr(internet, tran + side)()
                self.assertEqual(service.IService(o), o)


    def test_importAll(self):
        """
        L{twisted.application.internet} dynamically defines L{service.Service}
        subclasses. This test ensures that the subclasses exposed by C{__all__}
        are valid attributes of the module.
        """
        for cls in internet.__all__:
            self.assertTrue(
                hasattr(internet, cls),
                '%s not importable from twisted.application.internet' % (cls,))


    def test_reactorParametrizationInServer(self):
        """
        L{internet._AbstractServer} supports a C{reactor} keyword argument
        that can be used to parametrize the reactor used to listen for
        connections.
        """
        reactor = MemoryReactor()

        factory = object()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClient(self):
        """
        L{internet._AbstractClient} supports a C{reactor} keyword argument
        that can be used to parametrize the reactor used to create new client
        connections.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))


    def test_reactorParametrizationInServerMultipleStart(self):
        """
        Like L{test_reactorParametrizationInServer}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.Factory()
        t = internet.TCPServer(1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(reactor.tcpServers.pop()[:2], (1234, factory))


    def test_reactorParametrizationInClientMultipleStart(self):
        """
        Like L{test_reactorParametrizationInClient}, but stop and restart the
        service and check that the given reactor is still used.
        """
        reactor = MemoryReactor()

        factory = protocol.ClientFactory()
        t = internet.TCPClient('127.0.0.1', 1234, factory, reactor=reactor)
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
        t.stopService()
        t.startService()
        self.assertEqual(
            reactor.tcpClients.pop()[:3], ('127.0.0.1', 1234, factory))
class TimerBasicTests(unittest.TestCase):
    """
    Tests for L{internet.TimerService} run against the real reactor.  Each
    test stores its service in C{self.t} so C{tearDown} can stop it.
    """

    def tearDown(self):
        """
        Stop the timer service created by the test.
        """
        return self.t.stopService()


    def testTimerRuns(self):
        """
        The timer calls its callable with the configured argument, and the
        service is not running once stopped.
        """
        fired = defer.Deferred()
        self.t = internet.TimerService(1, fired.callback, 'hello')
        self.t.startService()
        fired.addCallback(self.assertEqual, 'hello')
        fired.addCallback(lambda ignored: self.t.stopService())
        fired.addCallback(lambda ignored: self.assertFalse(self.t.running))
        return fired


    def testTimerRestart(self):
        """
        The same L{internet.TimerService} can be stopped and started again,
        and calls its callable after each start.
        """
        firstRun = defer.Deferred()
        secondRun = defer.Deferred()
        # Consumed from the end, so the first trigger fires firstRun.
        pending = [(secondRun, "bar"), (firstRun, "foo")]

        def trigger():
            target, value = pending.pop()
            target.callback(value)

        self.t = internet.TimerService(1, trigger)
        self.t.startService()

        def onFirstResult(result):
            self.assertEqual(result, 'foo')
            return self.t.stopService()

        def onFirstStop(ignored):
            self.assertFalse(self.t.running)
            self.t.startService()
            return secondRun

        def onSecondResult(result):
            self.assertEqual(result, 'bar')
            self.t.stopService()

        for step in (onFirstResult, onFirstStop, onSecondResult):
            firstRun.addCallback(step)
        return firstRun


    def testTimerLoops(self):
        """
        The timer keeps calling its callable with the same arguments; the
        test completes after ten calls.
        """
        seen = []

        def trigger(data, number, done):
            seen.append(data)
            if len(seen) == number:
                done.callback(seen)

        finished = defer.Deferred()
        self.t = internet.TimerService(0.01, trigger, "hello", 10, finished)
        self.t.startService()
        finished.addCallback(self.assertEqual, ['hello'] * 10)
        finished.addCallback(lambda ignored: self.t.stopService())
        return finished
class FakeReactor(reactors.Reactor):
    """
    A L{reactors.Reactor} whose C{install} is replaced by a caller-supplied
    callable.
    """

    def __init__(self, install, *args, **kwargs):
        """
        @param install: any callable that will be used as install method.
        @type install: C{callable}

        @param args: positional arguments forwarded to
            L{reactors.Reactor.__init__}.
        @param kwargs: keyword arguments forwarded to
            L{reactors.Reactor.__init__}.
        """
        reactors.Reactor.__init__(self, *args, **kwargs)
        # Instance attribute assigned after base initialisation.
        self.install = install
class PluggableReactorTests(TwistedModulesMixin, unittest.TestCase):
    """
    Tests for the reactor discovery/inspection APIs.
    """

    def setUp(self):
        """
        Override the L{reactors.getPlugins} function, normally bound to
        L{twisted.plugin.getPlugins}, in order to control which
        L{IReactorInstaller} plugins are seen as available.

        C{self.pluginResults} can be customized and will be used as the
        result of calls to C{reactors.getPlugins}.
        """
        self.pluginCalls = []
        self.pluginResults = []
        self.originalFunction = reactors.getPlugins
        reactors.getPlugins = self._getPlugins


    def tearDown(self):
        """
        Restore the original L{reactors.getPlugins}.
        """
        reactors.getPlugins = self.originalFunction


    def _getPlugins(self, interface, package=None):
        """
        Stand-in for the real getPlugins method which records its arguments
        and returns a fixed result.
        """
        call = (interface, package)
        self.pluginCalls.append(call)
        return list(self.pluginResults)


    def test_getPluginReactorTypes(self):
        """
        Test that reactor plugins are returned from L{getReactorTypes}
        """
        name = 'fakereactortest'
        package = __name__ + '.fakereactor'
        description = 'description'
        self.pluginResults = [reactors.Reactor(name, package, description)]
        reactorTypes = reactors.getReactorTypes()

        self.assertEqual(
            self.pluginCalls,
            [(reactors.IReactorInstaller, None)])

        for candidate in reactorTypes:
            if candidate.shortName != name:
                continue
            self.assertEqual(candidate.description, description)
            break
        else:
            self.fail("Reactor plugin not present in getReactorTypes() result")


    def test_reactorInstallation(self):
        """
        Test that L{reactors.Reactor.install} loads the correct module and
        calls its install attribute.
        """
        installed = []

        def install():
            installed.append(True)

        fakeReactor = FakeReactor(install,
                                  'fakereactortest', __name__, 'described')
        # Expose the fake as the module the installer will load.
        self.replaceSysModules({'fakereactortest': fakeReactor})
        installer = reactors.Reactor('fakereactor', 'fakereactortest', 'described')
        installer.install()
        self.assertEqual(installed, [True])


    def test_installReactor(self):
        """
        Test that the L{reactors.installReactor} function correctly installs
        the specified reactor.
        """
        installed = []

        def install():
            installed.append(True)

        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]
        reactors.installReactor(name)
        self.assertEqual(installed, [True])


    def test_installReactorReturnsReactor(self):
        """
        Test that the L{reactors.installReactor} function correctly returns
        the installed reactor.
        """
        sentinel = object()

        def install():
            from twisted import internet as twistedInternet
            self.patch(twistedInternet, 'reactor', sentinel)

        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]
        installed = reactors.installReactor(name)
        self.assertIs(installed, sentinel)


    def test_installReactorMultiplePlugins(self):
        """
        Test that the L{reactors.installReactor} function correctly installs
        the specified reactor when there are multiple reactor plugins.
        """
        installed = []

        def install():
            installed.append(True)

        name = 'fakereactortest'
        package = __name__
        description = 'description'
        wanted = FakeReactor(install, name, package, description)
        decoy = FakeReactor(lambda: None,
                            "otherreactor", package, description)
        self.pluginResults = [decoy, wanted]
        reactors.installReactor(name)
        self.assertEqual(installed, [True])


    def test_installNonExistentReactor(self):
        """
        Test that L{reactors.installReactor} raises L{reactors.NoSuchReactor}
        when asked to install a reactor which it cannot find.
        """
        self.pluginResults = []
        self.assertRaises(
            reactors.NoSuchReactor,
            reactors.installReactor, 'somereactor')


    def test_installNotAvailableReactor(self):
        """
        Test that L{reactors.installReactor} raises an exception when asked to
        install a reactor which doesn't work in this environment.
        """
        def install():
            raise ImportError("Missing foo bar")

        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]
        self.assertRaises(ImportError, reactors.installReactor, name)


    def test_reactorSelectionMixin(self):
        """
        Test that the reactor selected is installed as soon as possible, ie
        when the option is parsed.
        """
        executed = []
        INSTALL_EVENT = 'reactor installed'
        SUBCOMMAND_EVENT = 'subcommands loaded'

        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            @property
            def subCommands(self):
                executed.append(SUBCOMMAND_EVENT)
                return [('subcommand', None, lambda: self, 'test subcommand')]

        def install():
            executed.append(INSTALL_EVENT)

        self.pluginResults = [
            FakeReactor(install, 'fakereactortest', __name__, 'described')
        ]

        options = ReactorSelectionOptions()
        options.parseOptions(['--reactor', 'fakereactortest', 'subcommand'])
        self.assertEqual(executed[0], INSTALL_EVENT)
        self.assertEqual(executed.count(INSTALL_EVENT), 1)
        self.assertEqual(options["reactor"], "fakereactortest")


    def test_reactorSelectionMixinNonExistent(self):
        """
        Test that the usage mixin exits when trying to use a non existent
        reactor (the name not matching to any reactor), giving an error
        message.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass

        self.pluginResults = []

        options = ReactorSelectionOptions()
        options.messageOutput = NativeStringIO()
        arguments = ['--reactor', 'fakereactortest', 'subcommand']
        e = self.assertRaises(usage.UsageError, options.parseOptions,
                              arguments)
        errorText = e.args[0]
        self.assertIn("fakereactortest", errorText)
        self.assertIn("help-reactors", errorText)


    def test_reactorSelectionMixinNotAvailable(self):
        """
        Test that the usage mixin exits when trying to use a reactor not
        available (the reactor raises an error at installation), giving an
        error message.
        """
        class ReactorSelectionOptions(usage.Options, app.ReactorSelectionMixin):
            pass

        message = "Missing foo bar"

        def install():
            raise ImportError(message)

        name = 'fakereactortest'
        plugin = FakeReactor(install, name, __name__, 'description')
        self.pluginResults = [plugin]

        options = ReactorSelectionOptions()
        options.messageOutput = NativeStringIO()
        arguments = ['--reactor', 'fakereactortest', 'subcommand']
        e = self.assertRaises(usage.UsageError, options.parseOptions,
                              arguments)
        errorText = e.args[0]
        self.assertIn(message, errorText)
        self.assertIn("help-reactors", errorText)
class HelpReactorsTests(unittest.TestCase):
    """
    --help-reactors lists the available reactors
    """

    def setUp(self):
        """
        Get the text from --help-reactors
        """
        self.options = app.ReactorSelectionMixin()
        self.options.messageOutput = NativeStringIO()
        # opt_help_reactors writes its listing and then exits.
        self.assertRaises(SystemExit, self.options.opt_help_reactors)
        self.message = self.options.messageOutput.getvalue()


    def test_lacksAsyncIO(self):
        """
        --help-reactors should list the asyncio reactor with a C{!} prefix on
        its short name when the asyncio library cannot be imported
        """
        self.assertIn(twisted_reactors.asyncio.description, self.message)
        self.assertIn("!" + twisted_reactors.asyncio.shortName, self.message)

    if asyncio:
        test_lacksAsyncIO.skip = "Not applicable, asyncio is available"


    def test_hasAsyncIO(self):
        """
        --help-reactors should list the asyncio reactor without a C{!} prefix
        when the asyncio library can be imported
        """
        self.assertIn(twisted_reactors.asyncio.description, self.message)
        self.assertNotIn(
            "!" + twisted_reactors.asyncio.shortName, self.message)

    if not asyncio:
        test_hasAsyncIO.skip = "asyncio library not available"


    def test_iocpWin32(self):
        """
        --help-reactors should display the iocp reactor on Windows
        """
        self.assertIn(twisted_reactors.iocp.description, self.message)
        self.assertNotIn("!" + twisted_reactors.iocp.shortName, self.message)

    if platformType != "win32":
        test_iocpWin32.skip = "Test only applicable on Windows"


    def test_iocpNotWin32(self):
        """
        --help-reactors should list the iocp reactor with a C{!} prefix on
        its short name on platforms other than Windows
        """
        self.assertIn(twisted_reactors.iocp.description, self.message)
        self.assertIn("!" + twisted_reactors.iocp.shortName, self.message)

    # This test is skipped on Windows, so the message must say so; the
    # previous text claimed the opposite.
    if platformType == "win32":
        test_iocpNotWin32.skip = "Test not applicable on Windows"


    def test_onlySupportedReactors(self):
        """
        --help-reactors with only supported reactors
        """
        def getReactorTypes():
            yield twisted_reactors.default

        options = app.ReactorSelectionMixin()
        options._getReactorTypes = getReactorTypes
        options.messageOutput = NativeStringIO()
        self.assertRaises(SystemExit, options.opt_help_reactors)
        message = options.messageOutput.getvalue()
        self.assertNotIn("reactors not available", message)
|