test_pack.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. # coding: utf-8
  2. import struct
  3. import pytest
  4. from pandas.compat import OrderedDict, u
  5. from pandas import compat
  6. from pandas.io.msgpack import Packer, Unpacker, packb, unpackb
  7. class TestPack(object):
  8. def check(self, data, use_list=False):
  9. re = unpackb(packb(data), use_list=use_list)
  10. assert re == data
  11. def testPack(self):
  12. test_data = [
  13. 0, 1, 127, 128, 255, 256, 65535, 65536,
  14. -1, -32, -33, -128, -129, -32768, -32769,
  15. 1.0,
  16. b"", b"a", b"a" * 31, b"a" * 32,
  17. None, True, False,
  18. (), ((),), ((), None,),
  19. {None: 0},
  20. (1 << 23),
  21. ]
  22. for td in test_data:
  23. self.check(td)
  24. def testPackUnicode(self):
  25. test_data = [u(""), u("abcd"), [u("defgh")], u("Русский текст"), ]
  26. for td in test_data:
  27. re = unpackb(
  28. packb(td, encoding='utf-8'), use_list=1, encoding='utf-8')
  29. assert re == td
  30. packer = Packer(encoding='utf-8')
  31. data = packer.pack(td)
  32. re = Unpacker(
  33. compat.BytesIO(data), encoding='utf-8', use_list=1).unpack()
  34. assert re == td
  35. def testPackUTF32(self):
  36. test_data = [
  37. compat.u(""),
  38. compat.u("abcd"),
  39. [compat.u("defgh")],
  40. compat.u("Русский текст"),
  41. ]
  42. for td in test_data:
  43. re = unpackb(
  44. packb(td, encoding='utf-32'), use_list=1, encoding='utf-32')
  45. assert re == td
  46. def testPackBytes(self):
  47. test_data = [b"", b"abcd", (b"defgh", ), ]
  48. for td in test_data:
  49. self.check(td)
  50. def testIgnoreUnicodeErrors(self):
  51. re = unpackb(
  52. packb(b'abc\xeddef'), encoding='utf-8', unicode_errors='ignore',
  53. use_list=1)
  54. assert re == "abcdef"
  55. def testStrictUnicodeUnpack(self):
  56. msg = (r"'utf-*8' codec can't decode byte 0xed in position 3:"
  57. " invalid continuation byte")
  58. with pytest.raises(UnicodeDecodeError, match=msg):
  59. unpackb(packb(b'abc\xeddef'), encoding='utf-8', use_list=1)
  60. def testStrictUnicodePack(self):
  61. msg = (r"'ascii' codec can't encode character u*'\\xed' in position 3:"
  62. r" ordinal not in range\(128\)")
  63. with pytest.raises(UnicodeEncodeError, match=msg):
  64. packb(compat.u("abc\xeddef"), encoding='ascii',
  65. unicode_errors='strict')
  66. def testIgnoreErrorsPack(self):
  67. re = unpackb(
  68. packb(
  69. compat.u("abcФФФdef"), encoding='ascii',
  70. unicode_errors='ignore'), encoding='utf-8', use_list=1)
  71. assert re == compat.u("abcdef")
  72. def testNoEncoding(self):
  73. msg = "Can't encode unicode string: no encoding is specified"
  74. with pytest.raises(TypeError, match=msg):
  75. packb(compat.u("abc"), encoding=None)
  76. def testDecodeBinary(self):
  77. re = unpackb(packb("abc"), encoding=None, use_list=1)
  78. assert re == b"abc"
  79. def testPackFloat(self):
  80. assert packb(1.0,
  81. use_single_float=True) == b'\xca' + struct.pack('>f', 1.0)
  82. assert packb(
  83. 1.0, use_single_float=False) == b'\xcb' + struct.pack('>d', 1.0)
  84. def testArraySize(self, sizes=[0, 5, 50, 1000]):
  85. bio = compat.BytesIO()
  86. packer = Packer()
  87. for size in sizes:
  88. bio.write(packer.pack_array_header(size))
  89. for i in range(size):
  90. bio.write(packer.pack(i))
  91. bio.seek(0)
  92. unpacker = Unpacker(bio, use_list=1)
  93. for size in sizes:
  94. assert unpacker.unpack() == list(range(size))
  95. def test_manualreset(self, sizes=[0, 5, 50, 1000]):
  96. packer = Packer(autoreset=False)
  97. for size in sizes:
  98. packer.pack_array_header(size)
  99. for i in range(size):
  100. packer.pack(i)
  101. bio = compat.BytesIO(packer.bytes())
  102. unpacker = Unpacker(bio, use_list=1)
  103. for size in sizes:
  104. assert unpacker.unpack() == list(range(size))
  105. packer.reset()
  106. assert packer.bytes() == b''
  107. def testMapSize(self, sizes=[0, 5, 50, 1000]):
  108. bio = compat.BytesIO()
  109. packer = Packer()
  110. for size in sizes:
  111. bio.write(packer.pack_map_header(size))
  112. for i in range(size):
  113. bio.write(packer.pack(i)) # key
  114. bio.write(packer.pack(i * 2)) # value
  115. bio.seek(0)
  116. unpacker = Unpacker(bio)
  117. for size in sizes:
  118. assert unpacker.unpack() == {i: i * 2 for i in range(size)}
  119. def test_odict(self):
  120. seq = [(b'one', 1), (b'two', 2), (b'three', 3), (b'four', 4)]
  121. od = OrderedDict(seq)
  122. assert unpackb(packb(od), use_list=1) == dict(seq)
  123. def pair_hook(seq):
  124. return list(seq)
  125. assert unpackb(
  126. packb(od), object_pairs_hook=pair_hook, use_list=1) == seq
  127. def test_pairlist(self):
  128. pairlist = [(b'a', 1), (2, b'b'), (b'foo', b'bar')]
  129. packer = Packer()
  130. packed = packer.pack_map_pairs(pairlist)
  131. unpacked = unpackb(packed, object_pairs_hook=list)
  132. assert pairlist == unpacked