test_internet.py 45 KB


  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Tests for lots of functionality provided by L{twisted.internet}.
  5. """
  6. from __future__ import division, absolute_import
  7. import os
  8. import sys
  9. import time
  10. from twisted.python.compat import _PY3
  11. from twisted.trial import unittest
  12. from twisted.internet import reactor, protocol, error, abstract, defer
  13. from twisted.internet import interfaces, base
  14. try:
  15. from twisted.internet import ssl
  16. except ImportError:
  17. ssl = None
  18. if ssl and not ssl.supported:
  19. ssl = None
  20. from twisted.internet.defer import Deferred, passthru
  21. if not _PY3:
  22. from twisted.python import util
  23. class ThreePhaseEventTests(unittest.TestCase):
  24. """
  25. Tests for the private implementation helpers for system event triggers.
  26. """
  27. def setUp(self):
  28. """
  29. Create a trigger, an argument, and an event to be used by tests.
  30. """
  31. self.trigger = lambda x: None
  32. self.arg = object()
  33. self.event = base._ThreePhaseEvent()
  34. def test_addInvalidPhase(self):
  35. """
  36. L{_ThreePhaseEvent.addTrigger} should raise L{KeyError} when called
  37. with an invalid phase.
  38. """
  39. self.assertRaises(
  40. KeyError,
  41. self.event.addTrigger, 'xxx', self.trigger, self.arg)
  42. def test_addBeforeTrigger(self):
  43. """
  44. L{_ThreePhaseEvent.addTrigger} should accept C{'before'} as a phase, a
  45. callable, and some arguments and add the callable with the arguments to
  46. the before list.
  47. """
  48. self.event.addTrigger('before', self.trigger, self.arg)
  49. self.assertEqual(
  50. self.event.before,
  51. [(self.trigger, (self.arg,), {})])
  52. def test_addDuringTrigger(self):
  53. """
  54. L{_ThreePhaseEvent.addTrigger} should accept C{'during'} as a phase, a
  55. callable, and some arguments and add the callable with the arguments to
  56. the during list.
  57. """
  58. self.event.addTrigger('during', self.trigger, self.arg)
  59. self.assertEqual(
  60. self.event.during,
  61. [(self.trigger, (self.arg,), {})])
  62. def test_addAfterTrigger(self):
  63. """
  64. L{_ThreePhaseEvent.addTrigger} should accept C{'after'} as a phase, a
  65. callable, and some arguments and add the callable with the arguments to
  66. the after list.
  67. """
  68. self.event.addTrigger('after', self.trigger, self.arg)
  69. self.assertEqual(
  70. self.event.after,
  71. [(self.trigger, (self.arg,), {})])
  72. def test_removeTrigger(self):
  73. """
  74. L{_ThreePhaseEvent.removeTrigger} should accept an opaque object
  75. previously returned by L{_ThreePhaseEvent.addTrigger} and remove the
  76. associated trigger.
  77. """
  78. handle = self.event.addTrigger('before', self.trigger, self.arg)
  79. self.event.removeTrigger(handle)
  80. self.assertEqual(self.event.before, [])
  81. def test_removeNonexistentTrigger(self):
  82. """
  83. L{_ThreePhaseEvent.removeTrigger} should raise L{ValueError} when given
  84. an object not previously returned by L{_ThreePhaseEvent.addTrigger}.
  85. """
  86. self.assertRaises(ValueError, self.event.removeTrigger, object())
  87. def test_removeRemovedTrigger(self):
  88. """
  89. L{_ThreePhaseEvent.removeTrigger} should raise L{ValueError} the second
  90. time it is called with an object returned by
  91. L{_ThreePhaseEvent.addTrigger}.
  92. """
  93. handle = self.event.addTrigger('before', self.trigger, self.arg)
  94. self.event.removeTrigger(handle)
  95. self.assertRaises(ValueError, self.event.removeTrigger, handle)
  96. def test_removeAlmostValidTrigger(self):
  97. """
  98. L{_ThreePhaseEvent.removeTrigger} should raise L{ValueError} if it is
  99. given a trigger handle which resembles a valid trigger handle aside
  100. from its phase being incorrect.
  101. """
  102. self.assertRaises(
  103. KeyError,
  104. self.event.removeTrigger, ('xxx', self.trigger, (self.arg,), {}))
  105. def test_fireEvent(self):
  106. """
  107. L{_ThreePhaseEvent.fireEvent} should call I{before}, I{during}, and
  108. I{after} phase triggers in that order.
  109. """
  110. events = []
  111. self.event.addTrigger('after', events.append, ('first', 'after'))
  112. self.event.addTrigger('during', events.append, ('first', 'during'))
  113. self.event.addTrigger('before', events.append, ('first', 'before'))
  114. self.event.addTrigger('before', events.append, ('second', 'before'))
  115. self.event.addTrigger('during', events.append, ('second', 'during'))
  116. self.event.addTrigger('after', events.append, ('second', 'after'))
  117. self.assertEqual(events, [])
  118. self.event.fireEvent()
  119. self.assertEqual(events,
  120. [('first', 'before'), ('second', 'before'),
  121. ('first', 'during'), ('second', 'during'),
  122. ('first', 'after'), ('second', 'after')])
  123. def test_asynchronousBefore(self):
  124. """
  125. L{_ThreePhaseEvent.fireEvent} should wait for any L{Deferred} returned
  126. by a I{before} phase trigger before proceeding to I{during} events.
  127. """
  128. events = []
  129. beforeResult = Deferred()
  130. self.event.addTrigger('before', lambda: beforeResult)
  131. self.event.addTrigger('during', events.append, 'during')
  132. self.event.addTrigger('after', events.append, 'after')
  133. self.assertEqual(events, [])
  134. self.event.fireEvent()
  135. self.assertEqual(events, [])
  136. beforeResult.callback(None)
  137. self.assertEqual(events, ['during', 'after'])
  138. def test_beforeTriggerException(self):
  139. """
  140. If a before-phase trigger raises a synchronous exception, it should be
  141. logged and the remaining triggers should be run.
  142. """
  143. events = []
  144. class DummyException(Exception):
  145. pass
  146. def raisingTrigger():
  147. raise DummyException()
  148. self.event.addTrigger('before', raisingTrigger)
  149. self.event.addTrigger('before', events.append, 'before')
  150. self.event.addTrigger('during', events.append, 'during')
  151. self.event.fireEvent()
  152. self.assertEqual(events, ['before', 'during'])
  153. errors = self.flushLoggedErrors(DummyException)
  154. self.assertEqual(len(errors), 1)
  155. def test_duringTriggerException(self):
  156. """
  157. If a during-phase trigger raises a synchronous exception, it should be
  158. logged and the remaining triggers should be run.
  159. """
  160. events = []
  161. class DummyException(Exception):
  162. pass
  163. def raisingTrigger():
  164. raise DummyException()
  165. self.event.addTrigger('during', raisingTrigger)
  166. self.event.addTrigger('during', events.append, 'during')
  167. self.event.addTrigger('after', events.append, 'after')
  168. self.event.fireEvent()
  169. self.assertEqual(events, ['during', 'after'])
  170. errors = self.flushLoggedErrors(DummyException)
  171. self.assertEqual(len(errors), 1)
  172. def test_synchronousRemoveAlreadyExecutedBefore(self):
  173. """
  174. If a before-phase trigger tries to remove another before-phase trigger
  175. which has already run, a warning should be emitted.
  176. """
  177. events = []
  178. def removeTrigger():
  179. self.event.removeTrigger(beforeHandle)
  180. beforeHandle = self.event.addTrigger('before', events.append, ('first', 'before'))
  181. self.event.addTrigger('before', removeTrigger)
  182. self.event.addTrigger('before', events.append, ('second', 'before'))
  183. self.assertWarns(
  184. DeprecationWarning,
  185. "Removing already-fired system event triggers will raise an "
  186. "exception in a future version of Twisted.",
  187. __file__,
  188. self.event.fireEvent)
  189. self.assertEqual(events, [('first', 'before'), ('second', 'before')])
  190. def test_synchronousRemovePendingBefore(self):
  191. """
  192. If a before-phase trigger removes another before-phase trigger which
  193. has not yet run, the removed trigger should not be run.
  194. """
  195. events = []
  196. self.event.addTrigger(
  197. 'before', lambda: self.event.removeTrigger(beforeHandle))
  198. beforeHandle = self.event.addTrigger(
  199. 'before', events.append, ('first', 'before'))
  200. self.event.addTrigger('before', events.append, ('second', 'before'))
  201. self.event.fireEvent()
  202. self.assertEqual(events, [('second', 'before')])
  203. def test_synchronousBeforeRemovesDuring(self):
  204. """
  205. If a before-phase trigger removes a during-phase trigger, the
  206. during-phase trigger should not be run.
  207. """
  208. events = []
  209. self.event.addTrigger(
  210. 'before', lambda: self.event.removeTrigger(duringHandle))
  211. duringHandle = self.event.addTrigger('during', events.append, 'during')
  212. self.event.addTrigger('after', events.append, 'after')
  213. self.event.fireEvent()
  214. self.assertEqual(events, ['after'])
  215. def test_asynchronousBeforeRemovesDuring(self):
  216. """
  217. If a before-phase trigger returns a L{Deferred} and later removes a
  218. during-phase trigger before the L{Deferred} fires, the during-phase
  219. trigger should not be run.
  220. """
  221. events = []
  222. beforeResult = Deferred()
  223. self.event.addTrigger('before', lambda: beforeResult)
  224. duringHandle = self.event.addTrigger('during', events.append, 'during')
  225. self.event.addTrigger('after', events.append, 'after')
  226. self.event.fireEvent()
  227. self.event.removeTrigger(duringHandle)
  228. beforeResult.callback(None)
  229. self.assertEqual(events, ['after'])
  230. def test_synchronousBeforeRemovesConspicuouslySimilarDuring(self):
  231. """
  232. If a before-phase trigger removes a during-phase trigger which is
  233. identical to an already-executed before-phase trigger aside from their
  234. phases, no warning should be emitted and the during-phase trigger
  235. should not be run.
  236. """
  237. events = []
  238. def trigger():
  239. events.append('trigger')
  240. self.event.addTrigger('before', trigger)
  241. self.event.addTrigger(
  242. 'before', lambda: self.event.removeTrigger(duringTrigger))
  243. duringTrigger = self.event.addTrigger('during', trigger)
  244. self.event.fireEvent()
  245. self.assertEqual(events, ['trigger'])
  246. def test_synchronousRemovePendingDuring(self):
  247. """
  248. If a during-phase trigger removes another during-phase trigger which
  249. has not yet run, the removed trigger should not be run.
  250. """
  251. events = []
  252. self.event.addTrigger(
  253. 'during', lambda: self.event.removeTrigger(duringHandle))
  254. duringHandle = self.event.addTrigger(
  255. 'during', events.append, ('first', 'during'))
  256. self.event.addTrigger(
  257. 'during', events.append, ('second', 'during'))
  258. self.event.fireEvent()
  259. self.assertEqual(events, [('second', 'during')])
  260. def test_triggersRunOnce(self):
  261. """
  262. A trigger should only be called on the first call to
  263. L{_ThreePhaseEvent.fireEvent}.
  264. """
  265. events = []
  266. self.event.addTrigger('before', events.append, 'before')
  267. self.event.addTrigger('during', events.append, 'during')
  268. self.event.addTrigger('after', events.append, 'after')
  269. self.event.fireEvent()
  270. self.event.fireEvent()
  271. self.assertEqual(events, ['before', 'during', 'after'])
  272. def test_finishedBeforeTriggersCleared(self):
  273. """
  274. The temporary list L{_ThreePhaseEvent.finishedBefore} should be emptied
  275. and the state reset to C{'BASE'} before the first during-phase trigger
  276. executes.
  277. """
  278. events = []
  279. def duringTrigger():
  280. events.append('during')
  281. self.assertEqual(self.event.finishedBefore, [])
  282. self.assertEqual(self.event.state, 'BASE')
  283. self.event.addTrigger('before', events.append, 'before')
  284. self.event.addTrigger('during', duringTrigger)
  285. self.event.fireEvent()
  286. self.assertEqual(events, ['before', 'during'])
  287. class SystemEventTests(unittest.TestCase):
  288. """
  289. Tests for the reactor's implementation of the C{fireSystemEvent},
  290. C{addSystemEventTrigger}, and C{removeSystemEventTrigger} methods of the
  291. L{IReactorCore} interface.
  292. @ivar triggers: A list of the handles to triggers which have been added to
  293. the reactor.
  294. """
  295. def setUp(self):
  296. """
  297. Create an empty list in which to store trigger handles.
  298. """
  299. self.triggers = []
  300. def tearDown(self):
  301. """
  302. Remove all remaining triggers from the reactor.
  303. """
  304. while self.triggers:
  305. trigger = self.triggers.pop()
  306. try:
  307. reactor.removeSystemEventTrigger(trigger)
  308. except (ValueError, KeyError):
  309. pass
  310. def addTrigger(self, event, phase, func):
  311. """
  312. Add a trigger to the reactor and remember it in C{self.triggers}.
  313. """
  314. t = reactor.addSystemEventTrigger(event, phase, func)
  315. self.triggers.append(t)
  316. return t
  317. def removeTrigger(self, trigger):
  318. """
  319. Remove a trigger by its handle from the reactor and from
  320. C{self.triggers}.
  321. """
  322. reactor.removeSystemEventTrigger(trigger)
  323. self.triggers.remove(trigger)
  324. def _addSystemEventTriggerTest(self, phase):
  325. eventType = 'test'
  326. events = []
  327. def trigger():
  328. events.append(None)
  329. self.addTrigger(phase, eventType, trigger)
  330. self.assertEqual(events, [])
  331. reactor.fireSystemEvent(eventType)
  332. self.assertEqual(events, [None])
  333. def test_beforePhase(self):
  334. """
  335. L{IReactorCore.addSystemEventTrigger} should accept the C{'before'}
  336. phase and not call the given object until the right event is fired.
  337. """
  338. self._addSystemEventTriggerTest('before')
  339. def test_duringPhase(self):
  340. """
  341. L{IReactorCore.addSystemEventTrigger} should accept the C{'during'}
  342. phase and not call the given object until the right event is fired.
  343. """
  344. self._addSystemEventTriggerTest('during')
  345. def test_afterPhase(self):
  346. """
  347. L{IReactorCore.addSystemEventTrigger} should accept the C{'after'}
  348. phase and not call the given object until the right event is fired.
  349. """
  350. self._addSystemEventTriggerTest('after')
  351. def test_unknownPhase(self):
  352. """
  353. L{IReactorCore.addSystemEventTrigger} should reject phases other than
  354. C{'before'}, C{'during'}, or C{'after'}.
  355. """
  356. eventType = 'test'
  357. self.assertRaises(
  358. KeyError, self.addTrigger, 'xxx', eventType, lambda: None)
  359. def test_beforePreceedsDuring(self):
  360. """
  361. L{IReactorCore.addSystemEventTrigger} should call triggers added to the
  362. C{'before'} phase before it calls triggers added to the C{'during'}
  363. phase.
  364. """
  365. eventType = 'test'
  366. events = []
  367. def beforeTrigger():
  368. events.append('before')
  369. def duringTrigger():
  370. events.append('during')
  371. self.addTrigger('before', eventType, beforeTrigger)
  372. self.addTrigger('during', eventType, duringTrigger)
  373. self.assertEqual(events, [])
  374. reactor.fireSystemEvent(eventType)
  375. self.assertEqual(events, ['before', 'during'])
  376. def test_duringPreceedsAfter(self):
  377. """
  378. L{IReactorCore.addSystemEventTrigger} should call triggers added to the
  379. C{'during'} phase before it calls triggers added to the C{'after'}
  380. phase.
  381. """
  382. eventType = 'test'
  383. events = []
  384. def duringTrigger():
  385. events.append('during')
  386. def afterTrigger():
  387. events.append('after')
  388. self.addTrigger('during', eventType, duringTrigger)
  389. self.addTrigger('after', eventType, afterTrigger)
  390. self.assertEqual(events, [])
  391. reactor.fireSystemEvent(eventType)
  392. self.assertEqual(events, ['during', 'after'])
  393. def test_beforeReturnsDeferred(self):
  394. """
  395. If a trigger added to the C{'before'} phase of an event returns a
  396. L{Deferred}, the C{'during'} phase should be delayed until it is called
  397. back.
  398. """
  399. triggerDeferred = Deferred()
  400. eventType = 'test'
  401. events = []
  402. def beforeTrigger():
  403. return triggerDeferred
  404. def duringTrigger():
  405. events.append('during')
  406. self.addTrigger('before', eventType, beforeTrigger)
  407. self.addTrigger('during', eventType, duringTrigger)
  408. self.assertEqual(events, [])
  409. reactor.fireSystemEvent(eventType)
  410. self.assertEqual(events, [])
  411. triggerDeferred.callback(None)
  412. self.assertEqual(events, ['during'])
  413. def test_multipleBeforeReturnDeferred(self):
  414. """
  415. If more than one trigger added to the C{'before'} phase of an event
  416. return L{Deferred}s, the C{'during'} phase should be delayed until they
  417. are all called back.
  418. """
  419. firstDeferred = Deferred()
  420. secondDeferred = Deferred()
  421. eventType = 'test'
  422. events = []
  423. def firstBeforeTrigger():
  424. return firstDeferred
  425. def secondBeforeTrigger():
  426. return secondDeferred
  427. def duringTrigger():
  428. events.append('during')
  429. self.addTrigger('before', eventType, firstBeforeTrigger)
  430. self.addTrigger('before', eventType, secondBeforeTrigger)
  431. self.addTrigger('during', eventType, duringTrigger)
  432. self.assertEqual(events, [])
  433. reactor.fireSystemEvent(eventType)
  434. self.assertEqual(events, [])
  435. firstDeferred.callback(None)
  436. self.assertEqual(events, [])
  437. secondDeferred.callback(None)
  438. self.assertEqual(events, ['during'])
  439. def test_subsequentBeforeTriggerFiresPriorBeforeDeferred(self):
  440. """
  441. If a trigger added to the C{'before'} phase of an event calls back a
  442. L{Deferred} returned by an earlier trigger in the C{'before'} phase of
  443. the same event, the remaining C{'before'} triggers for that event
  444. should be run and any further L{Deferred}s waited on before proceeding
  445. to the C{'during'} events.
  446. """
  447. eventType = 'test'
  448. events = []
  449. firstDeferred = Deferred()
  450. secondDeferred = Deferred()
  451. def firstBeforeTrigger():
  452. return firstDeferred
  453. def secondBeforeTrigger():
  454. firstDeferred.callback(None)
  455. def thirdBeforeTrigger():
  456. events.append('before')
  457. return secondDeferred
  458. def duringTrigger():
  459. events.append('during')
  460. self.addTrigger('before', eventType, firstBeforeTrigger)
  461. self.addTrigger('before', eventType, secondBeforeTrigger)
  462. self.addTrigger('before', eventType, thirdBeforeTrigger)
  463. self.addTrigger('during', eventType, duringTrigger)
  464. self.assertEqual(events, [])
  465. reactor.fireSystemEvent(eventType)
  466. self.assertEqual(events, ['before'])
  467. secondDeferred.callback(None)
  468. self.assertEqual(events, ['before', 'during'])
  469. def test_removeSystemEventTrigger(self):
  470. """
  471. A trigger removed with L{IReactorCore.removeSystemEventTrigger} should
  472. not be called when the event fires.
  473. """
  474. eventType = 'test'
  475. events = []
  476. def firstBeforeTrigger():
  477. events.append('first')
  478. def secondBeforeTrigger():
  479. events.append('second')
  480. self.addTrigger('before', eventType, firstBeforeTrigger)
  481. self.removeTrigger(
  482. self.addTrigger('before', eventType, secondBeforeTrigger))
  483. self.assertEqual(events, [])
  484. reactor.fireSystemEvent(eventType)
  485. self.assertEqual(events, ['first'])
  486. def test_removeNonExistentSystemEventTrigger(self):
  487. """
  488. Passing an object to L{IReactorCore.removeSystemEventTrigger} which was
  489. not returned by a previous call to
  490. L{IReactorCore.addSystemEventTrigger} or which has already been passed
  491. to C{removeSystemEventTrigger} should result in L{TypeError},
  492. L{KeyError}, or L{ValueError} being raised.
  493. """
  494. b = self.addTrigger('during', 'test', lambda: None)
  495. self.removeTrigger(b)
  496. self.assertRaises(
  497. TypeError, reactor.removeSystemEventTrigger, None)
  498. self.assertRaises(
  499. ValueError, reactor.removeSystemEventTrigger, b)
  500. self.assertRaises(
  501. KeyError,
  502. reactor.removeSystemEventTrigger,
  503. (b[0], ('xxx',) + b[1][1:]))
  504. def test_interactionBetweenDifferentEvents(self):
  505. """
  506. L{IReactorCore.fireSystemEvent} should behave the same way for a
  507. particular system event regardless of whether Deferreds are being
  508. waited on for a different system event.
  509. """
  510. events = []
  511. firstEvent = 'first-event'
  512. firstDeferred = Deferred()
  513. def beforeFirstEvent():
  514. events.append(('before', 'first'))
  515. return firstDeferred
  516. def afterFirstEvent():
  517. events.append(('after', 'first'))
  518. secondEvent = 'second-event'
  519. secondDeferred = Deferred()
  520. def beforeSecondEvent():
  521. events.append(('before', 'second'))
  522. return secondDeferred
  523. def afterSecondEvent():
  524. events.append(('after', 'second'))
  525. self.addTrigger('before', firstEvent, beforeFirstEvent)
  526. self.addTrigger('after', firstEvent, afterFirstEvent)
  527. self.addTrigger('before', secondEvent, beforeSecondEvent)
  528. self.addTrigger('after', secondEvent, afterSecondEvent)
  529. self.assertEqual(events, [])
  530. # After this, firstEvent should be stuck before 'during' waiting for
  531. # firstDeferred.
  532. reactor.fireSystemEvent(firstEvent)
  533. self.assertEqual(events, [('before', 'first')])
  534. # After this, secondEvent should be stuck before 'during' waiting for
  535. # secondDeferred.
  536. reactor.fireSystemEvent(secondEvent)
  537. self.assertEqual(events, [('before', 'first'), ('before', 'second')])
  538. # After this, firstEvent should have finished completely, but
  539. # secondEvent should be at the same place.
  540. firstDeferred.callback(None)
  541. self.assertEqual(events, [('before', 'first'), ('before', 'second'),
  542. ('after', 'first')])
  543. # After this, secondEvent should have finished completely.
  544. secondDeferred.callback(None)
  545. self.assertEqual(events, [('before', 'first'), ('before', 'second'),
  546. ('after', 'first'), ('after', 'second')])
  547. class TimeTests(unittest.TestCase):
  548. """
  549. Tests for the IReactorTime part of the reactor.
  550. """
  551. def test_seconds(self):
  552. """
  553. L{twisted.internet.reactor.seconds} should return something
  554. like a number.
  555. 1. This test specifically does not assert any relation to the
  556. "system time" as returned by L{time.time} or
  557. L{twisted.python.runtime.seconds}, because at some point we
  558. may find a better option for scheduling calls than
  559. wallclock-time.
  560. 2. This test *also* does not assert anything about the type of
  561. the result, because operations may not return ints or
  562. floats: For example, datetime-datetime == timedelta(0).
  563. """
  564. now = reactor.seconds()
  565. self.assertEqual(now-now+now, now)
  566. def test_callLaterUsesReactorSecondsInDelayedCall(self):
  567. """
  568. L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}
  569. should use the reactor's seconds factory
  570. to produce the time at which the DelayedCall will be called.
  571. """
  572. oseconds = reactor.seconds
  573. reactor.seconds = lambda: 100
  574. try:
  575. call = reactor.callLater(5, lambda: None)
  576. self.assertEqual(call.getTime(), 105)
  577. finally:
  578. reactor.seconds = oseconds
  579. call.cancel()
  580. def test_callLaterUsesReactorSecondsAsDelayedCallSecondsFactory(self):
  581. """
  582. L{reactor.callLater<twisted.internet.interfaces.IReactorTime.callLater>}
  583. should propagate its own seconds factory
  584. to the DelayedCall to use as its own seconds factory.
  585. """
  586. oseconds = reactor.seconds
  587. reactor.seconds = lambda: 100
  588. try:
  589. call = reactor.callLater(5, lambda: None)
  590. self.assertEqual(call.seconds(), 100)
  591. finally:
  592. reactor.seconds = oseconds
  593. call.cancel()
  594. def test_callLater(self):
  595. """
  596. Test that a DelayedCall really calls the function it is
  597. supposed to call.
  598. """
  599. d = Deferred()
  600. reactor.callLater(0, d.callback, None)
  601. d.addCallback(self.assertEqual, None)
  602. return d
  603. def test_callLaterReset(self):
  604. """
  605. A L{DelayedCall} that is reset will be scheduled at the new time.
  606. """
  607. call = reactor.callLater(2, passthru, passthru)
  608. self.addCleanup(call.cancel)
  609. origTime = call.time
  610. call.reset(1)
  611. self.assertNotEqual(call.time, origTime)
  612. def test_cancelDelayedCall(self):
  613. """
  614. Test that when a DelayedCall is cancelled it does not run.
  615. """
  616. called = []
  617. def function():
  618. called.append(None)
  619. call = reactor.callLater(0, function)
  620. call.cancel()
  621. # Schedule a call in two "iterations" to check to make sure that the
  622. # above call never ran.
  623. d = Deferred()
  624. def check():
  625. try:
  626. self.assertEqual(called, [])
  627. except:
  628. d.errback()
  629. else:
  630. d.callback(None)
  631. reactor.callLater(0, reactor.callLater, 0, check)
  632. return d
  633. def test_cancelCancelledDelayedCall(self):
  634. """
  635. Test that cancelling a DelayedCall which has already been cancelled
  636. raises the appropriate exception.
  637. """
  638. call = reactor.callLater(0, lambda: None)
  639. call.cancel()
  640. self.assertRaises(error.AlreadyCancelled, call.cancel)
  641. def test_cancelCalledDelayedCallSynchronous(self):
  642. """
  643. Test that cancelling a DelayedCall in the DelayedCall's function as
  644. that function is being invoked by the DelayedCall raises the
  645. appropriate exception.
  646. """
  647. d = Deferred()
  648. def later():
  649. try:
  650. self.assertRaises(error.AlreadyCalled, call.cancel)
  651. except:
  652. d.errback()
  653. else:
  654. d.callback(None)
  655. call = reactor.callLater(0, later)
  656. return d
  657. def test_cancelCalledDelayedCallAsynchronous(self):
  658. """
  659. Test that cancelling a DelayedCall after it has run its function
  660. raises the appropriate exception.
  661. """
  662. d = Deferred()
  663. def check():
  664. try:
  665. self.assertRaises(error.AlreadyCalled, call.cancel)
  666. except:
  667. d.errback()
  668. else:
  669. d.callback(None)
  670. def later():
  671. reactor.callLater(0, check)
  672. call = reactor.callLater(0, later)
  673. return d
  674. def testCallLaterTime(self):
  675. d = reactor.callLater(10, lambda: None)
  676. try:
  677. self.assertTrue(d.getTime() - (time.time() + 10) < 1)
  678. finally:
  679. d.cancel()
  680. def testDelayedCallStringification(self):
  681. # Mostly just make sure str() isn't going to raise anything for
  682. # DelayedCalls within reason.
  683. dc = reactor.callLater(0, lambda x, y: None, 'x', y=10)
  684. str(dc)
  685. dc.reset(5)
  686. str(dc)
  687. dc.cancel()
  688. str(dc)
  689. dc = reactor.callLater(0, lambda: None, x=[({'hello': u'world'}, 10j), reactor], *range(10))
  690. str(dc)
  691. dc.cancel()
  692. str(dc)
  693. def calledBack(ignored):
  694. str(dc)
  695. d = Deferred().addCallback(calledBack)
  696. dc = reactor.callLater(0, d.callback, None)
  697. str(dc)
  698. return d
  699. def testDelayedCallSecondsOverride(self):
  700. """
  701. Test that the C{seconds} argument to DelayedCall gets used instead of
  702. the default timing function, if it is not None.
  703. """
  704. def seconds():
  705. return 10
  706. dc = base.DelayedCall(5, lambda: None, (), {}, lambda dc: None,
  707. lambda dc: None, seconds)
  708. self.assertEqual(dc.getTime(), 5)
  709. dc.reset(3)
  710. self.assertEqual(dc.getTime(), 13)
  711. class CallFromThreadStopsAndWakeUpTests(unittest.TestCase):
  712. def testWakeUp(self):
  713. # Make sure other threads can wake up the reactor
  714. d = Deferred()
  715. def wake():
  716. time.sleep(0.1)
  717. # callFromThread will call wakeUp for us
  718. reactor.callFromThread(d.callback, None)
  719. reactor.callInThread(wake)
  720. return d
  721. if interfaces.IReactorThreads(reactor, None) is None:
  722. testWakeUp.skip = "Nothing to wake up for without thread support"
  723. def _stopCallFromThreadCallback(self):
  724. self.stopped = True
  725. def _callFromThreadCallback(self, d):
  726. reactor.callFromThread(self._callFromThreadCallback2, d)
  727. reactor.callLater(0, self._stopCallFromThreadCallback)
  728. def _callFromThreadCallback2(self, d):
  729. try:
  730. self.assertTrue(self.stopped)
  731. except:
  732. # Send the error to the deferred
  733. d.errback()
  734. else:
  735. d.callback(None)
  736. def testCallFromThreadStops(self):
  737. """
  738. Ensure that callFromThread from inside a callFromThread
  739. callback doesn't sit in an infinite loop and lets other
  740. things happen too.
  741. """
  742. self.stopped = False
  743. d = defer.Deferred()
  744. reactor.callFromThread(self._callFromThreadCallback, d)
  745. return d
  746. class DelayedTests(unittest.TestCase):
  747. def setUp(self):
  748. self.finished = 0
  749. self.counter = 0
  750. self.timers = {}
  751. self.deferred = defer.Deferred()
  752. def tearDown(self):
  753. for t in self.timers.values():
  754. t.cancel()
  755. def checkTimers(self):
  756. l1 = self.timers.values()
  757. l2 = list(reactor.getDelayedCalls())
  758. # There should be at least the calls we put in. There may be other
  759. # calls that are none of our business and that we should ignore,
  760. # though.
  761. missing = []
  762. for dc in l1:
  763. if dc not in l2:
  764. missing.append(dc)
  765. if missing:
  766. self.finished = 1
  767. self.assertFalse(missing, "Should have been missing no calls, instead "
  768. + "was missing " + repr(missing))
  769. def callback(self, tag):
  770. del self.timers[tag]
  771. self.checkTimers()
  772. def addCallback(self, tag):
  773. self.callback(tag)
  774. self.addTimer(15, self.callback)
  775. def done(self, tag):
  776. self.finished = 1
  777. self.callback(tag)
  778. self.deferred.callback(None)
  779. def addTimer(self, when, callback):
  780. self.timers[self.counter] = reactor.callLater(when * 0.01, callback,
  781. self.counter)
  782. self.counter += 1
  783. self.checkTimers()
  784. def testGetDelayedCalls(self):
  785. if not hasattr(reactor, "getDelayedCalls"):
  786. return
  787. # This is not a race because we don't do anything which might call
  788. # the reactor until we have all the timers set up. If we did, this
  789. # test might fail on slow systems.
  790. self.checkTimers()
  791. self.addTimer(35, self.done)
  792. self.addTimer(20, self.callback)
  793. self.addTimer(30, self.callback)
  794. which = self.counter
  795. self.addTimer(29, self.callback)
  796. self.addTimer(25, self.addCallback)
  797. self.addTimer(26, self.callback)
  798. self.timers[which].cancel()
  799. del self.timers[which]
  800. self.checkTimers()
  801. self.deferred.addCallback(lambda x : self.checkTimers())
  802. return self.deferred
  803. def test_active(self):
  804. """
  805. L{IDelayedCall.active} returns False once the call has run.
  806. """
  807. dcall = reactor.callLater(0.01, self.deferred.callback, True)
  808. self.assertTrue(dcall.active())
  809. def checkDeferredCall(success):
  810. self.assertFalse(dcall.active())
  811. return success
  812. self.deferred.addCallback(checkDeferredCall)
  813. return self.deferred
  814. resolve_helper = """
  815. from __future__ import print_function
  816. import %(reactor)s
  817. %(reactor)s.install()
  818. from twisted.internet import reactor
  819. class Foo:
  820. def __init__(self):
  821. reactor.callWhenRunning(self.start)
  822. self.timer = reactor.callLater(3, self.failed)
  823. def start(self):
  824. reactor.resolve('localhost').addBoth(self.done)
  825. def done(self, res):
  826. print('done', res)
  827. reactor.stop()
  828. def failed(self):
  829. print('failed')
  830. self.timer = None
  831. reactor.stop()
  832. f = Foo()
  833. reactor.run()
  834. """
  835. class ChildResolveProtocol(protocol.ProcessProtocol):
  836. def __init__(self, onCompletion):
  837. self.onCompletion = onCompletion
  838. def connectionMade(self):
  839. self.output = []
  840. self.error = []
  841. def outReceived(self, out):
  842. self.output.append(out)
  843. def errReceived(self, err):
  844. self.error.append(err)
  845. def processEnded(self, reason):
  846. self.onCompletion.callback((reason, self.output, self.error))
  847. self.onCompletion = None
  848. class ResolveTests(unittest.TestCase):
  849. def testChildResolve(self):
  850. # I've seen problems with reactor.run under gtk2reactor. Spawn a
  851. # child which just does reactor.resolve after the reactor has
  852. # started, fail if it does not complete in a timely fashion.
  853. helperPath = os.path.abspath(self.mktemp())
  854. with open(helperPath, 'w') as helperFile:
  855. # Eeueuuggg
  856. reactorName = reactor.__module__
  857. helperFile.write(resolve_helper % {'reactor': reactorName})
  858. env = os.environ.copy()
  859. env['PYTHONPATH'] = os.pathsep.join(sys.path)
  860. helperDeferred = Deferred()
  861. helperProto = ChildResolveProtocol(helperDeferred)
  862. reactor.spawnProcess(helperProto, sys.executable, ("python", "-u", helperPath), env)
  863. def cbFinished(result):
  864. (reason, output, error) = result
  865. # If the output is "done 127.0.0.1\n" we don't really care what
  866. # else happened.
  867. output = b''.join(output)
  868. if _PY3:
  869. expected_output = (b'done 127.0.0.1' +
  870. os.linesep.encode("ascii"))
  871. else:
  872. expected_output = b'done 127.0.0.1\n'
  873. if output != expected_output:
  874. self.fail((
  875. "The child process failed to produce the desired results:\n"
  876. " Reason for termination was: %r\n"
  877. " Output stream was: %r\n"
  878. " Error stream was: %r\n") % (reason.getErrorMessage(), output, b''.join(error)))
  879. helperDeferred.addCallback(cbFinished)
  880. return helperDeferred
  881. if not interfaces.IReactorProcess(reactor, None):
  882. ResolveTests.skip = (
  883. "cannot run test: reactor doesn't support IReactorProcess")
  884. class CallFromThreadTests(unittest.TestCase):
  885. """
  886. Task scheduling from threads tests.
  887. """
  888. if interfaces.IReactorThreads(reactor, None) is None:
  889. skip = "Nothing to test without thread support"
  890. def setUp(self):
  891. self.counter = 0
  892. self.deferred = Deferred()
  893. def schedule(self, *args, **kwargs):
  894. """
  895. Override in subclasses.
  896. """
  897. reactor.callFromThread(*args, **kwargs)
  898. def test_lotsOfThreadsAreScheduledCorrectly(self):
  899. """
  900. L{IReactorThreads.callFromThread} can be used to schedule a large
  901. number of calls in the reactor thread.
  902. """
  903. def addAndMaybeFinish():
  904. self.counter += 1
  905. if self.counter == 100:
  906. self.deferred.callback(True)
  907. for i in range(100):
  908. self.schedule(addAndMaybeFinish)
  909. return self.deferred
  910. def test_threadsAreRunInScheduledOrder(self):
  911. """
  912. Callbacks should be invoked in the order they were scheduled.
  913. """
  914. order = []
  915. def check(_):
  916. self.assertEqual(order, [1, 2, 3])
  917. self.deferred.addCallback(check)
  918. self.schedule(order.append, 1)
  919. self.schedule(order.append, 2)
  920. self.schedule(order.append, 3)
  921. self.schedule(reactor.callFromThread, self.deferred.callback, None)
  922. return self.deferred
  923. def test_scheduledThreadsNotRunUntilReactorRuns(self):
  924. """
  925. Scheduled tasks should not be run until the reactor starts running.
  926. """
  927. def incAndFinish():
  928. self.counter = 1
  929. self.deferred.callback(True)
  930. self.schedule(incAndFinish)
  931. # Callback shouldn't have fired yet.
  932. self.assertEqual(self.counter, 0)
  933. return self.deferred
  934. class MyProtocol(protocol.Protocol):
  935. """
  936. Sample protocol.
  937. """
  938. class MyFactory(protocol.Factory):
  939. """
  940. Sample factory.
  941. """
  942. protocol = MyProtocol
  943. class ProtocolTests(unittest.TestCase):
  944. def testFactory(self):
  945. factory = MyFactory()
  946. protocol = factory.buildProtocol(None)
  947. self.assertEqual(protocol.factory, factory)
  948. self.assertIsInstance(protocol, factory.protocol)
  949. class DummyProducer(object):
  950. """
  951. Very uninteresting producer implementation used by tests to ensure the
  952. right methods are called by the consumer with which it is registered.
  953. @type events: C{list} of C{str}
  954. @ivar events: The producer/consumer related events which have happened to
  955. this producer. Strings in this list may be C{'resume'}, C{'stop'}, or
  956. C{'pause'}. Elements are added as they occur.
  957. """
  958. def __init__(self):
  959. self.events = []
  960. def resumeProducing(self):
  961. self.events.append('resume')
  962. def stopProducing(self):
  963. self.events.append('stop')
  964. def pauseProducing(self):
  965. self.events.append('pause')
  966. class SillyDescriptor(abstract.FileDescriptor):
  967. """
  968. A descriptor whose data buffer gets filled very fast.
  969. Useful for testing FileDescriptor's IConsumer interface, since
  970. the data buffer fills as soon as at least four characters are
  971. written to it, and gets emptied in a single doWrite() cycle.
  972. """
  973. bufferSize = 3
  974. connected = True
  975. def writeSomeData(self, data):
  976. """
  977. Always write all data.
  978. """
  979. return len(data)
  980. def startWriting(self):
  981. """
  982. Do nothing: bypass the reactor.
  983. """
  984. stopWriting = startWriting
  985. class ReentrantProducer(DummyProducer):
  986. """
  987. Similar to L{DummyProducer}, but with a resumeProducing method which calls
  988. back into an L{IConsumer} method of the consumer against which it is
  989. registered.
  990. @ivar consumer: The consumer with which this producer has been or will
  991. be registered.
  992. @ivar methodName: The name of the method to call on the consumer inside
  993. C{resumeProducing}.
  994. @ivar methodArgs: The arguments to pass to the consumer method invoked in
  995. C{resumeProducing}.
  996. """
  997. def __init__(self, consumer, methodName, *methodArgs):
  998. super(ReentrantProducer, self).__init__()
  999. self.consumer = consumer
  1000. self.methodName = methodName
  1001. self.methodArgs = methodArgs
  1002. def resumeProducing(self):
  1003. super(ReentrantProducer, self).resumeProducing()
  1004. getattr(self.consumer, self.methodName)(*self.methodArgs)
  1005. class ProducerTests(unittest.TestCase):
  1006. """
  1007. Test abstract.FileDescriptor's consumer interface.
  1008. """
  1009. def test_doubleProducer(self):
  1010. """
  1011. Verify that registering a non-streaming producer invokes its
  1012. resumeProducing() method and that you can only register one producer
  1013. at a time.
  1014. """
  1015. fd = abstract.FileDescriptor()
  1016. fd.connected = 1
  1017. dp = DummyProducer()
  1018. fd.registerProducer(dp, 0)
  1019. self.assertEqual(dp.events, ['resume'])
  1020. self.assertRaises(RuntimeError, fd.registerProducer, DummyProducer(), 0)
  1021. def test_unconnectedFileDescriptor(self):
  1022. """
  1023. Verify that registering a producer when the connection has already
  1024. been closed invokes its stopProducing() method.
  1025. """
  1026. fd = abstract.FileDescriptor()
  1027. fd.disconnected = 1
  1028. dp = DummyProducer()
  1029. fd.registerProducer(dp, 0)
  1030. self.assertEqual(dp.events, ['stop'])
  1031. def _dontPausePullConsumerTest(self, methodName):
  1032. """
  1033. Pull consumers don't get their C{pauseProducing} method called if the
  1034. descriptor buffer fills up.
  1035. @param _methodName: Either 'write', or 'writeSequence', indicating
  1036. which transport method to write data to.
  1037. """
  1038. descriptor = SillyDescriptor()
  1039. producer = DummyProducer()
  1040. descriptor.registerProducer(producer, streaming=False)
  1041. self.assertEqual(producer.events, ['resume'])
  1042. del producer.events[:]
  1043. # Fill up the descriptor's write buffer so we can observe whether or
  1044. # not it pauses its producer in that case.
  1045. if methodName == "writeSequence":
  1046. descriptor.writeSequence([b'1', b'2', b'3', b'4'])
  1047. else:
  1048. descriptor.write(b'1234')
  1049. self.assertEqual(producer.events, [])
  1050. def test_dontPausePullConsumerOnWrite(self):
  1051. """
  1052. Verify that FileDescriptor does not call producer.pauseProducing() on a
  1053. non-streaming pull producer in response to a L{IConsumer.write} call
  1054. which results in a full write buffer. Issue #2286.
  1055. """
  1056. return self._dontPausePullConsumerTest('write')
  1057. def test_dontPausePullConsumerOnWriteSequence(self):
  1058. """
  1059. Like L{test_dontPausePullConsumerOnWrite}, but for a call to
  1060. C{writeSequence} rather than L{IConsumer.write}.
  1061. C{writeSequence} is not part of L{IConsumer}, but
  1062. L{abstract.FileDescriptor} has supported consumery behavior in response
  1063. to calls to L{writeSequence} forever.
  1064. """
  1065. return self._dontPausePullConsumerTest('writeSequence')
  1066. def _reentrantStreamingProducerTest(self, methodName):
  1067. descriptor = SillyDescriptor()
  1068. if methodName == "writeSequence":
  1069. data = [b's', b'p', b'am']
  1070. else:
  1071. data = b"spam"
  1072. producer = ReentrantProducer(descriptor, methodName, data)
  1073. descriptor.registerProducer(producer, streaming=True)
  1074. # Start things off by filling up the descriptor's buffer so it will
  1075. # pause its producer.
  1076. getattr(descriptor, methodName)(data)
  1077. # Sanity check - make sure that worked.
  1078. self.assertEqual(producer.events, ['pause'])
  1079. del producer.events[:]
  1080. # After one call to doWrite, the buffer has been emptied so the
  1081. # FileDescriptor should resume its producer. That will result in an
  1082. # immediate call to FileDescriptor.write which will again fill the
  1083. # buffer and result in the producer being paused.
  1084. descriptor.doWrite()
  1085. self.assertEqual(producer.events, ['resume', 'pause'])
  1086. del producer.events[:]
  1087. # After a second call to doWrite, the exact same thing should have
  1088. # happened. Prior to the bugfix for which this test was written,
  1089. # FileDescriptor would have incorrectly believed its producer was
  1090. # already resumed (it was paused) and so not resume it again.
  1091. descriptor.doWrite()
  1092. self.assertEqual(producer.events, ['resume', 'pause'])
  1093. def test_reentrantStreamingProducerUsingWrite(self):
  1094. """
  1095. Verify that FileDescriptor tracks producer's paused state correctly.
  1096. Issue #811, fixed in revision r12857.
  1097. """
  1098. return self._reentrantStreamingProducerTest('write')
  1099. def test_reentrantStreamingProducerUsingWriteSequence(self):
  1100. """
  1101. Like L{test_reentrantStreamingProducerUsingWrite}, but for calls to
  1102. C{writeSequence}.
  1103. C{writeSequence} is B{not} part of L{IConsumer}, however
  1104. C{abstract.FileDescriptor} has supported consumery behavior in response
  1105. to calls to C{writeSequence} forever.
  1106. """
  1107. return self._reentrantStreamingProducerTest('writeSequence')
  1108. class PortStringificationTests(unittest.TestCase):
  1109. if interfaces.IReactorTCP(reactor, None) is not None:
  1110. def testTCP(self):
  1111. p = reactor.listenTCP(0, protocol.ServerFactory())
  1112. portNo = p.getHost().port
  1113. self.assertNotEqual(str(p).find(str(portNo)), -1,
  1114. "%d not found in %s" % (portNo, p))
  1115. return p.stopListening()
  1116. if interfaces.IReactorUDP(reactor, None) is not None:
  1117. def testUDP(self):
  1118. p = reactor.listenUDP(0, protocol.DatagramProtocol())
  1119. portNo = p.getHost().port
  1120. self.assertNotEqual(str(p).find(str(portNo)), -1,
  1121. "%d not found in %s" % (portNo, p))
  1122. return p.stopListening()
  1123. if interfaces.IReactorSSL(reactor, None) is not None and ssl:
  1124. def testSSL(self, ssl=ssl):
  1125. pem = util.sibpath(__file__, 'server.pem')
  1126. p = reactor.listenSSL(0, protocol.ServerFactory(), ssl.DefaultOpenSSLContextFactory(pem, pem))
  1127. portNo = p.getHost().port
  1128. self.assertNotEqual(str(p).find(str(portNo)), -1,
  1129. "%d not found in %s" % (portNo, p))
  1130. return p.stopListening()
  1131. if _PY3:
  1132. testSSL.skip = ("Re-enable once the Python 3 SSL port is done.")