123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589 |
- """
- ast.literal_eval() compatible object tree serialization.
- Serpent serializes an object tree into bytes (utf-8 encoded string) that can
- be decoded and then passed as-is to ast.literal_eval() to rebuild it as the
- original object tree. As such it is safe to send serpent data to other
- machines over the network for instance (because only 'safe' literals are
- encoded).
- Compatible with Python 2.7+ (including 3.x), IronPython 2.7+, Jython 2.7+.
- Serpent handles several special Python types to make life easier:
- - str --> promoted to unicode (see below why this is)
- - bytes, bytearrays, memoryview, buffer --> dict with the data as a base-64 string
- (use the tobytes() utility function to decode it back into bytes, or un-base64 it manually)
- - uuid.UUID, datetime.{datetime, date, time, timedelta} --> appropriate string/number
- - decimal.Decimal --> string (to not lose precision)
- - array.array typecode 'c'/'u' --> string/unicode
- - array.array other typecode --> list
- - Exception --> dict with some fields of the exception (message, args)
- - all other types --> dict with __getstate__ or vars() of the object
- Notes:
- All str will be promoted to unicode. This is done because it is the
- default anyway for Python 3.x, and it solves the problem of the str/unicode
- difference between different Python versions. Also it means the serialized
- output doesn't have those problematic 'u' prefixes on strings.
- The serializer is not thread-safe. Make sure you're not making changes
- to the object tree that is being serialized, and don't use the same
- serializer in different threads.
- Because the serialized format is just valid Python source code, it can
- contain comments.
- Set literals are not supported on python <3.2 (ast.literal_eval
- limitation). If you need Python < 3.2 compatibility, you'll have to use
- set_literals=False when serializing. Since version 1.6 serpent chooses
- this wisely for you by default, but you can still override it if needed.
- Floats +inf and -inf are handled via a trick, Float 'nan' cannot be handled
- and is represented by the special value: {'__class__':'float','value':'nan'}
- We chose not to encode it as just the string 'NaN' because that could cause
- memory issues when used in multiplications.
- Copyright by Irmen de Jong (irmen@razorvine.net)
- Software license: "MIT software license". See http://opensource.org/licenses/MIT
- """
- from __future__ import print_function, division
- import __future__
- import ast
- import base64
- import sys
- import types
- import os
- import gc
- import collections
- import decimal
- import datetime
- import uuid
- import array
- import math
# Version string of this module.
__version__ = "1.15"

# The public names of this module.
__all__ = [
    "dump",
    "dumps",
    "load",
    "loads",
    "register_class",
    "unregister_class",
    "tobytes",
]

# Set literals can only be used on Python 3.2 and newer; this is the default
# for the set_literals argument of the serialization functions.
can_use_set_literals = sys.version_info >= (3, 2)
def dumps(obj, indent=False, set_literals=can_use_set_literals, module_in_classname=False):
    """
    Serialize an object tree and return the result as bytes.

    indent, set_literals and module_in_classname are passed on unchanged
    to the Serializer that does the actual work.
    """
    serializer = Serializer(indent, set_literals, module_in_classname)
    return serializer.serialize(obj)
def dump(obj, file, indent=False, set_literals=can_use_set_literals, module_in_classname=False):
    """
    Serialize an object tree and write the resulting bytes to a file.

    file only needs a write() method; the other arguments are passed on to dumps().
    """
    serialized = dumps(obj, indent=indent, set_literals=set_literals, module_in_classname=module_in_classname)
    file.write(serialized)
def loads(serialized_bytes):
    """
    Deserialize bytes back to object tree. Uses ast.literal_eval (safe).

    serialized_bytes must be the utf-8 encoded serialized form. It is decoded
    and then evaluated with ast.literal_eval(), whose result is returned.

    The cyclic garbage collector is switched off while the tree is being built
    (not on Jython and IronPython). It is only switched back on if this
    function was the one that disabled it, so a caller that runs with the
    collector disabled keeps it disabled.
    """
    serialized = serialized_bytes.decode("utf-8")
    if sys.version_info < (3, 0) and sys.platform != "cli":
        if os.name == "java":
            # Because of a bug in Jython we have to manually convert all Str nodes to unicode. See http://bugs.jython.org/issue2008
            serialized = ast.parse(serialized, "<serpent>", mode="eval")
            for node in ast.walk(serialized):
                if isinstance(node, ast.Str) and type(node.s) is str:
                    node.s = node.s.decode("utf-8")
        else:
            # python 2.x: parse with unicode_literals (promotes all strings to unicode)
            serialized = compile(serialized, "<serpent>", mode="eval", flags=ast.PyCF_ONLY_AST | __future__.unicode_literals.compiler_flag)
    # Remember whether we are the ones pausing the collector, so that the
    # finally clause restores exactly what was changed and nothing more.
    pause_gc = os.name != "java" and sys.platform != "cli" and gc.isenabled()
    if pause_gc:
        gc.disable()
    try:
        return ast.literal_eval(serialized)
    finally:
        if pause_gc:
            gc.enable()
def load(file):
    """
    Read everything from the given file and deserialize it back to an object tree.

    The bytes returned by file.read() are handed to loads(), which uses ast.literal_eval (safe).
    """
    return loads(file.read())
- def _ser_OrderedDict(obj, serializer, outputstream, indentlevel):
- obj = {
- "__class__": "collections.OrderedDict" if serializer.module_in_classname else "OrderedDict",
- "items": list(obj.items())
- }
- serializer._serialize(obj, outputstream, indentlevel)
- def _ser_DictView(obj, serializer, outputstream, indentlevel):
- serializer.ser_builtins_list(obj, outputstream, indentlevel)
# Registry of special-case serializer functions, keyed by class.
# Serializer._serialize() tests an object against these classes with isinstance()
# before it falls back to its type dispatch table.
try:
    # The container ABCs live in collections.abc since Python 3.3; the aliases in
    # the collections module itself were removed in Python 3.10.
    from collections import abc as _collections_abc
except ImportError:
    # Python 2.x only has them in the collections module.
    _collections_abc = collections

_special_classes_registry = {
    _collections_abc.KeysView: _ser_DictView,
    _collections_abc.ValuesView: _ser_DictView,
    _collections_abc.ItemsView: _ser_DictView
}
if sys.version_info >= (2, 7):
    _special_classes_registry[collections.OrderedDict] = _ser_OrderedDict
def unregister_class(clazz):
    """
    Remove the special-case serializer registered for the given class.

    Nothing happens if no serializer is registered for that class.
    """
    _special_classes_registry.pop(clazz, None)
def register_class(clazz, serializer):
    """
    Install a special-case serializer function for objects of the given class.

    The function is called as serializer(object, serpent_serializer, outputstream, indentlevel).
    It has to write the serialized data to outputstream; its return value is not used.
    A serializer that was registered earlier for the same class is replaced.
    """
    _special_classes_registry[clazz] = serializer
class BytesWrapper(object):
    """
    Wraps bytes, bytearray and similar objects so that their state is
    a dict containing the data in base-64 encoded form.
    The tobytes utility function decodes such a dict back into the actual bytes
    (or you can un-base64 the data yourself).
    """

    def __init__(self, data):
        self.data = data

    def __getstate__(self):
        """Return {"data": <base-64 text>, "encoding": "base64"} for the wrapped data."""
        raw = self.data
        if sys.platform == "cli":
            raw = str(raw)  # weird IronPython bug?
        elif type(raw) is bytearray and (os.name == "java" or sys.version_info < (2, 7)):
            raw = bytes(raw)  # Jython bug http://bugs.jython.org/issue2011
        encoded = base64.b64encode(raw)
        if type(encoded) is not str:
            # b64encode gave us bytes, but the dict has to contain text
            encoded = encoded.decode("ascii")
        return {"data": encoded, "encoding": "base64"}

    @staticmethod
    def from_bytes(data):
        """Wrap a bytes object."""
        return BytesWrapper(data)

    @staticmethod
    def from_bytearray(data):
        """Wrap a bytearray."""
        return BytesWrapper(data)

    @staticmethod
    def from_memoryview(data):
        """Wrap the bytes that the given memoryview exposes (copied via tobytes())."""
        copied = data.tobytes()
        return BytesWrapper(copied)

    @staticmethod
    def from_buffer(data):
        """Wrap a buffer object."""
        return BytesWrapper(data)
- if sys.version_info < (3, 0):
- _repr = repr # python <3.0 won't need explicit encoding to utf-8, so we optimize this
- else:
- def _repr(obj):
- return repr(obj).encode("utf-8")
# Types that Serializer._serialize() writes out with a plain repr().
_repr_types = {str, int, bool, type(None)}
if sys.platform == "cli":
    _repr_types.discard(str)  # IronPython needs special str treatment, otherwise it treats unicode wrong

# Types that Serializer._serialize() first converts with the given callable
# and then serializes in their converted form.
_translate_types = {
    bytearray: BytesWrapper.from_bytearray,
    collections.deque: list,
}
if bytes is not str:
    # only wrap bytes when it is a type of its own and not just another name for str
    _translate_types[bytes] = BytesWrapper.from_bytes
if sys.version_info >= (3, 0):
    _translate_types[collections.UserDict] = dict
    _translate_types[collections.UserList] = list
    _translate_types[collections.UserString] = str
try:
    _translate_types[memoryview] = BytesWrapper.from_memoryview
except NameError:
    pass

# Types that tobytes() returns unmodified; turned into a tuple for use with isinstance().
_bytes_types = [bytes, bytearray, memoryview]
if hasattr(types, "BufferType"):
    _translate_types[types.BufferType] = BytesWrapper.from_buffer
    _bytes_types.append(buffer)
_bytes_types = tuple(_bytes_types)
def tobytes(obj):
    """
    Utility function that turns obj back into actual bytes.

    A bytes or byte-like object is returned unmodified.
    A serpent-encoded bytes dictionary (a dict that has a 'data' entry and
    'encoding'='base64') is decoded, and the base-64 decoded data is returned.
    Anything else raises TypeError.
    """
    if isinstance(obj, _bytes_types):
        return obj
    is_encoded_dict = isinstance(obj, dict) and "data" in obj and obj.get("encoding") == "base64"
    if not is_encoded_dict:
        raise TypeError("argument is neither bytes nor serpent base64 encoded bytes dict")
    return base64.b64decode(obj["data"])
class Serializer(object):
    """
    Serialize an object tree to a byte stream.

    It is not thread-safe: make sure you're not making changes to the
    object tree that is being serialized, and don't use the same serializer
    across different threads.
    """
    # Maps a type to the function that serializes objects of that type.
    # The functions are called as func(serializer, obj, out, level).
    dispatch = {}

    # Builtin types that _serialize() sends straight to their dispatch function.
    _shortcut_dispatch_types = frozenset([float, complex, tuple, list, dict, set, frozenset])

    def __init__(self, indent=False, set_literals=can_use_set_literals, module_in_classname=False):
        """
        Initialize the serializer.

        indent = indent the output over multiple lines (default=false)
        set_literals = use set-literals or not (set to False if you need compatibility
            with Python < 3.2). Serpent chooses a sensible default for you.
        module_in_classname = include module prefix for class names or only use the class name itself
        """
        self.indent = indent
        self.set_literals = set_literals
        self.module_in_classname = module_in_classname
        # ids of the containers and objects that are being serialized right now (circular reference detection)
        self.serialized_obj_ids = set()
        self.special_classes_registry_copy = None

    def serialize(self, obj):
        """
        Serialize the object tree to bytes.

        The result starts with a header comment line, followed by the literal
        expression for obj. On IronPython the result is a str instead of bytes.
        Raises ValueError if the tree contains a circular reference, and
        TypeError if it contains an object that can't be serialized.
        """
        self.special_classes_registry_copy = _special_classes_registry.copy()  # make it thread safe
        header = "# serpent utf-8 "
        if self.set_literals:
            header += "python3.2\n"  # set-literals require python 3.2+ to deserialize (ast.literal_eval limitation)
        else:
            header += "python2.6\n"  # don't change this even though we don't support 2.6 any longer, otherwise we can't read older serpent strings
        out = [header.encode("utf-8")]
        # Only pause the garbage collector if it is running, so that the finally
        # clause below doesn't switch it on for a caller that had it disabled.
        pause_gc = os.name != "java" and sys.platform != "cli" and gc.isenabled()
        if pause_gc:
            gc.disable()
        try:
            self.serialized_obj_ids = set()
            self._serialize(obj, out, 0)
        finally:
            if pause_gc:
                gc.enable()
            self.special_classes_registry_copy = None
            # start from a clean slate next time, also if serialization failed halfway
            self.serialized_obj_ids = set()
        if sys.platform == "cli":
            return "".join(out)
        return b"".join(out)

    def _serialize(self, obj, out, level):
        """
        Append the serialized form of obj to the list out.

        level is the current nesting depth (used for indentation). The lookup
        order is: type translation, plain repr() types, the shortcut builtin types,
        the registered special classes, and finally the dispatch table (exact type
        first, then the first match in the MRO, then the default class serializer).
        """
        t = type(obj)
        if t in _translate_types:
            obj = _translate_types[t](obj)
            t = type(obj)
        if t in _repr_types:
            out.append(_repr(obj))  # just a simple repr() is enough for these objects
            return
        if t in self._shortcut_dispatch_types:
            # we shortcut these builtins directly to the dispatch function to avoid type lookup overhead below
            return self.dispatch[t](self, obj, out, level)
        # check special registered types:
        special_classes = self.special_classes_registry_copy
        for clazz in special_classes:
            if isinstance(obj, clazz):
                special_classes[clazz](obj, self, out, level)
                return
        # serialize dispatch
        try:
            func = self.dispatch[t]
        except KeyError:
            # walk the MRO until we find a base class we recognise
            for type_ in t.__mro__:
                if type_ in self.dispatch:
                    func = self.dispatch[type_]
                    break
            else:
                # fall back to the default class serializer
                func = Serializer.ser_default_class
        func(self, obj, out, level)

    def ser_builtins_str(self, str_obj, out, level):
        """Serialize a str via the unicode serializer."""
        # special case str, for IronPython where str==unicode and repr() yields undesired result
        self.ser_builtins_unicode(str_obj, out, level)
    dispatch[str] = ser_builtins_str

    def ser_builtins_float(self, float_obj, out, level):
        """Serialize a float, with special output for NaN and +/- infinity."""
        if math.isnan(float_obj):
            # there's no literal expression for a float NaN...
            out.append(b"{'__class__':'float','value':'nan'}")
        elif math.isinf(float_obj):
            # output a literal expression that overflows the float and results in +/-INF
            if float_obj > 0:
                out.append(b"1e30000")
            else:
                out.append(b"-1e30000")
        else:
            # repr() round-trips the value exactly; str() truncates floats on Python 2.x
            out.append(repr(float_obj).encode("ascii"))
    dispatch[float] = ser_builtins_float

    def ser_builtins_complex(self, complex_obj, out, level):
        """Serialize a complex number as '(<real>+<imag>j)' or '(<real>-<imag>j)'."""
        imag_out = []
        self.ser_builtins_float(complex_obj.imag, imag_out, level)
        out.append(b"(")
        self.ser_builtins_float(complex_obj.real, out, level)
        # Decide on the "+" by looking at the text of the imaginary part rather than
        # its value: -0.0 compares >= 0 but is written as "-0.0", and the resulting
        # "+-0.0j" is rejected by ast.literal_eval.
        if not imag_out[0].startswith(b"-"):
            out.append(b"+")
        out.extend(imag_out)
        out.append(b"j)")
    dispatch[complex] = ser_builtins_complex

    if sys.version_info < (3, 0):
        def ser_builtins_unicode(self, unicode_obj, out, level):
            """Serialize a unicode string as a quoted, utf-8 encoded string literal."""
            # this method is used for python 2.x unicode (python 3.x doesn't use this)
            z = unicode_obj.encode("utf-8")
            # double-escape existing backslashes:
            z = z.replace("\\", "\\\\")
            # backslash-escape control characters:
            z = z.replace("\a", "\\a")
            z = z.replace("\b", "\\b")
            z = z.replace("\f", "\\f")
            z = z.replace("\n", "\\n")
            z = z.replace("\r", "\\r")
            z = z.replace("\t", "\\t")
            z = z.replace("\v", "\\v")
            # pick the quote character that needs no escaping, if there is one
            if "'" not in z:
                z = "'" + z + "'"
            elif '"' not in z:
                z = '"' + z + '"'
            else:
                z = z.replace("'", "\\'")
                z = "'" + z + "'"
            out.append(z)
        dispatch[unicode] = ser_builtins_unicode

        def ser_builtins_long(self, long_obj, out, level):
            """Serialize a long as its decimal digits (no 'L' suffix)."""
            # used with python 2.x
            out.append(str(long_obj))
        dispatch[long] = ser_builtins_long

    def ser_builtins_tuple(self, tuple_obj, out, level):
        """Serialize a tuple. A tuple of one element keeps its trailing comma."""
        append = out.append
        serialize = self._serialize
        if self.indent and tuple_obj:
            indent_chars = b"  " * level
            indent_chars_inside = indent_chars + b"  "
            append(b"(\n")
            for elt in tuple_obj:
                append(indent_chars_inside)
                serialize(elt, out, level + 1)
                append(b",\n")
            out[-1] = out[-1].rstrip()  # remove the last \n
            if len(tuple_obj) > 1:
                del out[-1]  # undo the last ,
            append(b"\n" + indent_chars + b")")
        else:
            append(b"(")
            for elt in tuple_obj:
                serialize(elt, out, level + 1)
                append(b",")
            if len(tuple_obj) > 1:
                del out[-1]  # undo the last ,
            append(b")")
    dispatch[tuple] = ser_builtins_tuple

    def ser_builtins_list(self, list_obj, out, level):
        """Serialize a list. Raises ValueError if the list (indirectly) contains itself."""
        if id(list_obj) in self.serialized_obj_ids:
            raise ValueError("Circular reference detected (list)")
        self.serialized_obj_ids.add(id(list_obj))
        try:
            append = out.append
            serialize = self._serialize
            if self.indent and list_obj:
                indent_chars = b"  " * level
                indent_chars_inside = indent_chars + b"  "
                append(b"[\n")
                for elt in list_obj:
                    append(indent_chars_inside)
                    serialize(elt, out, level + 1)
                    append(b",\n")
                del out[-1]  # remove the last ,\n
                append(b"\n" + indent_chars + b"]")
            else:
                append(b"[")
                for elt in list_obj:
                    serialize(elt, out, level + 1)
                    append(b",")
                if list_obj:
                    del out[-1]  # remove the last ,
                append(b"]")
        finally:
            # also forget the list when serializing an element failed
            self.serialized_obj_ids.discard(id(list_obj))
    dispatch[list] = ser_builtins_list

    def ser_builtins_dict(self, dict_obj, out, level):
        """
        Serialize a dict. With indent, the items are written in sorted order if they can be sorted.
        Raises ValueError if the dict (indirectly) contains itself.
        """
        if id(dict_obj) in self.serialized_obj_ids:
            raise ValueError("Circular reference detected (dict)")
        self.serialized_obj_ids.add(id(dict_obj))
        try:
            append = out.append
            serialize = self._serialize
            if self.indent and dict_obj:
                indent_chars = b"  " * level
                indent_chars_inside = indent_chars + b"  "
                append(b"{\n")
                dict_items = dict_obj.items()
                try:
                    sorted_items = sorted(dict_items)
                except TypeError:  # can occur when elements can't be ordered (Python 3.x)
                    sorted_items = dict_items
                for key, value in sorted_items:
                    append(indent_chars_inside)
                    serialize(key, out, level + 1)
                    append(b": ")
                    serialize(value, out, level + 1)
                    append(b",\n")
                del out[-1]  # remove last ,\n
                append(b"\n" + indent_chars + b"}")
            else:
                append(b"{")
                for key, value in dict_obj.items():
                    serialize(key, out, level + 1)
                    append(b":")
                    serialize(value, out, level + 1)
                    append(b",")
                if dict_obj:
                    del out[-1]  # remove the last ,
                append(b"}")
        finally:
            # also forget the dict when serializing an item failed
            self.serialized_obj_ids.discard(id(dict_obj))
    dispatch[dict] = ser_builtins_dict

    def ser_builtins_set(self, set_obj, out, level):
        """
        Serialize a set: as a set literal, or as a tuple when set literals are
        switched off or the set is empty. With indent, the elements are written
        in sorted order if they can be sorted.
        """
        if not self.set_literals:
            if self.indent:
                try:
                    set_obj = sorted(set_obj)
                except TypeError:  # can occur when elements can't be ordered (Python 3.x)
                    pass
            self._serialize(tuple(set_obj), out, level)  # use a tuple instead of a set literal
            return
        append = out.append
        serialize = self._serialize
        if self.indent and set_obj:
            indent_chars = b"  " * level
            indent_chars_inside = indent_chars + b"  "
            append(b"{\n")
            try:
                sorted_elts = sorted(set_obj)
            except TypeError:  # can occur when elements can't be ordered (Python 3.x)
                sorted_elts = set_obj
            for elt in sorted_elts:
                append(indent_chars_inside)
                serialize(elt, out, level + 1)
                append(b",\n")
            del out[-1]  # remove the last ,\n
            append(b"\n" + indent_chars + b"}")
        elif set_obj:
            append(b"{")
            for elt in set_obj:
                serialize(elt, out, level + 1)
                append(b",")
            del out[-1]  # remove the last ,
            append(b"}")
        else:
            # empty set literal doesn't exist unfortunately, replace with empty tuple
            self.ser_builtins_tuple((), out, level)
    dispatch[set] = ser_builtins_set

    def ser_builtins_frozenset(self, set_obj, out, level):
        """Serialize a frozenset exactly like a set."""
        self.ser_builtins_set(set_obj, out, level)
    dispatch[frozenset] = ser_builtins_frozenset

    def ser_decimal_Decimal(self, decimal_obj, out, level):
        """Serialize a decimal.Decimal as a string."""
        # decimal is serialized as a string to avoid losing precision
        self._serialize(str(decimal_obj), out, level)
    dispatch[decimal.Decimal] = ser_decimal_Decimal

    def ser_datetime_datetime(self, datetime_obj, out, level):
        """Serialize a datetime.datetime as its isoformat() string."""
        self._serialize(datetime_obj.isoformat(), out, level)
    dispatch[datetime.datetime] = ser_datetime_datetime

    def ser_datetime_date(self, date_obj, out, level):
        """Serialize a datetime.date as its isoformat() string."""
        self._serialize(date_obj.isoformat(), out, level)
    dispatch[datetime.date] = ser_datetime_date

    if os.name == "java" or sys.version_info < (2, 7):    # jython bug http://bugs.jython.org/issue2010
        def ser_datetime_timedelta(self, timedelta_obj, out, level):
            """Serialize a datetime.timedelta as its total number of seconds (computed by hand)."""
            secs = ((timedelta_obj.days * 86400 + timedelta_obj.seconds) * 10 ** 6 + timedelta_obj.microseconds) / 10 ** 6
            self._serialize(secs, out, level)
    else:
        def ser_datetime_timedelta(self, timedelta_obj, out, level):
            """Serialize a datetime.timedelta as its total number of seconds."""
            secs = timedelta_obj.total_seconds()
            self._serialize(secs, out, level)
    dispatch[datetime.timedelta] = ser_datetime_timedelta

    def ser_datetime_time(self, time_obj, out, level):
        """Serialize a datetime.time as its str() form."""
        self._serialize(str(time_obj), out, level)
    dispatch[datetime.time] = ser_datetime_time

    def ser_uuid_UUID(self, uuid_obj, out, level):
        """Serialize a uuid.UUID as its str() form."""
        self._serialize(str(uuid_obj), out, level)
    dispatch[uuid.UUID] = ser_uuid_UUID

    def ser_exception_class(self, exc_obj, out, level):
        """Serialize an exception as a dict with its class name, args and custom attributes."""
        value = {
            "__class__": self.get_class_name(exc_obj),
            "__exception__": True,
            "args": exc_obj.args,
            "attributes": vars(exc_obj)  # add any custom attributes
        }
        self._serialize(value, out, level)
    dispatch[BaseException] = ser_exception_class

    def ser_array_array(self, array_obj, out, level):
        """Serialize an array.array: typecodes 'c' and 'u' as a string, other typecodes as a list."""
        if array_obj.typecode == 'c':
            self._serialize(array_obj.tostring(), out, level)
        elif array_obj.typecode == 'u':
            self._serialize(array_obj.tounicode(), out, level)
        else:
            self._serialize(array_obj.tolist(), out, level)
    dispatch[array.array] = ser_array_array

    def ser_default_class(self, obj, out, level):
        """
        Serialize an object of a class that has no serializer of its own.

        If the class provides a __getstate__ method its result is serialized.
        Otherwise a dict of the object's vars() (or its __slots__) is serialized,
        with the class name added under the "__class__" key.
        Raises ValueError on a circular reference, and TypeError if the object
        has neither a __getstate__, nor vars(), nor __slots__.
        """
        if id(obj) in self.serialized_obj_ids:
            raise ValueError("Circular reference detected (class)")
        self.serialized_obj_ids.add(id(obj))
        try:
            getstate = self._custom_getstate(obj)
            if getstate is None:
                value = self._state_from_vars_or_slots(obj)
            else:
                value = getstate()
                if value is None and isinstance(obj, tuple):
                    # collections.namedtuple specialcase (if it is not handled by the tuple serializer)
                    value = {
                        "__class__": self.get_class_name(obj),
                        "items": list(obj._asdict().items())
                    }
                if isinstance(value, dict):
                    self.ser_builtins_dict(value, out, level)
                    return
            self._serialize(value, out, level)
        finally:
            self.serialized_obj_ids.discard(id(obj))

    @staticmethod
    def _custom_getstate(obj):
        """Return obj's __getstate__ method, or None if it only has the default one (or none at all)."""
        # Python 3.11+ gives every object a default __getstate__. Its result has no
        # "__class__" entry, so it must not be mistaken for a custom one.
        default_getstate = getattr(object, "__getstate__", None)
        if default_getstate is not None and getattr(type(obj), "__getstate__", None) is default_getstate:
            return None
        return getattr(obj, "__getstate__", None)

    def _state_from_vars_or_slots(self, obj):
        """Build the state dict of obj from its vars() or its __slots__, plus the "__class__" entry."""
        try:
            value = dict(vars(obj))  # make sure we can serialize anything that resembles a dict
        except TypeError:
            if not hasattr(obj, "__slots__"):
                raise TypeError("don't know how to serialize class " + str(obj.__class__) + ". Give it vars() or an appropriate __getstate__")
            # use the __slots__ instead of the vars dict
            value = {}
            for slot in obj.__slots__:
                value[slot] = getattr(obj, slot)
        value["__class__"] = self.get_class_name(obj)
        return value

    def get_class_name(self, obj):
        """Return the name of obj's class, prefixed with its module name if module_in_classname is set."""
        if self.module_in_classname:
            return "%s.%s" % (obj.__class__.__module__, obj.__class__.__name__)
        else:
            return obj.__class__.__name__
|