1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033 |
- # -*- test-case-name: twisted.application.test.test_internet -*-
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for (new code in) L{twisted.application.internet}.
- @var AT_LEAST_ONE_ATTEMPT: At least enough seconds for L{ClientService} to make
- one attempt.
- """
- from __future__ import absolute_import, division
- import pickle
- from zope.interface import implementer
- from zope.interface.verify import verifyClass
- from twisted.internet.protocol import Factory, Protocol
- from twisted.internet.task import Clock
- from twisted.trial.unittest import TestCase, SynchronousTestCase
- from twisted.application import internet
- from twisted.application.internet import (
- StreamServerEndpointService, TimerService, ClientService)
- from twisted.internet.defer import Deferred, CancelledError
- from twisted.internet.interfaces import (
- IStreamServerEndpoint, IStreamClientEndpoint, IListeningPort,
- IHalfCloseableProtocol, IFileDescriptorReceiver
- )
- from twisted.internet import task
- from twisted.python.failure import Failure
- from twisted.logger import globalLogPublisher, formatEvent
- from twisted.test.proto_helpers import StringTransport
- def fakeTargetFunction():
- """
- A fake target function for testing TimerService which does nothing.
- """
- pass
- @implementer(IStreamServerEndpoint)
- class FakeServer(object):
- """
- In-memory implementation of L{IStreamServerEndpoint}.
- @ivar result: The L{Deferred} resulting from the call to C{listen}, after
- C{listen} has been called.
- @ivar factory: The factory passed to C{listen}.
- @ivar cancelException: The exception to errback C{self.result} when it is
- cancelled.
- @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
- with.
- @ivar listenAttempts: The number of times C{listen} has been invoked.
- @ivar failImmediately: If set, the exception to fail the L{Deferred}
- returned from C{listen} before it is returned.
- """
- result = None
- factory = None
- failImmediately = None
- cancelException = CancelledError()
- listenAttempts = 0
- def __init__(self):
- self.port = FakePort()
- def listen(self, factory):
- """
- Return a Deferred and store it for future use. (Implementation of
- L{IStreamServerEndpoint}).
- @param factory: the factory to listen with
- @return: a L{Deferred} stored in L{FakeServer.result}
- """
- self.listenAttempts += 1
- self.factory = factory
- self.result = Deferred(
- canceller=lambda d: d.errback(self.cancelException))
- if self.failImmediately is not None:
- self.result.errback(self.failImmediately)
- return self.result
- def startedListening(self):
- """
- Test code should invoke this method after causing C{listen} to be
- invoked in order to fire the L{Deferred} previously returned from
- C{listen}.
- """
- self.result.callback(self.port)
- def stoppedListening(self):
- """
- Test code should invoke this method after causing C{stopListening} to
- be invoked on the port fired from the L{Deferred} returned from
- C{listen} in order to cause the L{Deferred} returned from
- C{stopListening} to fire.
- """
- self.port.deferred.callback(None)
- verifyClass(IStreamServerEndpoint, FakeServer)
- @implementer(IListeningPort)
- class FakePort(object):
- """
- Fake L{IListeningPort} implementation.
- @ivar deferred: The L{Deferred} returned by C{stopListening}.
- """
- deferred = None
- def stopListening(self):
- """
- Stop listening.
- @return: a L{Deferred} stored in L{FakePort.deferred}
- """
- self.deferred = Deferred()
- return self.deferred
- verifyClass(IStreamServerEndpoint, FakeServer)
- class EndpointServiceTests(TestCase):
- """
- Tests for L{twisted.application.internet}.
- """
- def setUp(self):
- """
- Construct a stub server, a stub factory, and a
- L{StreamServerEndpointService} to test.
- """
- self.fakeServer = FakeServer()
- self.factory = Factory()
- self.svc = StreamServerEndpointService(self.fakeServer, self.factory)
- def test_privilegedStartService(self):
- """
- L{StreamServerEndpointService.privilegedStartService} calls its
- endpoint's C{listen} method with its factory.
- """
- self.svc.privilegedStartService()
- self.assertIdentical(self.factory, self.fakeServer.factory)
- def test_synchronousRaiseRaisesSynchronously(self, thunk=None):
- """
- L{StreamServerEndpointService.startService} should raise synchronously
- if the L{Deferred} returned by its wrapped
- L{IStreamServerEndpoint.listen} has already fired with an errback and
- the L{StreamServerEndpointService}'s C{_raiseSynchronously} flag has
- been set. This feature is necessary to preserve compatibility with old
- behavior of L{twisted.internet.strports.service}, which is to return a
- service which synchronously raises an exception from C{startService}
- (so that, among other things, twistd will not start running). However,
- since L{IStreamServerEndpoint.listen} may fail asynchronously, it is a
- bad idea to rely on this behavior.
- @param thunk: If specified, a callable to execute in place of
- C{startService}.
- """
- self.fakeServer.failImmediately = ZeroDivisionError()
- self.svc._raiseSynchronously = True
- self.assertRaises(ZeroDivisionError, thunk or self.svc.startService)
- def test_synchronousRaisePrivileged(self):
- """
- L{StreamServerEndpointService.privilegedStartService} should behave the
- same as C{startService} with respect to
- L{EndpointServiceTests.test_synchronousRaiseRaisesSynchronously}.
- """
- self.test_synchronousRaiseRaisesSynchronously(
- self.svc.privilegedStartService)
- def test_failReportsError(self):
- """
- L{StreamServerEndpointService.startService} and
- L{StreamServerEndpointService.privilegedStartService} should both log
- an exception when the L{Deferred} returned from their wrapped
- L{IStreamServerEndpoint.listen} fails.
- """
- self.svc.startService()
- self.fakeServer.result.errback(ZeroDivisionError())
- logged = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(logged), 1)
- def test_asynchronousFailReportsError(self):
- """
- L{StreamServerEndpointService.startService} and
- L{StreamServerEndpointService.privilegedStartService} should both log
- an exception when the L{Deferred} returned from their wrapped
- L{IStreamServerEndpoint.listen} fails asynchronously, even if
- C{_raiseSynchronously} is set.
- """
- self.svc._raiseSynchronously = True
- self.svc.startService()
- self.fakeServer.result.errback(ZeroDivisionError())
- logged = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(logged), 1)
- def test_synchronousFailReportsError(self):
- """
- Without the C{_raiseSynchronously} compatibility flag, failing
- immediately has the same behavior as failing later; it logs the error.
- """
- self.fakeServer.failImmediately = ZeroDivisionError()
- self.svc.startService()
- logged = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(logged), 1)
- def test_startServiceUnstarted(self):
- """
- L{StreamServerEndpointService.startService} sets the C{running} flag,
- and calls its endpoint's C{listen} method with its factory, if it
- has not yet been started.
- """
- self.svc.startService()
- self.assertIdentical(self.factory, self.fakeServer.factory)
- self.assertEqual(self.svc.running, True)
- def test_startServiceStarted(self):
- """
- L{StreamServerEndpointService.startService} sets the C{running} flag,
- but nothing else, if the service has already been started.
- """
- self.test_privilegedStartService()
- self.svc.startService()
- self.assertEqual(self.fakeServer.listenAttempts, 1)
- self.assertEqual(self.svc.running, True)
- def test_stopService(self):
- """
- L{StreamServerEndpointService.stopService} calls C{stopListening} on
- the L{IListeningPort} returned from its endpoint, returns the
- C{Deferred} from stopService, and sets C{running} to C{False}.
- """
- self.svc.privilegedStartService()
- self.fakeServer.startedListening()
- # Ensure running gets set to true
- self.svc.startService()
- result = self.svc.stopService()
- l = []
- result.addCallback(l.append)
- self.assertEqual(len(l), 0)
- self.fakeServer.stoppedListening()
- self.assertEqual(len(l), 1)
- self.assertFalse(self.svc.running)
- def test_stopServiceBeforeStartFinished(self):
- """
- L{StreamServerEndpointService.stopService} cancels the L{Deferred}
- returned by C{listen} if it has not yet fired. No error will be logged
- about the cancellation of the listen attempt.
- """
- self.svc.privilegedStartService()
- result = self.svc.stopService()
- l = []
- result.addBoth(l.append)
- self.assertEqual(l, [None])
- self.assertEqual(self.flushLoggedErrors(CancelledError), [])
- def test_stopServiceCancelStartError(self):
- """
- L{StreamServerEndpointService.stopService} cancels the L{Deferred}
- returned by C{listen} if it has not fired yet. An error will be logged
- if the resulting exception is not L{CancelledError}.
- """
- self.fakeServer.cancelException = ZeroDivisionError()
- self.svc.privilegedStartService()
- result = self.svc.stopService()
- l = []
- result.addCallback(l.append)
- self.assertEqual(l, [None])
- stoppingErrors = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(len(stoppingErrors), 1)
- class TimerServiceTests(TestCase):
- """
- Tests for L{twisted.application.internet.TimerService}.
- @type timer: L{TimerService}
- @ivar timer: service to test
- @type clock: L{task.Clock}
- @ivar clock: source of time
- @type deferred: L{Deferred}
- @ivar deferred: deferred returned by L{TimerServiceTests.call}.
- """
- def setUp(self):
- """
- Set up a timer service to test.
- """
- self.timer = TimerService(2, self.call)
- self.clock = self.timer.clock = task.Clock()
- self.deferred = Deferred()
- def call(self):
- """
- Function called by L{TimerService} being tested.
- @returns: C{self.deferred}
- @rtype: L{Deferred}
- """
- return self.deferred
- def test_startService(self):
- """
- When L{TimerService.startService} is called, it marks itself
- as running, creates a L{task.LoopingCall} and starts it.
- """
- self.timer.startService()
- self.assertTrue(self.timer.running, "Service is started")
- self.assertIsInstance(self.timer._loop, task.LoopingCall)
- self.assertIdentical(self.clock, self.timer._loop.clock)
- self.assertTrue(self.timer._loop.running, "LoopingCall is started")
- def test_startServiceRunsCallImmediately(self):
- """
- When L{TimerService.startService} is called, it calls the function
- immediately.
- """
- result = []
- self.timer.call = (result.append, (None,), {})
- self.timer.startService()
- self.assertEqual([None], result)
- def test_startServiceUsesGlobalReactor(self):
- """
- L{TimerService.startService} uses L{internet._maybeGlobalReactor} to
- choose the reactor to pass to L{task.LoopingCall}
- uses the global reactor.
- """
- otherClock = task.Clock()
- def getOtherClock(maybeReactor):
- return otherClock
- self.patch(internet, "_maybeGlobalReactor", getOtherClock)
- self.timer.startService()
- self.assertIdentical(otherClock, self.timer._loop.clock)
- def test_stopServiceWaits(self):
- """
- When L{TimerService.stopService} is called while a call is in progress.
- the L{Deferred} returned doesn't fire until after the call finishes.
- """
- self.timer.startService()
- d = self.timer.stopService()
- self.assertNoResult(d)
- self.assertEqual(True, self.timer.running)
- self.deferred.callback(object())
- self.assertIdentical(self.successResultOf(d), None)
- def test_stopServiceImmediately(self):
- """
- When L{TimerService.stopService} is called while a call isn't in progress.
- the L{Deferred} returned has already been fired.
- """
- self.timer.startService()
- self.deferred.callback(object())
- d = self.timer.stopService()
- self.assertIdentical(self.successResultOf(d), None)
- def test_failedCallLogsError(self):
- """
- When function passed to L{TimerService} returns a deferred that errbacks,
- the exception is logged, and L{TimerService.stopService} doesn't raise an error.
- """
- self.timer.startService()
- self.deferred.errback(Failure(ZeroDivisionError()))
- errors = self.flushLoggedErrors(ZeroDivisionError)
- self.assertEqual(1, len(errors))
- d = self.timer.stopService()
- self.assertIdentical(self.successResultOf(d), None)
- def test_pickleTimerServiceNotPickleLoop(self):
- """
- When pickling L{internet.TimerService}, it won't pickle
- L{internet.TimerService._loop}.
- """
- # We need a pickleable callable to test pickling TimerService. So we
- # can't use self.timer
- timer = TimerService(1, fakeTargetFunction)
- timer.startService()
- dumpedTimer = pickle.dumps(timer)
- timer.stopService()
- loadedTimer = pickle.loads(dumpedTimer)
- nothing = object()
- value = getattr(loadedTimer, "_loop", nothing)
- self.assertIdentical(nothing, value)
- def test_pickleTimerServiceNotPickleLoopFinished(self):
- """
- When pickling L{internet.TimerService}, it won't pickle
- L{internet.TimerService._loopFinished}.
- """
- # We need a pickleable callable to test pickling TimerService. So we
- # can't use self.timer
- timer = TimerService(1, fakeTargetFunction)
- timer.startService()
- dumpedTimer = pickle.dumps(timer)
- timer.stopService()
- loadedTimer = pickle.loads(dumpedTimer)
- nothing = object()
- value = getattr(loadedTimer, "_loopFinished", nothing)
- self.assertIdentical(nothing, value)
- class ConnectInformation(object):
- """
- Information about C{endpointForTesting}
- @ivar connectQueue: a L{list} of L{Deferred} returned from C{connect}. If
- these are not already fired, you can fire them with no value and they
- will trigger building a factory.
- @ivar constructedProtocols: a L{list} of protocols constructed.
- @ivar passedFactories: a L{list} of L{IProtocolFactory}; the ones actually
- passed to the underlying endpoint / i.e. the reactor.
- """
- def __init__(self):
- self.connectQueue = []
- self.constructedProtocols = []
- self.passedFactories = []
- def endpointForTesting(fireImmediately=False):
- """
- Make a sample endpoint for testing.
- @param fireImmediately: If true, fire all L{Deferred}s returned from
- C{connect} immedaitely.
- @return: a 2-tuple of C{(information, endpoint)}, where C{information} is a
- L{ConnectInformation} describing the operations in progress on
- C{endpoint}.
- """
- @implementer(IStreamClientEndpoint)
- class ClientTestEndpoint(object):
- def connect(self, factory):
- result = Deferred()
- info.passedFactories.append(factory)
- @result.addCallback
- def createProtocol(ignored):
- protocol = factory.buildProtocol(None)
- info.constructedProtocols.append(protocol)
- transport = StringTransport()
- protocol.makeConnection(transport)
- return protocol
- info.connectQueue.append(result)
- if fireImmediately:
- result.callback(None)
- return result
- info = ConnectInformation()
- return info, ClientTestEndpoint()
- def catchLogs(testCase, logPublisher=globalLogPublisher):
- """
- Catch the global log stream.
- @param testCase: The test case to add a cleanup to.
- @param logPublisher: the log publisher to add and remove observers for.
- @return: a 0-argument callable that returns a list of textual log messages
- for comparison.
- @rtype: L{list} of L{unicode}
- """
- logs = []
- logPublisher.addObserver(logs.append)
- testCase.addCleanup(lambda: logPublisher.removeObserver(logs.append))
- return lambda: [formatEvent(event) for event in logs]
- AT_LEAST_ONE_ATTEMPT = 100.
- class ClientServiceTests(SynchronousTestCase):
- """
- Tests for L{ClientService}.
- """
- def makeReconnector(self, fireImmediately=True, startService=True,
- protocolType=Protocol, **kw):
- """
- Create a L{ClientService} along with a L{ConnectInformation} indicating
- the connections in progress on its endpoint.
- @param fireImmediately: Should all of the endpoint connection attempts
- fire synchronously?
- @type fireImmediately: L{bool}
- @param startService: Should the L{ClientService} be started before
- being returned?
- @type startService: L{bool}
- @param protocolType: a 0-argument callable returning a new L{IProtocol}
- provider to be used for application-level protocol connections.
- @param kw: Arbitrary keyword arguments to be passed on to
- L{ClientService}
- @return: a 2-tuple of L{ConnectInformation} (for information about test
- state) and L{ClientService} (the system under test). The
- L{ConnectInformation} has 2 additional attributes;
- C{applicationFactory} and C{applicationProtocols}, which refer to
- the unwrapped protocol factory and protocol instances passed in to
- L{ClientService} respectively.
- """
- nkw = {}
- nkw.update(clock=Clock())
- nkw.update(kw)
- clock = nkw['clock']
- cq, endpoint = endpointForTesting(fireImmediately=fireImmediately)
- # `endpointForTesting` is totally generic to any LLPI client that uses
- # endpoints, and maintains all its state internally; however,
- # applicationProtocols and applicationFactory are bonus attributes that
- # are only specifically interesitng to tests that use wrapper
- # protocols. For now, set them here, externally.
- applicationProtocols = cq.applicationProtocols = []
- class RememberingFactory(Factory, object):
- protocol = protocolType
- def buildProtocol(self, addr):
- result = super(RememberingFactory, self).buildProtocol(addr)
- applicationProtocols.append(result)
- return result
- cq.applicationFactory = factory = RememberingFactory()
- service = ClientService(endpoint, factory, **nkw)
- def stop():
- service._protocol = None
- if service.running:
- service.stopService()
- # Ensure that we don't leave any state in the reactor after
- # stopService.
- self.assertEqual(clock.getDelayedCalls(), [])
- self.addCleanup(stop)
- if startService:
- service.startService()
- return cq, service
- def test_startService(self):
- """
- When the service is started, a connection attempt is made.
- """
- cq, service = self.makeReconnector(fireImmediately=False)
- self.assertEqual(len(cq.connectQueue), 1)
- def test_startStopFactory(self):
- """
- Although somewhat obscure, L{IProtocolFactory} includes both C{doStart}
- and C{doStop} methods; ensure that when these methods are called on the
- factory that was passed to the reactor, the factory that was passed
- from the application receives them.
- """
- cq, service = self.makeReconnector()
- firstAppFactory = cq.applicationFactory
- self.assertEqual(firstAppFactory.numPorts, 0)
- firstPassedFactory = cq.passedFactories[0]
- firstPassedFactory.doStart()
- self.assertEqual(firstAppFactory.numPorts, 1)
- def test_stopServiceWhileConnected(self):
- """
- When the service is stopped, no further connect attempts are made. The
- returned L{Deferred} fires when all outstanding connections have been
- stopped.
- """
- cq, service = self.makeReconnector()
- d = service.stopService()
- self.assertNoResult(d)
- protocol = cq.constructedProtocols[0]
- self.assertEqual(protocol.transport.disconnecting, True)
- protocol.connectionLost(Failure(Exception()))
- self.successResultOf(d)
- def test_startServiceWaitsForDisconnect(self):
- """
- When L{ClientService} is restarted after having been connected, it
- waits to start connecting until after having disconnected.
- """
- cq, service = self.makeReconnector()
- d = service.stopService()
- self.assertNoResult(d)
- protocol = cq.constructedProtocols[0]
- self.assertEqual(protocol.transport.disconnecting, True)
- service.startService()
- self.assertNoResult(d)
- self.assertEqual(len(cq.constructedProtocols), 1)
- protocol.connectionLost(Failure(Exception()))
- self.assertEqual(len(cq.constructedProtocols), 2)
- def test_startServiceWhileStopping(self):
- """
- When L{ClientService} is stopping - that is,
- L{ClientService.stopService} has been called, but the L{Deferred} it
- returns has not fired yet - calling L{startService} will cause a new
- connection to be made, and new calls to L{whenConnected} to succeed.
- """
- cq, service = self.makeReconnector(fireImmediately=False)
- cq.connectQueue[0].callback(None)
- first = cq.constructedProtocols[0]
- stopped = service.stopService()
- self.assertNoResult(stopped)
- nextProtocol = service.whenConnected()
- self.assertNoResult(nextProtocol)
- service.startService()
- self.assertNoResult(nextProtocol)
- self.assertNoResult(stopped)
- self.assertEqual(first.transport.disconnecting, True)
- first.connectionLost(Failure(Exception()))
- self.successResultOf(stopped)
- cq.connectQueue[1].callback(None)
- self.assertEqual(len(cq.constructedProtocols), 2)
- self.assertIdentical(self.successResultOf(nextProtocol),
- cq.applicationProtocols[1])
- secondStopped = service.stopService()
- self.assertNoResult(secondStopped)
- def test_startServiceWhileStopped(self):
- """
- When L{ClientService} is stopped - that is,
- L{ClientService.stopService} has been called and the L{Deferred} it
- returns has fired - calling L{startService} will cause a new connection
- to be made, and new calls to L{whenConnected} to succeed.
- """
- cq, service = self.makeReconnector(fireImmediately=False)
- stopped = service.stopService()
- self.successResultOf(stopped)
- self.failureResultOf(service.whenConnected(), CancelledError)
- service.startService()
- cq.connectQueue[-1].callback(None)
- self.assertIdentical(cq.applicationProtocols[-1],
- self.successResultOf(service.whenConnected()))
- def test_interfacesForTransport(self):
- """
- If the protocol objects returned by the factory given to
- L{ClientService} provide special "marker" interfaces for their
- transport - L{IHalfCloseableProtocol} or L{IFileDescriptorReceiver} -
- those interfaces will be provided by the protocol objects passed on to
- the reactor.
- """
- @implementer(IHalfCloseableProtocol, IFileDescriptorReceiver)
- class FancyProtocol(Protocol, object):
- """
- Provider of various interfaces.
- """
- cq, service = self.makeReconnector(protocolType=FancyProtocol)
- reactorFacing = cq.constructedProtocols[0]
- self.assertTrue(IFileDescriptorReceiver.providedBy(reactorFacing))
- self.assertTrue(IHalfCloseableProtocol.providedBy(reactorFacing))
- def test_stopServiceWhileRetrying(self):
- """
- When the service is stopped while retrying, the retry is cancelled.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- cq.connectQueue[0].errback(Exception())
- clock.advance(AT_LEAST_ONE_ATTEMPT)
- self.assertEqual(len(cq.connectQueue), 2)
- d = service.stopService()
- cq.connectQueue[1].errback(Exception())
- self.successResultOf(d)
- def test_stopServiceWhileConnecting(self):
- """
- When the service is stopped while initially connecting, the connection
- attempt is cancelled.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- self.assertEqual(len(cq.connectQueue), 1)
- self.assertNoResult(cq.connectQueue[0])
- d = service.stopService()
- self.successResultOf(d)
- def test_clientConnected(self):
- """
- When a client connects, the service keeps a reference to the new
- protocol and resets the delay.
- """
- clock = Clock()
- cq, service = self.makeReconnector(clock=clock)
- awaitingProtocol = service.whenConnected()
- self.assertEqual(clock.getDelayedCalls(), [])
- self.assertIdentical(self.successResultOf(awaitingProtocol),
- cq.applicationProtocols[0])
- def test_clientConnectionFailed(self):
- """
- When a client connection fails, the service removes its reference
- to the protocol and tries again after a timeout.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False,
- clock=clock)
- self.assertEqual(len(cq.connectQueue), 1)
- cq.connectQueue[0].errback(Failure(Exception()))
- whenConnected = service.whenConnected()
- self.assertNoResult(whenConnected)
- # Don't fail during test tear-down when service shutdown causes all
- # waiting connections to fail.
- whenConnected.addErrback(lambda ignored: ignored.trap(CancelledError))
- clock.advance(AT_LEAST_ONE_ATTEMPT)
- self.assertEqual(len(cq.connectQueue), 2)
- def test_clientConnectionLost(self):
- """
- When a client connection is lost, the service removes its reference
- to the protocol and calls retry.
- """
- clock = Clock()
- cq, service = self.makeReconnector(clock=clock, fireImmediately=False)
- self.assertEqual(len(cq.connectQueue), 1)
- cq.connectQueue[0].callback(None)
- self.assertEqual(len(cq.connectQueue), 1)
- self.assertIdentical(self.successResultOf(service.whenConnected()),
- cq.applicationProtocols[0])
- cq.constructedProtocols[0].connectionLost(Failure(Exception()))
- clock.advance(AT_LEAST_ONE_ATTEMPT)
- self.assertEqual(len(cq.connectQueue), 2)
- cq.connectQueue[1].callback(None)
- self.assertIdentical(self.successResultOf(service.whenConnected()),
- cq.applicationProtocols[1])
- def test_clientConnectionLostWhileStopping(self):
- """
- When a client connection is lost while the service is stopping, the
- protocol stopping deferred is called and the reference to the protocol
- is removed.
- """
- clock = Clock()
- cq, service = self.makeReconnector(clock=clock)
- d = service.stopService()
- cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))
- self.failureResultOf(service.whenConnected(), CancelledError)
- self.assertTrue(d.called)
- def test_startTwice(self):
- """
- If L{ClientService} is started when it's already started, it will log a
- complaint and do nothing else (in particular it will not make
- additional connections).
- """
- cq, service = self.makeReconnector(fireImmediately=False,
- startService=False)
- self.assertEqual(len(cq.connectQueue), 0)
- service.startService()
- self.assertEqual(len(cq.connectQueue), 1)
- messages = catchLogs(self)
- service.startService()
- self.assertEqual(len(cq.connectQueue), 1)
- self.assertIn("Duplicate ClientService.startService", messages()[0])
- def test_whenConnectedLater(self):
- """
- L{ClientService.whenConnected} returns a L{Deferred} that fires when a
- connection is established.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- a = service.whenConnected()
- b = service.whenConnected()
- c = service.whenConnected(failAfterFailures=1)
- self.assertNoResult(a)
- self.assertNoResult(b)
- self.assertNoResult(c)
- cq.connectQueue[0].callback(None)
- resultA = self.successResultOf(a)
- resultB = self.successResultOf(b)
- resultC = self.successResultOf(c)
- self.assertIdentical(resultA, resultB)
- self.assertIdentical(resultA, resultC)
- self.assertIdentical(resultA, cq.applicationProtocols[0])
- def test_whenConnectedFails(self):
- """
- L{ClientService.whenConnected} returns a L{Deferred} that fails, if
- asked, when some number of connections have failed.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- a0 = service.whenConnected()
- a1 = service.whenConnected(failAfterFailures=1)
- a2 = service.whenConnected(failAfterFailures=2)
- a3 = service.whenConnected(failAfterFailures=3)
- self.assertNoResult(a0)
- self.assertNoResult(a1)
- self.assertNoResult(a2)
- self.assertNoResult(a3)
- f1 = Failure(Exception())
- cq.connectQueue[0].errback(f1)
- self.assertNoResult(a0)
- self.assertIdentical(self.failureResultOf(a1, Exception), f1)
- self.assertNoResult(a2)
- self.assertNoResult(a3)
- clock.advance(AT_LEAST_ONE_ATTEMPT)
- self.assertEqual(len(cq.connectQueue), 2)
- self.assertNoResult(a0)
- self.assertNoResult(a2)
- self.assertNoResult(a3)
- f2 = Failure(Exception())
- cq.connectQueue[1].errback(f2)
- self.assertNoResult(a0)
- self.assertIdentical(self.failureResultOf(a2, Exception), f2)
- self.assertNoResult(a3)
- AT_LEAST_TWO_ATTEMPTS = AT_LEAST_ONE_ATTEMPT # close enough
- clock.advance(AT_LEAST_TWO_ATTEMPTS)
- self.assertEqual(len(cq.connectQueue), 3)
- self.assertNoResult(a0)
- self.assertNoResult(a3)
- cq.connectQueue[2].callback(None)
- resultA0 = self.successResultOf(a0)
- resultA3 = self.successResultOf(a3)
- self.assertIdentical(resultA0, resultA3)
- self.assertIdentical(resultA0, cq.applicationProtocols[0])
- # a new whenConnected Deferred, obtained after we're connected,
- # should have fired already, even if failAfterFailures is set
- a4 = service.whenConnected(failAfterFailures=1)
- resultA4 = self.successResultOf(a4)
- self.assertIdentical(resultA0, resultA4)
- def test_whenConnectedStopService(self):
- """
- L{ClientService.whenConnected} returns a L{Deferred} that fails when
- L{ClientService.stopService} is called.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- a = service.whenConnected()
- b = service.whenConnected()
- c = service.whenConnected(failAfterFailures=1)
- self.assertNoResult(a)
- self.assertNoResult(b)
- self.assertNoResult(c)
- service.stopService()
- clock.advance(AT_LEAST_ONE_ATTEMPT)
- self.failureResultOf(a, CancelledError)
- self.failureResultOf(b, CancelledError)
- self.failureResultOf(c, CancelledError)
- def test_retryCancelled(self):
- """
- When L{ClientService.stopService} is called while waiting between
- connection attempts, the pending reconnection attempt is cancelled and
- the service is stopped immediately.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
- cq.connectQueue[0].errback(Exception("no connection"))
- d = service.stopService()
- self.assertEqual(clock.getDelayedCalls(), [])
- self.successResultOf(d)
- def test_stopServiceBeforeStartService(self):
- """
- Calling L{ClientService.stopService} before
- L{ClientService.startService} returns a L{Deferred} that has
- already fired with L{None}.
- """
- clock = Clock()
- _, service = self.makeReconnector(fireImmediately=False,
- startService=False,
- clock=clock)
- d = service.stopService()
- self.assertIsNone(self.successResultOf(d))
- def test_whenConnectedErrbacksOnStopService(self):
- """
- L{ClientService.whenConnected} returns a L{Deferred} that
- errbacks with L{CancelledError} if
- L{ClientService.stopService} is called between connection
- attempts.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False,
- clock=clock)
- beforeErrbackAndStop = service.whenConnected()
- # The protocol fails to connect, and the service is waiting to
- # reconnect.
- cq.connectQueue[0].errback(Exception("no connection"))
- service.stopService()
- afterErrbackAndStop = service.whenConnected()
- self.assertIsInstance(self.failureResultOf(beforeErrbackAndStop).value,
- CancelledError)
- self.assertIsInstance(self.failureResultOf(afterErrbackAndStop).value,
- CancelledError)
- def test_stopServiceWhileDisconnecting(self):
- """
- Calling L{ClientService.stopService} twice after it has
- connected (that is, stopping it while it is disconnecting)
- returns a L{Deferred} each time that fires when the
- disconnection has completed.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False,
- clock=clock)
- # The protocol connects
- cq.connectQueue[0].callback(None)
- # The protocol begins disconnecting
- firstStopDeferred = service.stopService()
- # The protocol continues disconnecting
- secondStopDeferred = service.stopService()
- # The protocol is disconnected
- cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))
- self.successResultOf(firstStopDeferred)
- self.successResultOf(secondStopDeferred)
- def test_stopServiceWhileRestarting(self):
- """
- Calling L{ClientService.stopService} after calling a
- reconnection attempt returns a L{Deferred} that fires when the
- disconnection has completed.
- """
- clock = Clock()
- cq, service = self.makeReconnector(fireImmediately=False,
- clock=clock)
- # The protocol connects
- cq.connectQueue[0].callback(None)
- # The protocol begins disconnecting
- firstStopDeferred = service.stopService()
- # The protocol begins reconnecting
- service.startService()
- # The protocol begins disconnecting again
- secondStopDeferred = service.stopService()
- # The protocol is disconnected
- cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))
- self.successResultOf(firstStopDeferred)
- self.successResultOf(secondStopDeferred)
- def test_stopServiceOnStoppedService(self):
- """
- Calling L{ClientService.stopService} on a stopped service
- returns a L{Deferred} that has already fired with L{None}.
- """
- clock = Clock()
- _, service = self.makeReconnector(fireImmediately=False,
- clock=clock)
- firstStopDeferred = service.stopService()
- secondStopDeferred = service.stopService()
- self.assertIsNone(self.successResultOf(firstStopDeferred))
- self.assertIsNone(self.successResultOf(secondStopDeferred))
|