test_internet.py 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033
  1. # -*- test-case-name: twisted.application.test.test_internet -*-
  2. # Copyright (c) Twisted Matrix Laboratories.
  3. # See LICENSE for details.
  4. """
  5. Tests for (new code in) L{twisted.application.internet}.
  6. @var AT_LEAST_ONE_ATTEMPT: At least enough seconds for L{ClientService} to make
  7. one attempt.
  8. """
  9. from __future__ import absolute_import, division
  10. import pickle
  11. from zope.interface import implementer
  12. from zope.interface.verify import verifyClass
  13. from twisted.internet.protocol import Factory, Protocol
  14. from twisted.internet.task import Clock
  15. from twisted.trial.unittest import TestCase, SynchronousTestCase
  16. from twisted.application import internet
  17. from twisted.application.internet import (
  18. StreamServerEndpointService, TimerService, ClientService)
  19. from twisted.internet.defer import Deferred, CancelledError
  20. from twisted.internet.interfaces import (
  21. IStreamServerEndpoint, IStreamClientEndpoint, IListeningPort,
  22. IHalfCloseableProtocol, IFileDescriptorReceiver
  23. )
  24. from twisted.internet import task
  25. from twisted.python.failure import Failure
  26. from twisted.logger import globalLogPublisher, formatEvent
  27. from twisted.test.proto_helpers import StringTransport
  28. def fakeTargetFunction():
  29. """
  30. A fake target function for testing TimerService which does nothing.
  31. """
  32. pass
  33. @implementer(IStreamServerEndpoint)
  34. class FakeServer(object):
  35. """
  36. In-memory implementation of L{IStreamServerEndpoint}.
  37. @ivar result: The L{Deferred} resulting from the call to C{listen}, after
  38. C{listen} has been called.
  39. @ivar factory: The factory passed to C{listen}.
  40. @ivar cancelException: The exception to errback C{self.result} when it is
  41. cancelled.
  42. @ivar port: The L{IListeningPort} which C{listen}'s L{Deferred} will fire
  43. with.
  44. @ivar listenAttempts: The number of times C{listen} has been invoked.
  45. @ivar failImmediately: If set, the exception to fail the L{Deferred}
  46. returned from C{listen} before it is returned.
  47. """
  48. result = None
  49. factory = None
  50. failImmediately = None
  51. cancelException = CancelledError()
  52. listenAttempts = 0
  53. def __init__(self):
  54. self.port = FakePort()
  55. def listen(self, factory):
  56. """
  57. Return a Deferred and store it for future use. (Implementation of
  58. L{IStreamServerEndpoint}).
  59. @param factory: the factory to listen with
  60. @return: a L{Deferred} stored in L{FakeServer.result}
  61. """
  62. self.listenAttempts += 1
  63. self.factory = factory
  64. self.result = Deferred(
  65. canceller=lambda d: d.errback(self.cancelException))
  66. if self.failImmediately is not None:
  67. self.result.errback(self.failImmediately)
  68. return self.result
  69. def startedListening(self):
  70. """
  71. Test code should invoke this method after causing C{listen} to be
  72. invoked in order to fire the L{Deferred} previously returned from
  73. C{listen}.
  74. """
  75. self.result.callback(self.port)
  76. def stoppedListening(self):
  77. """
  78. Test code should invoke this method after causing C{stopListening} to
  79. be invoked on the port fired from the L{Deferred} returned from
  80. C{listen} in order to cause the L{Deferred} returned from
  81. C{stopListening} to fire.
  82. """
  83. self.port.deferred.callback(None)
  84. verifyClass(IStreamServerEndpoint, FakeServer)
  85. @implementer(IListeningPort)
  86. class FakePort(object):
  87. """
  88. Fake L{IListeningPort} implementation.
  89. @ivar deferred: The L{Deferred} returned by C{stopListening}.
  90. """
  91. deferred = None
  92. def stopListening(self):
  93. """
  94. Stop listening.
  95. @return: a L{Deferred} stored in L{FakePort.deferred}
  96. """
  97. self.deferred = Deferred()
  98. return self.deferred
  99. verifyClass(IStreamServerEndpoint, FakeServer)
  100. class EndpointServiceTests(TestCase):
  101. """
  102. Tests for L{twisted.application.internet}.
  103. """
  104. def setUp(self):
  105. """
  106. Construct a stub server, a stub factory, and a
  107. L{StreamServerEndpointService} to test.
  108. """
  109. self.fakeServer = FakeServer()
  110. self.factory = Factory()
  111. self.svc = StreamServerEndpointService(self.fakeServer, self.factory)
  112. def test_privilegedStartService(self):
  113. """
  114. L{StreamServerEndpointService.privilegedStartService} calls its
  115. endpoint's C{listen} method with its factory.
  116. """
  117. self.svc.privilegedStartService()
  118. self.assertIdentical(self.factory, self.fakeServer.factory)
  119. def test_synchronousRaiseRaisesSynchronously(self, thunk=None):
  120. """
  121. L{StreamServerEndpointService.startService} should raise synchronously
  122. if the L{Deferred} returned by its wrapped
  123. L{IStreamServerEndpoint.listen} has already fired with an errback and
  124. the L{StreamServerEndpointService}'s C{_raiseSynchronously} flag has
  125. been set. This feature is necessary to preserve compatibility with old
  126. behavior of L{twisted.internet.strports.service}, which is to return a
  127. service which synchronously raises an exception from C{startService}
  128. (so that, among other things, twistd will not start running). However,
  129. since L{IStreamServerEndpoint.listen} may fail asynchronously, it is a
  130. bad idea to rely on this behavior.
  131. @param thunk: If specified, a callable to execute in place of
  132. C{startService}.
  133. """
  134. self.fakeServer.failImmediately = ZeroDivisionError()
  135. self.svc._raiseSynchronously = True
  136. self.assertRaises(ZeroDivisionError, thunk or self.svc.startService)
  137. def test_synchronousRaisePrivileged(self):
  138. """
  139. L{StreamServerEndpointService.privilegedStartService} should behave the
  140. same as C{startService} with respect to
  141. L{EndpointServiceTests.test_synchronousRaiseRaisesSynchronously}.
  142. """
  143. self.test_synchronousRaiseRaisesSynchronously(
  144. self.svc.privilegedStartService)
  145. def test_failReportsError(self):
  146. """
  147. L{StreamServerEndpointService.startService} and
  148. L{StreamServerEndpointService.privilegedStartService} should both log
  149. an exception when the L{Deferred} returned from their wrapped
  150. L{IStreamServerEndpoint.listen} fails.
  151. """
  152. self.svc.startService()
  153. self.fakeServer.result.errback(ZeroDivisionError())
  154. logged = self.flushLoggedErrors(ZeroDivisionError)
  155. self.assertEqual(len(logged), 1)
  156. def test_asynchronousFailReportsError(self):
  157. """
  158. L{StreamServerEndpointService.startService} and
  159. L{StreamServerEndpointService.privilegedStartService} should both log
  160. an exception when the L{Deferred} returned from their wrapped
  161. L{IStreamServerEndpoint.listen} fails asynchronously, even if
  162. C{_raiseSynchronously} is set.
  163. """
  164. self.svc._raiseSynchronously = True
  165. self.svc.startService()
  166. self.fakeServer.result.errback(ZeroDivisionError())
  167. logged = self.flushLoggedErrors(ZeroDivisionError)
  168. self.assertEqual(len(logged), 1)
  169. def test_synchronousFailReportsError(self):
  170. """
  171. Without the C{_raiseSynchronously} compatibility flag, failing
  172. immediately has the same behavior as failing later; it logs the error.
  173. """
  174. self.fakeServer.failImmediately = ZeroDivisionError()
  175. self.svc.startService()
  176. logged = self.flushLoggedErrors(ZeroDivisionError)
  177. self.assertEqual(len(logged), 1)
  178. def test_startServiceUnstarted(self):
  179. """
  180. L{StreamServerEndpointService.startService} sets the C{running} flag,
  181. and calls its endpoint's C{listen} method with its factory, if it
  182. has not yet been started.
  183. """
  184. self.svc.startService()
  185. self.assertIdentical(self.factory, self.fakeServer.factory)
  186. self.assertEqual(self.svc.running, True)
  187. def test_startServiceStarted(self):
  188. """
  189. L{StreamServerEndpointService.startService} sets the C{running} flag,
  190. but nothing else, if the service has already been started.
  191. """
  192. self.test_privilegedStartService()
  193. self.svc.startService()
  194. self.assertEqual(self.fakeServer.listenAttempts, 1)
  195. self.assertEqual(self.svc.running, True)
  196. def test_stopService(self):
  197. """
  198. L{StreamServerEndpointService.stopService} calls C{stopListening} on
  199. the L{IListeningPort} returned from its endpoint, returns the
  200. C{Deferred} from stopService, and sets C{running} to C{False}.
  201. """
  202. self.svc.privilegedStartService()
  203. self.fakeServer.startedListening()
  204. # Ensure running gets set to true
  205. self.svc.startService()
  206. result = self.svc.stopService()
  207. l = []
  208. result.addCallback(l.append)
  209. self.assertEqual(len(l), 0)
  210. self.fakeServer.stoppedListening()
  211. self.assertEqual(len(l), 1)
  212. self.assertFalse(self.svc.running)
  213. def test_stopServiceBeforeStartFinished(self):
  214. """
  215. L{StreamServerEndpointService.stopService} cancels the L{Deferred}
  216. returned by C{listen} if it has not yet fired. No error will be logged
  217. about the cancellation of the listen attempt.
  218. """
  219. self.svc.privilegedStartService()
  220. result = self.svc.stopService()
  221. l = []
  222. result.addBoth(l.append)
  223. self.assertEqual(l, [None])
  224. self.assertEqual(self.flushLoggedErrors(CancelledError), [])
  225. def test_stopServiceCancelStartError(self):
  226. """
  227. L{StreamServerEndpointService.stopService} cancels the L{Deferred}
  228. returned by C{listen} if it has not fired yet. An error will be logged
  229. if the resulting exception is not L{CancelledError}.
  230. """
  231. self.fakeServer.cancelException = ZeroDivisionError()
  232. self.svc.privilegedStartService()
  233. result = self.svc.stopService()
  234. l = []
  235. result.addCallback(l.append)
  236. self.assertEqual(l, [None])
  237. stoppingErrors = self.flushLoggedErrors(ZeroDivisionError)
  238. self.assertEqual(len(stoppingErrors), 1)
  239. class TimerServiceTests(TestCase):
  240. """
  241. Tests for L{twisted.application.internet.TimerService}.
  242. @type timer: L{TimerService}
  243. @ivar timer: service to test
  244. @type clock: L{task.Clock}
  245. @ivar clock: source of time
  246. @type deferred: L{Deferred}
  247. @ivar deferred: deferred returned by L{TimerServiceTests.call}.
  248. """
  249. def setUp(self):
  250. """
  251. Set up a timer service to test.
  252. """
  253. self.timer = TimerService(2, self.call)
  254. self.clock = self.timer.clock = task.Clock()
  255. self.deferred = Deferred()
  256. def call(self):
  257. """
  258. Function called by L{TimerService} being tested.
  259. @returns: C{self.deferred}
  260. @rtype: L{Deferred}
  261. """
  262. return self.deferred
  263. def test_startService(self):
  264. """
  265. When L{TimerService.startService} is called, it marks itself
  266. as running, creates a L{task.LoopingCall} and starts it.
  267. """
  268. self.timer.startService()
  269. self.assertTrue(self.timer.running, "Service is started")
  270. self.assertIsInstance(self.timer._loop, task.LoopingCall)
  271. self.assertIdentical(self.clock, self.timer._loop.clock)
  272. self.assertTrue(self.timer._loop.running, "LoopingCall is started")
  273. def test_startServiceRunsCallImmediately(self):
  274. """
  275. When L{TimerService.startService} is called, it calls the function
  276. immediately.
  277. """
  278. result = []
  279. self.timer.call = (result.append, (None,), {})
  280. self.timer.startService()
  281. self.assertEqual([None], result)
  282. def test_startServiceUsesGlobalReactor(self):
  283. """
  284. L{TimerService.startService} uses L{internet._maybeGlobalReactor} to
  285. choose the reactor to pass to L{task.LoopingCall}
  286. uses the global reactor.
  287. """
  288. otherClock = task.Clock()
  289. def getOtherClock(maybeReactor):
  290. return otherClock
  291. self.patch(internet, "_maybeGlobalReactor", getOtherClock)
  292. self.timer.startService()
  293. self.assertIdentical(otherClock, self.timer._loop.clock)
  294. def test_stopServiceWaits(self):
  295. """
  296. When L{TimerService.stopService} is called while a call is in progress.
  297. the L{Deferred} returned doesn't fire until after the call finishes.
  298. """
  299. self.timer.startService()
  300. d = self.timer.stopService()
  301. self.assertNoResult(d)
  302. self.assertEqual(True, self.timer.running)
  303. self.deferred.callback(object())
  304. self.assertIdentical(self.successResultOf(d), None)
  305. def test_stopServiceImmediately(self):
  306. """
  307. When L{TimerService.stopService} is called while a call isn't in progress.
  308. the L{Deferred} returned has already been fired.
  309. """
  310. self.timer.startService()
  311. self.deferred.callback(object())
  312. d = self.timer.stopService()
  313. self.assertIdentical(self.successResultOf(d), None)
  314. def test_failedCallLogsError(self):
  315. """
  316. When function passed to L{TimerService} returns a deferred that errbacks,
  317. the exception is logged, and L{TimerService.stopService} doesn't raise an error.
  318. """
  319. self.timer.startService()
  320. self.deferred.errback(Failure(ZeroDivisionError()))
  321. errors = self.flushLoggedErrors(ZeroDivisionError)
  322. self.assertEqual(1, len(errors))
  323. d = self.timer.stopService()
  324. self.assertIdentical(self.successResultOf(d), None)
  325. def test_pickleTimerServiceNotPickleLoop(self):
  326. """
  327. When pickling L{internet.TimerService}, it won't pickle
  328. L{internet.TimerService._loop}.
  329. """
  330. # We need a pickleable callable to test pickling TimerService. So we
  331. # can't use self.timer
  332. timer = TimerService(1, fakeTargetFunction)
  333. timer.startService()
  334. dumpedTimer = pickle.dumps(timer)
  335. timer.stopService()
  336. loadedTimer = pickle.loads(dumpedTimer)
  337. nothing = object()
  338. value = getattr(loadedTimer, "_loop", nothing)
  339. self.assertIdentical(nothing, value)
  340. def test_pickleTimerServiceNotPickleLoopFinished(self):
  341. """
  342. When pickling L{internet.TimerService}, it won't pickle
  343. L{internet.TimerService._loopFinished}.
  344. """
  345. # We need a pickleable callable to test pickling TimerService. So we
  346. # can't use self.timer
  347. timer = TimerService(1, fakeTargetFunction)
  348. timer.startService()
  349. dumpedTimer = pickle.dumps(timer)
  350. timer.stopService()
  351. loadedTimer = pickle.loads(dumpedTimer)
  352. nothing = object()
  353. value = getattr(loadedTimer, "_loopFinished", nothing)
  354. self.assertIdentical(nothing, value)
  355. class ConnectInformation(object):
  356. """
  357. Information about C{endpointForTesting}
  358. @ivar connectQueue: a L{list} of L{Deferred} returned from C{connect}. If
  359. these are not already fired, you can fire them with no value and they
  360. will trigger building a factory.
  361. @ivar constructedProtocols: a L{list} of protocols constructed.
  362. @ivar passedFactories: a L{list} of L{IProtocolFactory}; the ones actually
  363. passed to the underlying endpoint / i.e. the reactor.
  364. """
  365. def __init__(self):
  366. self.connectQueue = []
  367. self.constructedProtocols = []
  368. self.passedFactories = []
  369. def endpointForTesting(fireImmediately=False):
  370. """
  371. Make a sample endpoint for testing.
  372. @param fireImmediately: If true, fire all L{Deferred}s returned from
  373. C{connect} immedaitely.
  374. @return: a 2-tuple of C{(information, endpoint)}, where C{information} is a
  375. L{ConnectInformation} describing the operations in progress on
  376. C{endpoint}.
  377. """
  378. @implementer(IStreamClientEndpoint)
  379. class ClientTestEndpoint(object):
  380. def connect(self, factory):
  381. result = Deferred()
  382. info.passedFactories.append(factory)
  383. @result.addCallback
  384. def createProtocol(ignored):
  385. protocol = factory.buildProtocol(None)
  386. info.constructedProtocols.append(protocol)
  387. transport = StringTransport()
  388. protocol.makeConnection(transport)
  389. return protocol
  390. info.connectQueue.append(result)
  391. if fireImmediately:
  392. result.callback(None)
  393. return result
  394. info = ConnectInformation()
  395. return info, ClientTestEndpoint()
  396. def catchLogs(testCase, logPublisher=globalLogPublisher):
  397. """
  398. Catch the global log stream.
  399. @param testCase: The test case to add a cleanup to.
  400. @param logPublisher: the log publisher to add and remove observers for.
  401. @return: a 0-argument callable that returns a list of textual log messages
  402. for comparison.
  403. @rtype: L{list} of L{unicode}
  404. """
  405. logs = []
  406. logPublisher.addObserver(logs.append)
  407. testCase.addCleanup(lambda: logPublisher.removeObserver(logs.append))
  408. return lambda: [formatEvent(event) for event in logs]
  409. AT_LEAST_ONE_ATTEMPT = 100.
  410. class ClientServiceTests(SynchronousTestCase):
  411. """
  412. Tests for L{ClientService}.
  413. """
  414. def makeReconnector(self, fireImmediately=True, startService=True,
  415. protocolType=Protocol, **kw):
  416. """
  417. Create a L{ClientService} along with a L{ConnectInformation} indicating
  418. the connections in progress on its endpoint.
  419. @param fireImmediately: Should all of the endpoint connection attempts
  420. fire synchronously?
  421. @type fireImmediately: L{bool}
  422. @param startService: Should the L{ClientService} be started before
  423. being returned?
  424. @type startService: L{bool}
  425. @param protocolType: a 0-argument callable returning a new L{IProtocol}
  426. provider to be used for application-level protocol connections.
  427. @param kw: Arbitrary keyword arguments to be passed on to
  428. L{ClientService}
  429. @return: a 2-tuple of L{ConnectInformation} (for information about test
  430. state) and L{ClientService} (the system under test). The
  431. L{ConnectInformation} has 2 additional attributes;
  432. C{applicationFactory} and C{applicationProtocols}, which refer to
  433. the unwrapped protocol factory and protocol instances passed in to
  434. L{ClientService} respectively.
  435. """
  436. nkw = {}
  437. nkw.update(clock=Clock())
  438. nkw.update(kw)
  439. clock = nkw['clock']
  440. cq, endpoint = endpointForTesting(fireImmediately=fireImmediately)
  441. # `endpointForTesting` is totally generic to any LLPI client that uses
  442. # endpoints, and maintains all its state internally; however,
  443. # applicationProtocols and applicationFactory are bonus attributes that
  444. # are only specifically interesitng to tests that use wrapper
  445. # protocols. For now, set them here, externally.
  446. applicationProtocols = cq.applicationProtocols = []
  447. class RememberingFactory(Factory, object):
  448. protocol = protocolType
  449. def buildProtocol(self, addr):
  450. result = super(RememberingFactory, self).buildProtocol(addr)
  451. applicationProtocols.append(result)
  452. return result
  453. cq.applicationFactory = factory = RememberingFactory()
  454. service = ClientService(endpoint, factory, **nkw)
  455. def stop():
  456. service._protocol = None
  457. if service.running:
  458. service.stopService()
  459. # Ensure that we don't leave any state in the reactor after
  460. # stopService.
  461. self.assertEqual(clock.getDelayedCalls(), [])
  462. self.addCleanup(stop)
  463. if startService:
  464. service.startService()
  465. return cq, service
  466. def test_startService(self):
  467. """
  468. When the service is started, a connection attempt is made.
  469. """
  470. cq, service = self.makeReconnector(fireImmediately=False)
  471. self.assertEqual(len(cq.connectQueue), 1)
  472. def test_startStopFactory(self):
  473. """
  474. Although somewhat obscure, L{IProtocolFactory} includes both C{doStart}
  475. and C{doStop} methods; ensure that when these methods are called on the
  476. factory that was passed to the reactor, the factory that was passed
  477. from the application receives them.
  478. """
  479. cq, service = self.makeReconnector()
  480. firstAppFactory = cq.applicationFactory
  481. self.assertEqual(firstAppFactory.numPorts, 0)
  482. firstPassedFactory = cq.passedFactories[0]
  483. firstPassedFactory.doStart()
  484. self.assertEqual(firstAppFactory.numPorts, 1)
  485. def test_stopServiceWhileConnected(self):
  486. """
  487. When the service is stopped, no further connect attempts are made. The
  488. returned L{Deferred} fires when all outstanding connections have been
  489. stopped.
  490. """
  491. cq, service = self.makeReconnector()
  492. d = service.stopService()
  493. self.assertNoResult(d)
  494. protocol = cq.constructedProtocols[0]
  495. self.assertEqual(protocol.transport.disconnecting, True)
  496. protocol.connectionLost(Failure(Exception()))
  497. self.successResultOf(d)
  498. def test_startServiceWaitsForDisconnect(self):
  499. """
  500. When L{ClientService} is restarted after having been connected, it
  501. waits to start connecting until after having disconnected.
  502. """
  503. cq, service = self.makeReconnector()
  504. d = service.stopService()
  505. self.assertNoResult(d)
  506. protocol = cq.constructedProtocols[0]
  507. self.assertEqual(protocol.transport.disconnecting, True)
  508. service.startService()
  509. self.assertNoResult(d)
  510. self.assertEqual(len(cq.constructedProtocols), 1)
  511. protocol.connectionLost(Failure(Exception()))
  512. self.assertEqual(len(cq.constructedProtocols), 2)
  513. def test_startServiceWhileStopping(self):
  514. """
  515. When L{ClientService} is stopping - that is,
  516. L{ClientService.stopService} has been called, but the L{Deferred} it
  517. returns has not fired yet - calling L{startService} will cause a new
  518. connection to be made, and new calls to L{whenConnected} to succeed.
  519. """
  520. cq, service = self.makeReconnector(fireImmediately=False)
  521. cq.connectQueue[0].callback(None)
  522. first = cq.constructedProtocols[0]
  523. stopped = service.stopService()
  524. self.assertNoResult(stopped)
  525. nextProtocol = service.whenConnected()
  526. self.assertNoResult(nextProtocol)
  527. service.startService()
  528. self.assertNoResult(nextProtocol)
  529. self.assertNoResult(stopped)
  530. self.assertEqual(first.transport.disconnecting, True)
  531. first.connectionLost(Failure(Exception()))
  532. self.successResultOf(stopped)
  533. cq.connectQueue[1].callback(None)
  534. self.assertEqual(len(cq.constructedProtocols), 2)
  535. self.assertIdentical(self.successResultOf(nextProtocol),
  536. cq.applicationProtocols[1])
  537. secondStopped = service.stopService()
  538. self.assertNoResult(secondStopped)
  539. def test_startServiceWhileStopped(self):
  540. """
  541. When L{ClientService} is stopped - that is,
  542. L{ClientService.stopService} has been called and the L{Deferred} it
  543. returns has fired - calling L{startService} will cause a new connection
  544. to be made, and new calls to L{whenConnected} to succeed.
  545. """
  546. cq, service = self.makeReconnector(fireImmediately=False)
  547. stopped = service.stopService()
  548. self.successResultOf(stopped)
  549. self.failureResultOf(service.whenConnected(), CancelledError)
  550. service.startService()
  551. cq.connectQueue[-1].callback(None)
  552. self.assertIdentical(cq.applicationProtocols[-1],
  553. self.successResultOf(service.whenConnected()))
  554. def test_interfacesForTransport(self):
  555. """
  556. If the protocol objects returned by the factory given to
  557. L{ClientService} provide special "marker" interfaces for their
  558. transport - L{IHalfCloseableProtocol} or L{IFileDescriptorReceiver} -
  559. those interfaces will be provided by the protocol objects passed on to
  560. the reactor.
  561. """
  562. @implementer(IHalfCloseableProtocol, IFileDescriptorReceiver)
  563. class FancyProtocol(Protocol, object):
  564. """
  565. Provider of various interfaces.
  566. """
  567. cq, service = self.makeReconnector(protocolType=FancyProtocol)
  568. reactorFacing = cq.constructedProtocols[0]
  569. self.assertTrue(IFileDescriptorReceiver.providedBy(reactorFacing))
  570. self.assertTrue(IHalfCloseableProtocol.providedBy(reactorFacing))
  571. def test_stopServiceWhileRetrying(self):
  572. """
  573. When the service is stopped while retrying, the retry is cancelled.
  574. """
  575. clock = Clock()
  576. cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
  577. cq.connectQueue[0].errback(Exception())
  578. clock.advance(AT_LEAST_ONE_ATTEMPT)
  579. self.assertEqual(len(cq.connectQueue), 2)
  580. d = service.stopService()
  581. cq.connectQueue[1].errback(Exception())
  582. self.successResultOf(d)
  583. def test_stopServiceWhileConnecting(self):
  584. """
  585. When the service is stopped while initially connecting, the connection
  586. attempt is cancelled.
  587. """
  588. clock = Clock()
  589. cq, service = self.makeReconnector(fireImmediately=False, clock=clock)
  590. self.assertEqual(len(cq.connectQueue), 1)
  591. self.assertNoResult(cq.connectQueue[0])
  592. d = service.stopService()
  593. self.successResultOf(d)
  594. def test_clientConnected(self):
  595. """
  596. When a client connects, the service keeps a reference to the new
  597. protocol and resets the delay.
  598. """
  599. clock = Clock()
  600. cq, service = self.makeReconnector(clock=clock)
  601. awaitingProtocol = service.whenConnected()
  602. self.assertEqual(clock.getDelayedCalls(), [])
  603. self.assertIdentical(self.successResultOf(awaitingProtocol),
  604. cq.applicationProtocols[0])
  605. def test_clientConnectionFailed(self):
  606. """
  607. When a client connection fails, the service removes its reference
  608. to the protocol and tries again after a timeout.
  609. """
  610. clock = Clock()
  611. cq, service = self.makeReconnector(fireImmediately=False,
  612. clock=clock)
  613. self.assertEqual(len(cq.connectQueue), 1)
  614. cq.connectQueue[0].errback(Failure(Exception()))
  615. whenConnected = service.whenConnected()
  616. self.assertNoResult(whenConnected)
  617. # Don't fail during test tear-down when service shutdown causes all
  618. # waiting connections to fail.
  619. whenConnected.addErrback(lambda ignored: ignored.trap(CancelledError))
  620. clock.advance(AT_LEAST_ONE_ATTEMPT)
  621. self.assertEqual(len(cq.connectQueue), 2)
  622. def test_clientConnectionLost(self):
  623. """
  624. When a client connection is lost, the service removes its reference
  625. to the protocol and calls retry.
  626. """
  627. clock = Clock()
  628. cq, service = self.makeReconnector(clock=clock, fireImmediately=False)
  629. self.assertEqual(len(cq.connectQueue), 1)
  630. cq.connectQueue[0].callback(None)
  631. self.assertEqual(len(cq.connectQueue), 1)
  632. self.assertIdentical(self.successResultOf(service.whenConnected()),
  633. cq.applicationProtocols[0])
  634. cq.constructedProtocols[0].connectionLost(Failure(Exception()))
  635. clock.advance(AT_LEAST_ONE_ATTEMPT)
  636. self.assertEqual(len(cq.connectQueue), 2)
  637. cq.connectQueue[1].callback(None)
  638. self.assertIdentical(self.successResultOf(service.whenConnected()),
  639. cq.applicationProtocols[1])
  640. def test_clientConnectionLostWhileStopping(self):
  641. """
  642. When a client connection is lost while the service is stopping, the
  643. protocol stopping deferred is called and the reference to the protocol
  644. is removed.
  645. """
  646. clock = Clock()
  647. cq, service = self.makeReconnector(clock=clock)
  648. d = service.stopService()
  649. cq.constructedProtocols[0].connectionLost(Failure(IndentationError()))
  650. self.failureResultOf(service.whenConnected(), CancelledError)
  651. self.assertTrue(d.called)
  652. def test_startTwice(self):
  653. """
  654. If L{ClientService} is started when it's already started, it will log a
  655. complaint and do nothing else (in particular it will not make
  656. additional connections).
  657. """
  658. cq, service = self.makeReconnector(fireImmediately=False,
  659. startService=False)
  660. self.assertEqual(len(cq.connectQueue), 0)
  661. service.startService()
  662. self.assertEqual(len(cq.connectQueue), 1)
  663. messages = catchLogs(self)
  664. service.startService()
  665. self.assertEqual(len(cq.connectQueue), 1)
  666. self.assertIn("Duplicate ClientService.startService", messages()[0])
  def test_whenConnectedLater(self):
      """
      Every L{Deferred} returned by L{ClientService.whenConnected} before
      a connection exists fires with the same application protocol once
      the connection is established.
      """
      endpoint, service = self.makeReconnector(clock=Clock(),
                                               fireImmediately=False)
      waiters = [
          service.whenConnected(),
          service.whenConnected(),
          service.whenConnected(failAfterFailures=1),
      ]
      for waiter in waiters:
          self.assertNoResult(waiter)

      # Complete the pending connection attempt.
      endpoint.connectQueue[0].callback(None)

      first, second, third = [
          self.successResultOf(waiter) for waiter in waiters
      ]
      self.assertIdentical(first, second)
      self.assertIdentical(first, third)
      self.assertIdentical(first, endpoint.applicationProtocols[0])
  def test_whenConnectedFails(self):
      """
      A L{Deferred} obtained from L{ClientService.whenConnected} with
      C{failAfterFailures} set fails once that many connection attempts
      have failed, while one obtained without it keeps waiting until a
      connection succeeds.
      """
      clock = Clock()
      endpoint, service = self.makeReconnector(clock=clock,
                                               fireImmediately=False)
      unlimited = service.whenConnected()
      afterOne = service.whenConnected(failAfterFailures=1)
      afterTwo = service.whenConnected(failAfterFailures=2)
      afterThree = service.whenConnected(failAfterFailures=3)
      for waiter in (unlimited, afterOne, afterTwo, afterThree):
          self.assertNoResult(waiter)

      # First attempt fails: only the failAfterFailures=1 waiter gives up.
      firstFailure = Failure(Exception())
      endpoint.connectQueue[0].errback(firstFailure)
      self.assertNoResult(unlimited)
      self.assertIdentical(
          self.failureResultOf(afterOne, Exception), firstFailure)
      self.assertNoResult(afterTwo)
      self.assertNoResult(afterThree)

      # Let the retry timer elapse so that a second attempt is queued.
      clock.advance(AT_LEAST_ONE_ATTEMPT)
      self.assertEqual(len(endpoint.connectQueue), 2)
      for waiter in (unlimited, afterTwo, afterThree):
          self.assertNoResult(waiter)

      # Second attempt fails: the failAfterFailures=2 waiter gives up.
      secondFailure = Failure(Exception())
      endpoint.connectQueue[1].errback(secondFailure)
      self.assertNoResult(unlimited)
      self.assertIdentical(
          self.failureResultOf(afterTwo, Exception), secondFailure)
      self.assertNoResult(afterThree)

      # The same interval is close enough to cover the second retry delay.
      clock.advance(AT_LEAST_ONE_ATTEMPT)
      self.assertEqual(len(endpoint.connectQueue), 3)
      for waiter in (unlimited, afterThree):
          self.assertNoResult(waiter)

      # Third attempt succeeds: the remaining waiters get the protocol.
      endpoint.connectQueue[2].callback(None)
      unlimitedResult = self.successResultOf(unlimited)
      afterThreeResult = self.successResultOf(afterThree)
      self.assertIdentical(unlimitedResult, afterThreeResult)
      self.assertIdentical(unlimitedResult,
                           endpoint.applicationProtocols[0])

      # A whenConnected Deferred requested once connected has already
      # fired, even when failAfterFailures is given.
      lateWaiter = service.whenConnected(failAfterFailures=1)
      lateResult = self.successResultOf(lateWaiter)
      self.assertIdentical(unlimitedResult, lateResult)
  def test_whenConnectedStopService(self):
      """
      Each pending L{Deferred} returned by L{ClientService.whenConnected}
      fails with L{CancelledError} when L{ClientService.stopService} is
      called.
      """
      clock = Clock()
      _, service = self.makeReconnector(clock=clock,
                                        fireImmediately=False)
      waiters = [
          service.whenConnected(),
          service.whenConnected(),
          service.whenConnected(failAfterFailures=1),
      ]
      for waiter in waiters:
          self.assertNoResult(waiter)

      service.stopService()
      clock.advance(AT_LEAST_ONE_ATTEMPT)

      for waiter in waiters:
          self.failureResultOf(waiter, CancelledError)
  def test_retryCancelled(self):
      """
      Calling L{ClientService.stopService} while the service is waiting
      between connection attempts cancels the scheduled retry and the
      returned L{Deferred} fires right away.
      """
      clock = Clock()
      endpoint, service = self.makeReconnector(clock=clock,
                                               fireImmediately=False)

      # Fail the first attempt so that the service is waiting to retry.
      endpoint.connectQueue[0].errback(Exception("no connection"))

      stopped = service.stopService()
      self.assertEqual(clock.getDelayedCalls(), [])
      self.successResultOf(stopped)
  def test_stopServiceBeforeStartService(self):
      """
      L{ClientService.stopService} called on a service that was never
      started returns a L{Deferred} that has already fired with L{None}.
      """
      _, service = self.makeReconnector(clock=Clock(),
                                        startService=False,
                                        fireImmediately=False)
      stopResult = self.successResultOf(service.stopService())
      self.assertIsNone(stopResult)
  def test_whenConnectedErrbacksOnStopService(self):
      """
      When L{ClientService.stopService} is called between connection
      attempts, L{Deferred}s from L{ClientService.whenConnected} errback
      with L{CancelledError}, whether they were obtained before or after
      the stop.
      """
      endpoint, service = self.makeReconnector(clock=Clock(),
                                               fireImmediately=False)
      requestedBefore = service.whenConnected()

      # The connection attempt fails, leaving the service waiting to
      # reconnect; then the service is stopped.
      endpoint.connectQueue[0].errback(Exception("no connection"))
      service.stopService()

      requestedAfter = service.whenConnected()

      for waiter in (requestedBefore, requestedAfter):
          failure = self.failureResultOf(waiter)
          self.assertIsInstance(failure.value, CancelledError)
  def test_stopServiceWhileDisconnecting(self):
      """
      Stopping a connected L{ClientService} a second time while it is
      still disconnecting yields a L{Deferred} from each call, and both
      fire once the disconnection has completed.
      """
      endpoint, service = self.makeReconnector(clock=Clock(),
                                               fireImmediately=False)
      # Establish the connection.
      endpoint.connectQueue[0].callback(None)

      # Request a stop twice while the disconnect is still in progress.
      firstStop = service.stopService()
      secondStop = service.stopService()

      # The disconnect now completes.
      protocol = endpoint.constructedProtocols[0]
      protocol.connectionLost(Failure(IndentationError()))

      for stopDeferred in (firstStop, secondStop):
          self.successResultOf(stopDeferred)
  def test_stopServiceWhileRestarting(self):
      """
      Stopping a L{ClientService}, starting it again before the
      disconnection has finished, and stopping it once more yields
      L{Deferred}s that fire when the disconnection has completed.
      """
      endpoint, service = self.makeReconnector(clock=Clock(),
                                               fireImmediately=False)
      # Establish the connection.
      endpoint.connectQueue[0].callback(None)

      # Stop, restart while still disconnecting, then stop again.
      firstStop = service.stopService()
      service.startService()
      secondStop = service.stopService()

      # The disconnect now completes.
      protocol = endpoint.constructedProtocols[0]
      protocol.connectionLost(Failure(IndentationError()))

      for stopDeferred in (firstStop, secondStop):
          self.successResultOf(stopDeferred)
  def test_stopServiceOnStoppedService(self):
      """
      Calling L{ClientService.stopService} again on a service that has
      already been stopped returns a L{Deferred} that has already fired
      with L{None}, as does the first call.
      """
      _, service = self.makeReconnector(clock=Clock(),
                                        fireImmediately=False)
      firstStop = service.stopService()
      secondStop = service.stopService()

      for stopDeferred in (firstStop, secondStop):
          self.assertIsNone(self.successResultOf(stopDeferred))