test_log.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test the interaction between trial and errors logged during a test run.
  5. """
  6. from __future__ import division, absolute_import
  7. import time
  8. from twisted.internet import reactor, task
  9. from twisted.python import failure, log
  10. from twisted.trial import unittest, reporter, _synctest
  11. def makeFailure():
  12. """
  13. Return a new, realistic failure.
  14. """
  15. try:
  16. 1/0
  17. except ZeroDivisionError:
  18. f = failure.Failure()
  19. return f
  20. class Mask(object):
  21. """
  22. Hide C{MockTest}s from Trial's automatic test finder.
  23. """
  24. class FailureLoggingMixin(object):
  25. def test_silent(self):
  26. """
  27. Don't log any errors.
  28. """
  29. def test_single(self):
  30. """
  31. Log a single error.
  32. """
  33. log.err(makeFailure())
  34. def test_double(self):
  35. """
  36. Log two errors.
  37. """
  38. log.err(makeFailure())
  39. log.err(makeFailure())
  40. class SynchronousFailureLogging(FailureLoggingMixin, unittest.SynchronousTestCase):
  41. pass
  42. class AsynchronousFailureLogging(FailureLoggingMixin, unittest.TestCase):
  43. def test_inCallback(self):
  44. """
  45. Log an error in an asynchronous callback.
  46. """
  47. return task.deferLater(reactor, 0, lambda: log.err(makeFailure()))
  48. class ObserverTests(unittest.SynchronousTestCase):
  49. """
  50. Tests for L{_synctest._LogObserver}, a helper for the implementation of
  51. L{SynchronousTestCase.flushLoggedErrors}.
  52. """
  53. def setUp(self):
  54. self.result = reporter.TestResult()
  55. self.observer = _synctest._LogObserver()
  56. def test_msg(self):
  57. """
  58. Test that a standard log message doesn't go anywhere near the result.
  59. """
  60. self.observer.gotEvent({'message': ('some message',),
  61. 'time': time.time(), 'isError': 0,
  62. 'system': '-'})
  63. self.assertEqual(self.observer.getErrors(), [])
  64. def test_error(self):
  65. """
  66. Test that an observed error gets added to the result
  67. """
  68. f = makeFailure()
  69. self.observer.gotEvent({'message': (),
  70. 'time': time.time(), 'isError': 1,
  71. 'system': '-', 'failure': f,
  72. 'why': None})
  73. self.assertEqual(self.observer.getErrors(), [f])
  74. def test_flush(self):
  75. """
  76. Check that flushing the observer with no args removes all errors.
  77. """
  78. self.test_error()
  79. flushed = self.observer.flushErrors()
  80. self.assertEqual(self.observer.getErrors(), [])
  81. self.assertEqual(len(flushed), 1)
  82. self.assertTrue(flushed[0].check(ZeroDivisionError))
  83. def _makeRuntimeFailure(self):
  84. return failure.Failure(RuntimeError('test error'))
  85. def test_flushByType(self):
  86. """
  87. Check that flushing the observer remove all failures of the given type.
  88. """
  89. self.test_error() # log a ZeroDivisionError to the observer
  90. f = self._makeRuntimeFailure()
  91. self.observer.gotEvent(dict(message=(), time=time.time(), isError=1,
  92. system='-', failure=f, why=None))
  93. flushed = self.observer.flushErrors(ZeroDivisionError)
  94. self.assertEqual(self.observer.getErrors(), [f])
  95. self.assertEqual(len(flushed), 1)
  96. self.assertTrue(flushed[0].check(ZeroDivisionError))
  97. def test_ignoreErrors(self):
  98. """
  99. Check that C{_ignoreErrors} actually causes errors to be ignored.
  100. """
  101. self.observer._ignoreErrors(ZeroDivisionError)
  102. f = makeFailure()
  103. self.observer.gotEvent({'message': (),
  104. 'time': time.time(), 'isError': 1,
  105. 'system': '-', 'failure': f,
  106. 'why': None})
  107. self.assertEqual(self.observer.getErrors(), [])
  108. def test_clearIgnores(self):
  109. """
  110. Check that C{_clearIgnores} ensures that previously ignored errors
  111. get captured.
  112. """
  113. self.observer._ignoreErrors(ZeroDivisionError)
  114. self.observer._clearIgnores()
  115. f = makeFailure()
  116. self.observer.gotEvent({'message': (),
  117. 'time': time.time(), 'isError': 1,
  118. 'system': '-', 'failure': f,
  119. 'why': None})
  120. self.assertEqual(self.observer.getErrors(), [f])
  121. class LogErrorsMixin(object):
  122. """
  123. High-level tests demonstrating the expected behaviour of logged errors
  124. during tests.
  125. """
  126. def setUp(self):
  127. self.result = reporter.TestResult()
  128. def tearDown(self):
  129. self.flushLoggedErrors(ZeroDivisionError)
  130. def test_singleError(self):
  131. """
  132. Test that a logged error gets reported as a test error.
  133. """
  134. test = self.MockTest('test_single')
  135. test(self.result)
  136. self.assertEqual(len(self.result.errors), 1)
  137. self.assertTrue(self.result.errors[0][1].check(ZeroDivisionError),
  138. self.result.errors[0][1])
  139. self.assertEqual(0, self.result.successes)
  140. def test_twoErrors(self):
  141. """
  142. Test that when two errors get logged, they both get reported as test
  143. errors.
  144. """
  145. test = self.MockTest('test_double')
  146. test(self.result)
  147. self.assertEqual(len(self.result.errors), 2)
  148. self.assertEqual(0, self.result.successes)
  149. def test_errorsIsolated(self):
  150. """
  151. Check that an error logged in one test doesn't fail the next test.
  152. """
  153. t1 = self.MockTest('test_single')
  154. t2 = self.MockTest('test_silent')
  155. t1(self.result)
  156. t2(self.result)
  157. self.assertEqual(len(self.result.errors), 1)
  158. self.assertEqual(self.result.errors[0][0], t1)
  159. self.assertEqual(1, self.result.successes)
  160. def test_boundedObservers(self):
  161. """
  162. There are no extra log observers after a test runs.
  163. """
  164. # XXX trial is *all about* global log state. It should really be fixed.
  165. observer = _synctest._LogObserver()
  166. self.patch(_synctest, '_logObserver', observer)
  167. observers = log.theLogPublisher.observers[:]
  168. test = self.MockTest()
  169. test(self.result)
  170. self.assertEqual(observers, log.theLogPublisher.observers)
  171. class SynchronousLogErrorsTests(LogErrorsMixin, unittest.SynchronousTestCase):
  172. MockTest = Mask.SynchronousFailureLogging
  173. class AsynchronousLogErrorsTests(LogErrorsMixin, unittest.TestCase):
  174. MockTest = Mask.AsynchronousFailureLogging
  175. def test_inCallback(self):
  176. """
  177. Test that errors logged in callbacks get reported as test errors.
  178. """
  179. test = self.MockTest('test_inCallback')
  180. test(self.result)
  181. self.assertEqual(len(self.result.errors), 1)
  182. self.assertTrue(self.result.errors[0][1].check(ZeroDivisionError),
  183. self.result.errors[0][1])