|
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Test cases for L{twisted.logger._format}.
- """
- import sys
- from io import BytesIO, TextIOWrapper
- import logging as py_logging
- from inspect import getsourcefile
- from zope.interface.verify import verifyObject, BrokenMethodImplementation
- from twisted.python.compat import _PY3, currentframe
- from twisted.python.failure import Failure
- from twisted.trial import unittest
- from .._levels import LogLevel
- from .._observer import ILogObserver
- from .._stdlib import STDLibLogObserver
- def nextLine():
- """
- Retrive the file name and line number immediately after where this function
- is called.
- @return: the file name and line number
- @rtype: 2-L{tuple} of L{str}, L{int}
- """
- caller = currentframe(1)
- return (getsourcefile(sys.modules[caller.f_globals['__name__']]),
- caller.f_lineno + 1)
- class STDLibLogObserverTests(unittest.TestCase):
- """
- Tests for L{STDLibLogObserver}.
- """
- def test_interface(self):
- """
- L{STDLibLogObserver} is an L{ILogObserver}.
- """
- observer = STDLibLogObserver()
- try:
- verifyObject(ILogObserver, observer)
- except BrokenMethodImplementation as e:
- self.fail(e)
- def py_logger(self):
- """
- Create a logging object we can use to test with.
- @return: a stdlib-style logger
- @rtype: L{StdlibLoggingContainer}
- """
- logger = StdlibLoggingContainer()
- self.addCleanup(logger.close)
- return logger
- def logEvent(self, *events):
- """
- Send one or more events to Python's logging module, and
- capture the emitted L{logging.LogRecord}s and output stream as
- a string.
- @param events: events
- @type events: L{tuple} of L{dict}
- @return: a tuple: (records, output)
- @rtype: 2-tuple of (L{list} of L{logging.LogRecord}, L{bytes}.)
- """
- pl = self.py_logger()
- observer = STDLibLogObserver(
- # Add 1 to default stack depth to skip *this* frame, since
- # tests will want to know about their own frames.
- stackDepth=STDLibLogObserver.defaultStackDepth + 1
- )
- for event in events:
- observer(event)
- return pl.bufferedHandler.records, pl.outputAsText()
- def test_name(self):
- """
- Logger name.
- """
- records, output = self.logEvent({})
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0].name, "twisted")
- def test_levels(self):
- """
- Log levels.
- """
- levelMapping = {
- None: py_logging.INFO, # Default
- LogLevel.debug: py_logging.DEBUG,
- LogLevel.info: py_logging.INFO,
- LogLevel.warn: py_logging.WARNING,
- LogLevel.error: py_logging.ERROR,
- LogLevel.critical: py_logging.CRITICAL,
- }
- # Build a set of events for each log level
- events = []
- for level, pyLevel in levelMapping.items():
- event = {}
- # Set the log level on the event, except for default
- if level is not None:
- event["log_level"] = level
- # Remember the Python log level we expect to see for this
- # event (as an int)
- event["py_levelno"] = int(pyLevel)
- events.append(event)
- records, output = self.logEvent(*events)
- self.assertEqual(len(records), len(levelMapping))
- # Check that each event has the correct level
- for i in range(len(records)):
- self.assertEqual(records[i].levelno, events[i]["py_levelno"])
- def test_callerInfo(self):
- """
- C{pathname}, C{lineno}, C{exc_info}, C{func} is set properly on
- records.
- """
- filename, logLine = nextLine()
- records, output = self.logEvent({})
- self.assertEqual(len(records), 1)
- self.assertEqual(records[0].pathname, filename)
- self.assertEqual(records[0].lineno, logLine)
- self.assertIsNone(records[0].exc_info)
- # Attribute "func" is missing from record, which is weird because it's
- # documented.
- # self.assertEqual(records[0].func, "test_callerInfo")
- def test_basicFormat(self):
- """
- Basic formattable event passes the format along correctly.
- """
- event = dict(log_format="Hello, {who}!", who="dude")
- records, output = self.logEvent(event)
- self.assertEqual(len(records), 1)
- self.assertEqual(str(records[0].msg), u"Hello, dude!")
- self.assertEqual(records[0].args, ())
- def test_basicFormatRendered(self):
- """
- Basic formattable event renders correctly.
- """
- event = dict(log_format="Hello, {who}!", who="dude")
- records, output = self.logEvent(event)
- self.assertEqual(len(records), 1)
- self.assertTrue(output.endswith(u":Hello, dude!\n"),
- repr(output))
- def test_noFormat(self):
- """
- Event with no format.
- """
- records, output = self.logEvent({})
- self.assertEqual(len(records), 1)
- self.assertEqual(str(records[0].msg), "")
- def test_failure(self):
- """
- An event with a failure logs the failure details as well.
- """
- def failing_func():
- 1 / 0
- try:
- failing_func()
- except ZeroDivisionError:
- failure = Failure()
- event = dict(log_format='Hi mom', who='me', log_failure=failure)
- records, output = self.logEvent(event)
- self.assertEqual(len(records), 1)
- self.assertIn(u'Hi mom', output)
- self.assertIn(u'in failing_func', output)
- self.assertIn(u'ZeroDivisionError', output)
- def test_cleanedFailure(self):
- """
- A cleaned Failure object has a fake traceback object; make sure that
- logging such a failure still results in the exception details being
- logged.
- """
- def failing_func():
- 1 / 0
- try:
- failing_func()
- except ZeroDivisionError:
- failure = Failure()
- failure.cleanFailure()
- event = dict(log_format='Hi mom', who='me', log_failure=failure)
- records, output = self.logEvent(event)
- self.assertEqual(len(records), 1)
- self.assertIn(u'Hi mom', output)
- self.assertIn(u'in failing_func', output)
- self.assertIn(u'ZeroDivisionError', output)
- class StdlibLoggingContainer(object):
- """
- Continer for a test configuration of stdlib logging objects.
- """
- def __init__(self):
- self.rootLogger = py_logging.getLogger("")
- self.originalLevel = self.rootLogger.getEffectiveLevel()
- self.rootLogger.setLevel(py_logging.DEBUG)
- self.bufferedHandler = BufferedHandler()
- self.rootLogger.addHandler(self.bufferedHandler)
- self.streamHandler, self.output = handlerAndBytesIO()
- self.rootLogger.addHandler(self.streamHandler)
- def close(self):
- """
- Close the logger.
- """
- self.rootLogger.setLevel(self.originalLevel)
- self.rootLogger.removeHandler(self.bufferedHandler)
- self.rootLogger.removeHandler(self.streamHandler)
- self.streamHandler.close()
- self.output.close()
- def outputAsText(self):
- """
- Get the output to the underlying stream as text.
- @return: the output text
- @rtype: L{unicode}
- """
- return self.output.getvalue().decode("utf-8")
- def handlerAndBytesIO():
- """
- Construct a 2-tuple of C{(StreamHandler, BytesIO)} for testing interaction
- with the 'logging' module.
- @return: handler and io object
- @rtype: tuple of L{StreamHandler} and L{io.BytesIO}
- """
- output = BytesIO()
- stream = output
- template = py_logging.BASIC_FORMAT
- if _PY3:
- stream = TextIOWrapper(output, encoding="utf-8", newline="\n")
- formatter = py_logging.Formatter(template)
- handler = py_logging.StreamHandler(stream)
- handler.setFormatter(formatter)
- return handler, output
- class BufferedHandler(py_logging.Handler):
- """
- A L{py_logging.Handler} that remembers all logged records in a list.
- """
- def __init__(self):
- """
- Initialize this L{BufferedHandler}.
- """
- py_logging.Handler.__init__(self)
- self.records = []
- def emit(self, record):
- """
- Remember the record.
- """
- self.records.append(record)
|