| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564 |
- # Copyright (c) Twisted Matrix Laboratories.
- # See LICENSE for details.
- """
- Tests for L{twisted.logger._json}.
- """
- from io import StringIO, BytesIO
- from zope.interface.verify import verifyObject, BrokenMethodImplementation
- from twisted.python.compat import unicode
- from twisted.trial.unittest import TestCase
- from twisted.python.failure import Failure
- from .._observer import ILogObserver, LogPublisher
- from .._format import formatEvent
- from .._levels import LogLevel
- from .._flatten import extractField
- from .._global import globalLogPublisher
- from .._json import (
- eventAsJSON, eventFromJSON, jsonFileLogObserver, eventsFromJSONLogFile,
- log as jsonLog
- )
- from .._logger import Logger
def savedJSONInvariants(testCase, savedJSON):
    """
    Verify the basic invariants of serialized event JSON and hand it back.

    The value must be text (L{unicode}) and must not contain any newline
    character.

    @param testCase: The test case whose assertion methods are used.
    @type testCase: L{TestCase}

    @param savedJSON: The value produced by L{eventAsJSON}.
    @type savedJSON: L{unicode} (this is one of the things verified)

    @return: C{savedJSON}, unchanged.
    @rtype: L{unicode}

    @raise AssertionError: If either invariant does not hold (raised by
        C{testCase}'s assertion methods).
    """
    # Serialized events are expected to be text rather than bytes.
    testCase.assertIsInstance(savedJSON, unicode)

    # A serialized event must not span multiple lines.
    newlineCount = savedJSON.count("\n")
    testCase.assertEqual(newlineCount, 0)

    return savedJSON
class SaveLoadTests(TestCase):
    """
    Tests for round-tripping log events through L{eventAsJSON} and
    L{eventFromJSON}.
    """

    def savedEventJSON(self, event):
        """
        Serialize an event to JSON, check the serialized form with
        L{savedJSONInvariants}, and return it.

        @param event: An event.
        @type event: L{dict}

        @return: The JSON text for C{event}.
        """
        serialized = eventAsJSON(event)
        return savedJSONInvariants(self, serialized)


    def test_simpleSaveLoad(self):
        """
        An empty dictionary that is saved and then loaded comes back as an
        empty dictionary.
        """
        loadedEvent = eventFromJSON(self.savedEventJSON({}))
        self.assertEqual(loadedEvent, {})


    def test_saveLoad(self):
        """
        Simple values survive a save/load cycle.  Per JSON's rules all
        dictionary keys are L{unicode}, so a non-L{unicode} key is converted
        on the way through.
        """
        inputEvent = {1: 2, u"3": u"4"}
        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(loadedEvent, {u"1": 2, u"3": u"4"})


    def test_saveUnPersistable(self):
        """
        A value that cannot be represented in JSON is replaced by a
        placeholder when saved and loaded.
        """
        inputEvent = {u"1": 2, u"3": object()}
        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(
            loadedEvent,
            {u"1": 2, u"3": {u"unpersistable": True}}
        )


    def test_saveNonASCII(self):
        """
        Keys and values containing non-ASCII characters can be saved and
        loaded.
        """
        inputEvent = {u"\u1234": u"\u4321", u"3": object()}
        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(
            loadedEvent,
            {u"\u1234": u"\u4321", u"3": {u"unpersistable": True}}
        )


    def test_saveBytes(self):
        """
        L{bytes} values are saved as though they were latin-1 text, so that
        they can be loaded back faithfully.
        """
        def asbytes(values):
            # Where bytes is an alias for str (Python 2), assemble the
            # string one character at a time.
            if bytes is str:
                return b"".join(map(chr, values))
            return bytes(values)

        payload = asbytes(range(255))
        inputEvent = {"hello": payload}

        if bytes is not str:
            # On Python 3, bytes keys will be skipped by the JSON encoder.
            # Not much we can do about that.  Let's make sure that we don't
            # get an error, though.
            inputEvent[b"skipped"] = "okay"

        loadedEvent = eventFromJSON(self.savedEventJSON(inputEvent))
        self.assertEqual(loadedEvent, {u"hello": payload.decode("charmap")})


    def test_saveUnPersistableThenFormat(self):
        """
        An object that cannot be represented in JSON, but whose string
        representation I{can}, formats identically after being saved and
        loaded; any extractable fields retain their data types.
        """
        class Reprable(object):
            def __init__(self, value):
                self.value = value

            def __repr__(self):
                return "reprable"

        inputEvent = dict(
            log_format="{object} {object.value}",
            object=Reprable(7),
        )
        serialized = self.savedEventJSON(inputEvent)
        outputEvent = eventFromJSON(serialized)
        self.assertEqual(formatEvent(outputEvent), "reprable 7")


    def test_extractingFieldsPostLoad(self):
        """
        L{extractField} can pull fields out of an event that has been saved
        to and loaded from JSON.
        """
        class Obj(object):
            def __init__(self):
                self.value = 345

        inputEvent = {"log_format": "{object.value}", "object": Obj()}
        serialized = self.savedEventJSON(inputEvent)
        loadedEvent = eventFromJSON(serialized)
        self.assertEqual(extractField("object.value", loadedEvent), 345)

        # The behavior of extractField is consistent between pre-persistence
        # and post-persistence events, although looking up the key directly
        # won't be:
        for someEvent in (loadedEvent, inputEvent):
            self.assertRaises(KeyError, extractField, "object", someEvent)


    def test_failureStructurePreserved(self):
        """
        A failure keeps its class and structure when round-tripped through
        L{eventAsJSON}.
        """
        import sys

        events = []
        log = Logger(observer=events.append)

        try:
            1 / 0
        except ZeroDivisionError:
            failure = Failure()
            log.failure("a message about failure", failure)

        activeExceptionType = sys.exc_info()[0]
        if activeExceptionType is not None:
            # Make sure we don't get the same Failure by accident.
            # (sys.exc_clear only exists on Python 2.)
            sys.exc_clear()

        self.assertEqual(len(events), 1)

        serialized = self.savedEventJSON(events[0])
        loaded = eventFromJSON(serialized)['log_failure']

        self.assertIsInstance(loaded, Failure)
        self.assertTrue(loaded.check(ZeroDivisionError))
        self.assertIsInstance(loaded.getTraceback(), str)


    def test_saveLoadLevel(self):
        """
        The C{log_level} key must still be the same
        L{constantly.NamedConstant} object after a save/load cycle.
        """
        inputEvent = {"log_level": LogLevel.warn}
        serialized = self.savedEventJSON(inputEvent)
        loadedEvent = eventFromJSON(serialized)
        self.assertIs(loadedEvent["log_level"], LogLevel.warn)


    def test_saveLoadUnknownLevel(self):
        """
        If saved JSON (say, from a future version of Twisted) persisted a
        log_level that is not known, it is loaded as C{None}.
        """
        savedJSON = (
            '{"log_level": {"name": "other", '
            '"__class_uuid__": "02E59486-F24D-46AD-8224-3ACDF2A5732A"}}'
        )
        loadedEvent = eventFromJSON(savedJSON)
        self.assertEqual(loadedEvent, {"log_level": None})
class FileLogObserverTests(TestCase):
    """
    Tests for L{jsonFileLogObserver}.
    """

    def test_interface(self):
        """
        A L{FileLogObserver} returned by L{jsonFileLogObserver} is an
        L{ILogObserver}.
        """
        with StringIO() as fileHandle:
            observer = jsonFileLogObserver(fileHandle)
            try:
                verifyObject(ILogObserver, observer)
            except BrokenMethodImplementation as e:
                # Report an interface mismatch as an ordinary test failure.
                self.fail(e)


    def assertObserverWritesJSON(self, **kwargs):
        """
        Assert that an observer created by L{jsonFileLogObserver} with the
        given arguments writes an observed event as the record separator,
        followed by the event's JSON text, followed by a newline.

        @param kwargs: Keyword arguments to pass to L{jsonFileLogObserver}.
            If C{recordSeparator} is among them, it is also the separator
            expected in the output; otherwise C{u"\\x1e"} is expected.
        @type kwargs: L{dict}
        """
        recordSeparator = kwargs.get("recordSeparator", u"\x1e")

        with StringIO() as fileHandle:
            observer = jsonFileLogObserver(fileHandle, **kwargs)
            event = dict(x=1)
            observer(event)
            # The value must be read before the file is closed on exit from
            # the "with" block.
            self.assertEqual(
                fileHandle.getvalue(),
                u'{0}{{"x": 1}}\n'.format(recordSeparator)
            )


    def test_observeWritesDefaultRecordSeparator(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes events
        serialized as JSON text to a file when it observes events.
        By default, the record separator is C{u"\\x1e"}.
        """
        self.assertObserverWritesJSON()


    def test_observeWritesEmptyRecordSeparator(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes events
        serialized as JSON text to a file when it observes events.
        This test sets the record separator to C{u""}.
        """
        self.assertObserverWritesJSON(recordSeparator=u"")


    def test_failureFormatting(self):
        """
        A L{FileLogObserver} created by L{jsonFileLogObserver} writes failures
        serialized as JSON text to a file when it observes events.
        """
        output = StringIO()
        publisher = LogPublisher()
        logged = []
        # Observe each event twice: once as the in-memory event, and once
        # through the JSON file observer.
        publisher.addObserver(logged.append)
        publisher.addObserver(jsonFileLogObserver(output))
        logger = Logger(observer=publisher)

        try:
            1 / 0
        except ZeroDivisionError:
            # Catch only the expected error so that unrelated exceptions
            # (KeyboardInterrupt, SystemExit, ...) are not swallowed.
            logger.failure("failed as expected")

        reader = StringIO(output.getvalue())
        deserialized = list(eventsFromJSONLogFile(reader))

        def checkEvents(logEvents):
            """
            Assert that C{logEvents} holds exactly one event and that its
            C{"log_failure"} is a L{Failure} whose traceback points at this
            file.
            """
            self.assertEqual(len(logEvents), 1)
            [failureEvent] = logEvents
            self.assertIn("log_failure", failureEvent)
            failureObject = failureEvent["log_failure"]
            self.assertIsInstance(failureObject, Failure)
            tracebackObject = failureObject.getTracebackObject()
            # Trailing "c"/"o" characters are stripped from both names,
            # presumably so that a .pyc/.pyo path compares equal to the
            # corresponding .py path.
            self.assertEqual(
                tracebackObject.tb_frame.f_code.co_filename.rstrip("co"),
                __file__.rstrip("co")
            )

        checkEvents(logged)
        checkEvents(deserialized)
class LogFileReaderTests(TestCase):
    """
    Tests for L{eventsFromJSONLogFile}.
    """

    def setUp(self):
        """
        Start collecting, from the global log publisher, the events logged in
        the JSON module's logger namespace that carry a C{"record"} key.
        """
        self.errorEvents = []

        def observer(event):
            fromJSONLogger = event["log_namespace"] == jsonLog.namespace
            if fromJSONLogger and "record" in event:
                self.errorEvents.append(event)

        self.logObserver = observer
        globalLogPublisher.addObserver(observer)


    def tearDown(self):
        """
        Stop observing the global log publisher.
        """
        globalLogPublisher.removeObserver(self.logObserver)


    def _readEvents(self, fileHandle, **kwargs):
        """
        Assert that L{eventsFromJSONLogFile} yields exactly two pre-defined
        events from a file: C{{u"x": 1}} followed by C{{u"y": 2}}.

        @param fileHandle: The file to read from.

        @param kwargs: Keyword arguments to pass to L{eventsFromJSONLogFile}.
        """
        eventStream = eventsFromJSONLogFile(fileHandle, **kwargs)

        self.assertEqual(next(eventStream), {u"x": 1})
        self.assertEqual(next(eventStream), {u"y": 2})
        # Nothing may follow the two expected events.
        self.assertRaises(StopIteration, next, eventStream)


    def test_readEventsAutoWithRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file, detecting on its
        own that C{u"\\x1e"} is used as the record separator.
        """
        logText = (
            u'\x1e{"x": 1}\n'
            u'\x1e{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            self._readEvents(stream)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsAutoEmptyRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file, detecting on its
        own that C{u""} is used as the record separator.
        """
        logText = (
            u'{"x": 1}\n'
            u'{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            self._readEvents(stream)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsExplicitRecordSeparator(self):
        """
        L{eventsFromJSONLogFile} reads events from a file using the record
        separator it is explicitly given.
        """
        # Use u"\x08" (backspace)... because that seems weird enough.
        logText = (
            u'\x08{"x": 1}\n'
            u'\x08{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            self._readEvents(stream, recordSeparator=u"\x08")
            self.assertEqual(len(self.errorEvents), 0)


    def test_readEventsPartialBuffer(self):
        """
        L{eventsFromJSONLogFile} copes with an event arriving in pieces.
        """
        logText = (
            u'\x1e{"x": 1}\n'
            u'\x1e{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            # Use a buffer size smaller than the event text.
            self._readEvents(stream, bufferSize=1)
            self.assertEqual(len(self.errorEvents), 0)


    def test_readTruncated(self):
        """
        A record whose JSON text is truncated is skipped.
        """
        logText = (
            u'\x1e{"x": 1'
            u'\x1e{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            events = eventsFromJSONLogFile(stream)

            self.assertEqual(next(events), {u"y": 2})
            self.assertRaises(StopIteration, next, events)  # No more events

            # We should have logged the lost record
            self.assertEqual(len(self.errorEvents), 1)
            reported = self.errorEvents[0]
            self.assertEqual(
                reported["log_format"],
                u"Unable to read truncated JSON record: {record!r}"
            )
            self.assertEqual(reported["record"], b'{"x": 1')


    def test_readUnicode(self):
        """
        When the file being read vends L{unicode}, strings are decoded from
        JSON as-is.
        """
        # The Euro currency sign is u"\u20ac"
        logText = u'\x1e{"currency": "\u20ac"}\n'

        with StringIO(logText) as stream:
            events = eventsFromJSONLogFile(stream)

            self.assertEqual(next(events), {u"currency": u"\u20ac"})
            self.assertRaises(StopIteration, next, events)  # No more events
            self.assertEqual(len(self.errorEvents), 0)


    def test_readUTF8Bytes(self):
        """
        When the file being read vends L{bytes}, strings are decoded from
        JSON as UTF-8.
        """
        # The Euro currency sign is b"\xe2\x82\xac" in UTF-8
        logBytes = b'\x1e{"currency": "\xe2\x82\xac"}\n'

        with BytesIO(logBytes) as stream:
            events = eventsFromJSONLogFile(stream)

            # The Euro currency sign is u"\u20ac"
            self.assertEqual(next(events), {u"currency": u"\u20ac"})
            self.assertRaises(StopIteration, next, events)  # No more events
            self.assertEqual(len(self.errorEvents), 0)


    def test_readTruncatedUTF8Bytes(self):
        """
        When the JSON text for a record is read in pieces that may split a
        multi-byte UTF-8 sequence, no codec exception surfaces and the record
        is read properly once the additional data arrives.
        """
        # The Euro currency sign is u"\u20ac" and encodes in UTF-8 as three
        # bytes: b"\xe2\x82\xac".
        logBytes = b'\x1e{"x": "\xe2\x82\xac"}\n'

        with BytesIO(logBytes) as stream:
            # The buffer is smaller than the record, so the record has to be
            # assembled from more than one read.
            events = eventsFromJSONLogFile(stream, bufferSize=8)

            self.assertEqual(next(events), {u"x": u"\u20ac"})  # Got unicode
            self.assertRaises(StopIteration, next, events)  # No more events
            self.assertEqual(len(self.errorEvents), 0)


    def test_readInvalidUTF8Bytes(self):
        """
        A record whose JSON text contains invalid UTF-8 is ignored.
        """
        # The string b"\xe2\xac" is bogus
        logBytes = (
            b'\x1e{"x": "\xe2\xac"}\n'
            b'\x1e{"y": 2}\n'
        )
        with BytesIO(logBytes) as stream:
            events = eventsFromJSONLogFile(stream)

            self.assertEqual(next(events), {u"y": 2})
            self.assertRaises(StopIteration, next, events)  # No more events

            # We should have logged the lost record
            self.assertEqual(len(self.errorEvents), 1)
            reported = self.errorEvents[0]
            self.assertEqual(
                reported["log_format"],
                u"Unable to decode UTF-8 for JSON record: {record!r}"
            )
            self.assertEqual(reported["record"], b'{"x": "\xe2\xac"}\n')


    def test_readInvalidJSON(self):
        """
        A record whose JSON text is invalid is skipped.
        """
        logText = (
            u'\x1e{"x": }\n'
            u'\x1e{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            events = eventsFromJSONLogFile(stream)

            self.assertEqual(next(events), {u"y": 2})
            self.assertRaises(StopIteration, next, events)  # No more events

            # We should have logged the lost record
            self.assertEqual(len(self.errorEvents), 1)
            reported = self.errorEvents[0]
            self.assertEqual(
                reported["log_format"],
                u"Unable to read JSON record: {record!r}"
            )
            self.assertEqual(reported["record"], b'{"x": }\n')


    def test_readUnseparated(self):
        """
        Several events that are not divided by a record separator are
        skipped.
        """
        logText = (
            u'\x1e{"x": 1}\n'
            u'{"y": 2}\n'
        )
        with StringIO(logText) as stream:
            events = eventsFromJSONLogFile(stream)

            self.assertRaises(StopIteration, next, events)  # No more events

            # We should have logged the lost record
            self.assertEqual(len(self.errorEvents), 1)
            reported = self.errorEvents[0]
            self.assertEqual(
                reported["log_format"],
                u"Unable to read JSON record: {record!r}"
            )
            self.assertEqual(reported["record"], b'{"x": 1}\n{"y": 2}\n')


    def test_roundTrip(self):
        """
        An event written by the L{FileLogObserver} that
        L{jsonFileLogObserver} returns is reconstructed properly when read
        back with L{eventsFromJSONLogFile}.
        """
        event = {"x": 1}

        with StringIO() as stream:
            writeEvent = jsonFileLogObserver(stream)
            writeEvent(event)

            # Rewind so that what was just written can be read back.
            stream.seek(0)
            events = eventsFromJSONLogFile(stream)

            self.assertEqual(tuple(events), (event,))
            self.assertEqual(len(self.errorEvents), 0)
|