test_file.py 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test cases for L{twisted.logger._file}.
  5. """
  6. from io import StringIO
  7. from zope.interface.verify import verifyObject, BrokenMethodImplementation
  8. from twisted.trial.unittest import TestCase
  9. from twisted.python.failure import Failure
  10. from twisted.python.compat import unicode
  11. from .._observer import ILogObserver
  12. from .._file import FileLogObserver
  13. from .._file import textFileLogObserver
  class FileLogObserverTests(TestCase):
      """
      Unit tests exercising L{FileLogObserver}.
      """

      def test_interface(self):
          """
          A L{FileLogObserver} instance provides L{ILogObserver}.
          """
          with StringIO() as stream:
              candidate = FileLogObserver(stream, lambda event: unicode(event))
              try:
                  verifyObject(ILogObserver, candidate)
              except BrokenMethodImplementation as problem:
                  # Report an interface mismatch as a test failure.
                  self.fail(problem)

      def test_observeWrites(self):
          """
          Observing an event causes L{FileLogObserver} to write the formatted
          event to the file it was given.
          """
          with StringIO() as stream:
              logEvent = {"x": 1}
              emit = FileLogObserver(stream, lambda event: unicode(event))
              emit(logEvent)
              written = stream.getvalue()
              self.assertEqual(written, unicode(logEvent))

      def _test_observeWrites(self, what, count):
          """
          Observe one event with a formatter that always returns C{what} and
          check how many times the output file was written to.

          @param what: the value for the formatter to return.
          @type what: L{unicode}

          @param count: the expected number of writes.
          @type count: L{int}
          """
          with DummyFile() as stream:
              emit = FileLogObserver(stream, lambda _event: what)
              emit({"x": 1})
              self.assertEqual(stream.writes, count)

      def test_observeWritesNone(self):
          """
          When C{formatEvent} returns L{None}, L{FileLogObserver} performs no
          write on the given file.
          """
          self._test_observeWrites(None, 0)

      def test_observeWritesEmpty(self):
          """
          When C{formatEvent} returns C{u""}, L{FileLogObserver} performs no
          write on the given file.
          """
          self._test_observeWrites(u"", 0)

      def test_observeFlushes(self):
          """
          L{FileLogObserver} calls C{flush()} on the output file when it
          observes an event.
          """
          with DummyFile() as stream:
              emit = FileLogObserver(stream, lambda event: unicode(event))
              emit({"x": 1})
              self.assertEqual(stream.flushes, 1)
  class TextFileLogObserverTests(TestCase):
      """
      Unit tests exercising L{textFileLogObserver}.
      """

      def test_returnsFileLogObserver(self):
          """
          The object produced by L{textFileLogObserver} is a
          L{FileLogObserver}.
          """
          with StringIO() as stream:
              result = textFileLogObserver(stream)
              self.assertIsInstance(result, FileLogObserver)

      def test_outFile(self):
          """
          The returned L{FileLogObserver} holds the very file object that was
          passed in as its C{_outFile}.
          """
          with StringIO() as stream:
              self.assertIs(textFileLogObserver(stream)._outFile, stream)

      def test_timeFormat(self):
          """
          The returned L{FileLogObserver} renders the event time using the
          C{timeFormat} that was supplied.
          """
          with StringIO() as stream:
              emit = textFileLogObserver(stream, timeFormat=u"%f")
              emit({"log_format": u"XYZZY", "log_time": 1.23456})
              rendered = stream.getvalue()
              self.assertEqual(rendered, u"234560 [-#-] XYZZY\n")

      def test_observeFailure(self):
          """
          When an event carries a C{"log_failure"} key, the failure's
          traceback is appended to what the observer writes.
          """
          # Capture a real failure from an actual exception.
          try:
              1 / 0
          except ZeroDivisionError:
              captured = Failure()

          with StringIO() as stream:
              emit = textFileLogObserver(stream)
              emit({"log_failure": captured})

              text = stream.getvalue()
              secondLine = text.split("\n")[1]
              self.assertTrue(
                  secondLine.startswith("\tTraceback "),
                  msg=repr(text),
              )

      def test_observeFailureThatRaisesInGetTraceback(self):
          """
          When the C{"log_failure"} value is an object whose
          C{getTraceback()} cannot be called successfully, the observer writes
          a message noting the problem rather than raising.
          """
          with StringIO() as stream:
              emit = textFileLogObserver(stream)
              # A plain object() has no getTraceback() at all.
              emit({"log_failure": object()})

              text = stream.getvalue()
              self.assertIn("(UNABLE TO OBTAIN TRACEBACK FROM EVENT)", text)
  class DummyFile(object):
      """
      Minimal file-like stand-in that tallies calls to C{write} and C{flush}.

      @ivar writes: how many times L{write} has been called.
      @type writes: L{int}

      @ivar flushes: how many times L{flush} has been called.
      @type flushes: L{int}
      """

      def __init__(self):
          self.writes = self.flushes = 0

      def __enter__(self):
          """
          Enter a C{with} block.

          @return: this same instance.
          """
          return self

      def __exit__(self, exc_type, exc_value, traceback):
          """
          Leave a C{with} block. Nothing is cleaned up and L{None} is
          returned, so an exception raised inside the block propagates.
          """
          return None

      def write(self, data):
          """
          Record one write; the data itself is discarded.

          @param data: data
          @type data: L{unicode} or L{bytes}
          """
          self.writes = self.writes + 1

      def flush(self):
          """
          Record one flush.
          """
          self.flushes = self.flushes + 1