# test_unpack.py (2.0 KB)

from io import BytesIO
import sys

import pytest

from pandas.io.msgpack import ExtType, OutOfData, Unpacker, packb


  5. class TestUnpack(object):
  6. def test_unpack_array_header_from_file(self):
  7. f = BytesIO(packb([1, 2, 3, 4]))
  8. unpacker = Unpacker(f)
  9. assert unpacker.read_array_header() == 4
  10. assert unpacker.unpack() == 1
  11. assert unpacker.unpack() == 2
  12. assert unpacker.unpack() == 3
  13. assert unpacker.unpack() == 4
  14. msg = "No more data to unpack"
  15. with pytest.raises(OutOfData, match=msg):
  16. unpacker.unpack()
  17. def test_unpacker_hook_refcnt(self):
  18. if not hasattr(sys, 'getrefcount'):
  19. pytest.skip('no sys.getrefcount()')
  20. result = []
  21. def hook(x):
  22. result.append(x)
  23. return x
  24. basecnt = sys.getrefcount(hook)
  25. up = Unpacker(object_hook=hook, list_hook=hook)
  26. assert sys.getrefcount(hook) >= basecnt + 2
  27. up.feed(packb([{}]))
  28. up.feed(packb([{}]))
  29. assert up.unpack() == [{}]
  30. assert up.unpack() == [{}]
  31. assert result == [{}, [{}], {}, [{}]]
  32. del up
  33. assert sys.getrefcount(hook) == basecnt
  34. def test_unpacker_ext_hook(self):
  35. class MyUnpacker(Unpacker):
  36. def __init__(self):
  37. super(MyUnpacker, self).__init__(ext_hook=self._hook,
  38. encoding='utf-8')
  39. def _hook(self, code, data):
  40. if code == 1:
  41. return int(data)
  42. else:
  43. return ExtType(code, data)
  44. unpacker = MyUnpacker()
  45. unpacker.feed(packb({'a': 1}, encoding='utf-8'))
  46. assert unpacker.unpack() == {'a': 1}
  47. unpacker.feed(packb({'a': ExtType(1, b'123')}, encoding='utf-8'))
  48. assert unpacker.unpack() == {'a': 123}
  49. unpacker.feed(packb({'a': ExtType(2, b'321')}, encoding='utf-8'))
  50. assert unpacker.unpack() == {'a': ExtType(2, b'321')}