test_stdlib.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304
  1. # Copyright (c) Twisted Matrix Laboratories.
  2. # See LICENSE for details.
  3. """
  4. Test cases for L{twisted.logger._stdlib}.
  5. """
  6. import sys
  7. from io import BytesIO, TextIOWrapper
  8. import logging as py_logging
  9. from inspect import getsourcefile
  10. from zope.interface.verify import verifyObject, BrokenMethodImplementation
  11. from twisted.python.compat import _PY3, currentframe
  12. from twisted.python.failure import Failure
  13. from twisted.trial import unittest
  14. from .._levels import LogLevel
  15. from .._observer import ILogObserver
  16. from .._stdlib import STDLibLogObserver
  17. def nextLine():
  18. """
  19. Retrive the file name and line number immediately after where this function
  20. is called.
  21. @return: the file name and line number
  22. @rtype: 2-L{tuple} of L{str}, L{int}
  23. """
  24. caller = currentframe(1)
  25. return (getsourcefile(sys.modules[caller.f_globals['__name__']]),
  26. caller.f_lineno + 1)
  27. class STDLibLogObserverTests(unittest.TestCase):
  28. """
  29. Tests for L{STDLibLogObserver}.
  30. """
  31. def test_interface(self):
  32. """
  33. L{STDLibLogObserver} is an L{ILogObserver}.
  34. """
  35. observer = STDLibLogObserver()
  36. try:
  37. verifyObject(ILogObserver, observer)
  38. except BrokenMethodImplementation as e:
  39. self.fail(e)
  40. def py_logger(self):
  41. """
  42. Create a logging object we can use to test with.
  43. @return: a stdlib-style logger
  44. @rtype: L{StdlibLoggingContainer}
  45. """
  46. logger = StdlibLoggingContainer()
  47. self.addCleanup(logger.close)
  48. return logger
  49. def logEvent(self, *events):
  50. """
  51. Send one or more events to Python's logging module, and
  52. capture the emitted L{logging.LogRecord}s and output stream as
  53. a string.
  54. @param events: events
  55. @type events: L{tuple} of L{dict}
  56. @return: a tuple: (records, output)
  57. @rtype: 2-tuple of (L{list} of L{logging.LogRecord}, L{bytes}.)
  58. """
  59. pl = self.py_logger()
  60. observer = STDLibLogObserver(
  61. # Add 1 to default stack depth to skip *this* frame, since
  62. # tests will want to know about their own frames.
  63. stackDepth=STDLibLogObserver.defaultStackDepth + 1
  64. )
  65. for event in events:
  66. observer(event)
  67. return pl.bufferedHandler.records, pl.outputAsText()
  68. def test_name(self):
  69. """
  70. Logger name.
  71. """
  72. records, output = self.logEvent({})
  73. self.assertEqual(len(records), 1)
  74. self.assertEqual(records[0].name, "twisted")
  75. def test_levels(self):
  76. """
  77. Log levels.
  78. """
  79. levelMapping = {
  80. None: py_logging.INFO, # Default
  81. LogLevel.debug: py_logging.DEBUG,
  82. LogLevel.info: py_logging.INFO,
  83. LogLevel.warn: py_logging.WARNING,
  84. LogLevel.error: py_logging.ERROR,
  85. LogLevel.critical: py_logging.CRITICAL,
  86. }
  87. # Build a set of events for each log level
  88. events = []
  89. for level, pyLevel in levelMapping.items():
  90. event = {}
  91. # Set the log level on the event, except for default
  92. if level is not None:
  93. event["log_level"] = level
  94. # Remember the Python log level we expect to see for this
  95. # event (as an int)
  96. event["py_levelno"] = int(pyLevel)
  97. events.append(event)
  98. records, output = self.logEvent(*events)
  99. self.assertEqual(len(records), len(levelMapping))
  100. # Check that each event has the correct level
  101. for i in range(len(records)):
  102. self.assertEqual(records[i].levelno, events[i]["py_levelno"])
  103. def test_callerInfo(self):
  104. """
  105. C{pathname}, C{lineno}, C{exc_info}, C{func} is set properly on
  106. records.
  107. """
  108. filename, logLine = nextLine()
  109. records, output = self.logEvent({})
  110. self.assertEqual(len(records), 1)
  111. self.assertEqual(records[0].pathname, filename)
  112. self.assertEqual(records[0].lineno, logLine)
  113. self.assertIsNone(records[0].exc_info)
  114. # Attribute "func" is missing from record, which is weird because it's
  115. # documented.
  116. # self.assertEqual(records[0].func, "test_callerInfo")
  117. def test_basicFormat(self):
  118. """
  119. Basic formattable event passes the format along correctly.
  120. """
  121. event = dict(log_format="Hello, {who}!", who="dude")
  122. records, output = self.logEvent(event)
  123. self.assertEqual(len(records), 1)
  124. self.assertEqual(str(records[0].msg), u"Hello, dude!")
  125. self.assertEqual(records[0].args, ())
  126. def test_basicFormatRendered(self):
  127. """
  128. Basic formattable event renders correctly.
  129. """
  130. event = dict(log_format="Hello, {who}!", who="dude")
  131. records, output = self.logEvent(event)
  132. self.assertEqual(len(records), 1)
  133. self.assertTrue(output.endswith(u":Hello, dude!\n"),
  134. repr(output))
  135. def test_noFormat(self):
  136. """
  137. Event with no format.
  138. """
  139. records, output = self.logEvent({})
  140. self.assertEqual(len(records), 1)
  141. self.assertEqual(str(records[0].msg), "")
  142. def test_failure(self):
  143. """
  144. An event with a failure logs the failure details as well.
  145. """
  146. def failing_func():
  147. 1 / 0
  148. try:
  149. failing_func()
  150. except ZeroDivisionError:
  151. failure = Failure()
  152. event = dict(log_format='Hi mom', who='me', log_failure=failure)
  153. records, output = self.logEvent(event)
  154. self.assertEqual(len(records), 1)
  155. self.assertIn(u'Hi mom', output)
  156. self.assertIn(u'in failing_func', output)
  157. self.assertIn(u'ZeroDivisionError', output)
  158. def test_cleanedFailure(self):
  159. """
  160. A cleaned Failure object has a fake traceback object; make sure that
  161. logging such a failure still results in the exception details being
  162. logged.
  163. """
  164. def failing_func():
  165. 1 / 0
  166. try:
  167. failing_func()
  168. except ZeroDivisionError:
  169. failure = Failure()
  170. failure.cleanFailure()
  171. event = dict(log_format='Hi mom', who='me', log_failure=failure)
  172. records, output = self.logEvent(event)
  173. self.assertEqual(len(records), 1)
  174. self.assertIn(u'Hi mom', output)
  175. self.assertIn(u'in failing_func', output)
  176. self.assertIn(u'ZeroDivisionError', output)
  177. class StdlibLoggingContainer(object):
  178. """
  179. Continer for a test configuration of stdlib logging objects.
  180. """
  181. def __init__(self):
  182. self.rootLogger = py_logging.getLogger("")
  183. self.originalLevel = self.rootLogger.getEffectiveLevel()
  184. self.rootLogger.setLevel(py_logging.DEBUG)
  185. self.bufferedHandler = BufferedHandler()
  186. self.rootLogger.addHandler(self.bufferedHandler)
  187. self.streamHandler, self.output = handlerAndBytesIO()
  188. self.rootLogger.addHandler(self.streamHandler)
  189. def close(self):
  190. """
  191. Close the logger.
  192. """
  193. self.rootLogger.setLevel(self.originalLevel)
  194. self.rootLogger.removeHandler(self.bufferedHandler)
  195. self.rootLogger.removeHandler(self.streamHandler)
  196. self.streamHandler.close()
  197. self.output.close()
  198. def outputAsText(self):
  199. """
  200. Get the output to the underlying stream as text.
  201. @return: the output text
  202. @rtype: L{unicode}
  203. """
  204. return self.output.getvalue().decode("utf-8")
  205. def handlerAndBytesIO():
  206. """
  207. Construct a 2-tuple of C{(StreamHandler, BytesIO)} for testing interaction
  208. with the 'logging' module.
  209. @return: handler and io object
  210. @rtype: tuple of L{StreamHandler} and L{io.BytesIO}
  211. """
  212. output = BytesIO()
  213. stream = output
  214. template = py_logging.BASIC_FORMAT
  215. if _PY3:
  216. stream = TextIOWrapper(output, encoding="utf-8", newline="\n")
  217. formatter = py_logging.Formatter(template)
  218. handler = py_logging.StreamHandler(stream)
  219. handler.setFormatter(formatter)
  220. return handler, output
  221. class BufferedHandler(py_logging.Handler):
  222. """
  223. A L{py_logging.Handler} that remembers all logged records in a list.
  224. """
  225. def __init__(self):
  226. """
  227. Initialize this L{BufferedHandler}.
  228. """
  229. py_logging.Handler.__init__(self)
  230. self.records = []
  231. def emit(self, record):
  232. """
  233. Remember the record.
  234. """
  235. self.records.append(record)