test_extension.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263
  1. from __future__ import print_function
  2. import array
  3. import pandas.io.msgpack as msgpack
  4. from pandas.io.msgpack import ExtType
  5. from .common import frombytes, tobytes
  6. def test_pack_ext_type():
  7. def p(s):
  8. packer = msgpack.Packer()
  9. packer.pack_ext_type(0x42, s)
  10. return packer.bytes()
  11. assert p(b'A') == b'\xd4\x42A' # fixext 1
  12. assert p(b'AB') == b'\xd5\x42AB' # fixext 2
  13. assert p(b'ABCD') == b'\xd6\x42ABCD' # fixext 4
  14. assert p(b'ABCDEFGH') == b'\xd7\x42ABCDEFGH' # fixext 8
  15. assert p(b'A' * 16) == b'\xd8\x42' + b'A' * 16 # fixext 16
  16. assert p(b'ABC') == b'\xc7\x03\x42ABC' # ext 8
  17. assert p(b'A' * 0x0123) == b'\xc8\x01\x23\x42' + b'A' * 0x0123 # ext 16
  18. assert (p(b'A' * 0x00012345) ==
  19. b'\xc9\x00\x01\x23\x45\x42' + b'A' * 0x00012345) # ext 32
  20. def test_unpack_ext_type():
  21. def check(b, expected):
  22. assert msgpack.unpackb(b) == expected
  23. check(b'\xd4\x42A', ExtType(0x42, b'A')) # fixext 1
  24. check(b'\xd5\x42AB', ExtType(0x42, b'AB')) # fixext 2
  25. check(b'\xd6\x42ABCD', ExtType(0x42, b'ABCD')) # fixext 4
  26. check(b'\xd7\x42ABCDEFGH', ExtType(0x42, b'ABCDEFGH')) # fixext 8
  27. check(b'\xd8\x42' + b'A' * 16, ExtType(0x42, b'A' * 16)) # fixext 16
  28. check(b'\xc7\x03\x42ABC', ExtType(0x42, b'ABC')) # ext 8
  29. check(b'\xc8\x01\x23\x42' + b'A' * 0x0123,
  30. ExtType(0x42, b'A' * 0x0123)) # ext 16
  31. check(b'\xc9\x00\x01\x23\x45\x42' + b'A' * 0x00012345,
  32. ExtType(0x42, b'A' * 0x00012345)) # ext 32
  33. def test_extension_type():
  34. def default(obj):
  35. print('default called', obj)
  36. if isinstance(obj, array.array):
  37. typecode = 123 # application specific typecode
  38. data = tobytes(obj)
  39. return ExtType(typecode, data)
  40. raise TypeError("Unknown type object %r" % (obj, ))
  41. def ext_hook(code, data):
  42. print('ext_hook called', code, data)
  43. assert code == 123
  44. obj = array.array('d')
  45. frombytes(obj, data)
  46. return obj
  47. obj = [42, b'hello', array.array('d', [1.1, 2.2, 3.3])]
  48. s = msgpack.packb(obj, default=default)
  49. obj2 = msgpack.unpackb(s, ext_hook=ext_hook)
  50. assert obj == obj2