  1. """ test parquet compat """
  2. import datetime
  3. from distutils.version import LooseVersion
  4. import os
  5. from warnings import catch_warnings
  6. import numpy as np
  7. import pytest
  8. from pandas.compat import PY3
  9. import pandas.util._test_decorators as td
  10. import pandas as pd
  11. from pandas.util import testing as tm
  12. from pandas.io.parquet import (
  13. FastParquetImpl, PyArrowImpl, get_engine, read_parquet, to_parquet)

try:
    import pyarrow  # noqa
    _HAVE_PYARROW = True
except ImportError:
    _HAVE_PYARROW = False

try:
    import fastparquet  # noqa
    _HAVE_FASTPARQUET = True
except ImportError:
    _HAVE_FASTPARQUET = False


# setup engines & skips


@pytest.fixture(params=[
    pytest.param('fastparquet',
                 marks=pytest.mark.skipif(not _HAVE_FASTPARQUET,
                                          reason='fastparquet is '
                                                 'not installed')),
    pytest.param('pyarrow',
                 marks=pytest.mark.skipif(not _HAVE_PYARROW,
                                          reason='pyarrow is '
                                                 'not installed'))])
def engine(request):
    return request.param


@pytest.fixture
def pa():
    if not _HAVE_PYARROW:
        pytest.skip("pyarrow is not installed")
    return 'pyarrow'


@pytest.fixture
def fp():
    if not _HAVE_FASTPARQUET:
        pytest.skip("fastparquet is not installed")
    return 'fastparquet'
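

# Note: tests that request the parametrized ``engine`` fixture run once per
# installed engine, while the ``pa``/``fp`` fixtures pin a single engine and
# skip the test when that engine is missing.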


@pytest.fixture
def df_compat():
    return pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'})


@pytest.fixture
def df_cross_compat():
    df = pd.DataFrame({'a': list('abc'),
                       'b': list(range(1, 4)),
                       # 'c': np.arange(3, 6).astype('u1'),
                       'd': np.arange(4.0, 7.0, dtype='float64'),
                       'e': [True, False, True],
                       'f': pd.date_range('20130101', periods=3),
                       # 'g': pd.date_range('20130101', periods=3,
                       #                    tz='US/Eastern'),
                       # 'h': pd.date_range('20130101', periods=3, freq='ns')
                       })
    return df


@pytest.fixture
def df_full():
    return pd.DataFrame(
        {'string': list('abc'),
         'string_with_nan': ['a', np.nan, 'c'],
         'string_with_none': ['a', None, 'c'],
         'bytes': [b'foo', b'bar', b'baz'],
         'unicode': [u'foo', u'bar', u'baz'],
         'int': list(range(1, 4)),
         'uint': np.arange(3, 6).astype('u1'),
         'float': np.arange(4.0, 7.0, dtype='float64'),
         'float_with_nan': [2., np.nan, 3.],
         'bool': [True, False, True],
         'datetime': pd.date_range('20130101', periods=3),
         'datetime_with_nat': [pd.Timestamp('20130101'),
                               pd.NaT,
                               pd.Timestamp('20130103')]})
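

# ``df_full`` covers the dtypes both engines are expected to round-trip;
# engine-specific columns (e.g. timezone-aware datetimes) are appended to it
# in the individual tests below.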


def check_round_trip(df, engine=None, path=None,
                     write_kwargs=None, read_kwargs=None,
                     expected=None, check_names=True,
                     repeat=2):
    """Verify parquet serializer and deserializer produce the same results.

    Performs a pandas-to-disk and disk-to-pandas round trip, then compares
    the two resulting DataFrames to verify equality.

    Parameters
    ----------
    df : DataFrame
    engine : str, optional
        'pyarrow' or 'fastparquet'
    path : str, optional
    write_kwargs : dict, optional
        Keyword arguments passed to ``to_parquet``
    read_kwargs : dict, optional
        Keyword arguments passed to ``read_parquet``
    expected : DataFrame, optional
        Expected deserialization result, otherwise will be equal to `df`
    check_names : bool, default True
        Whether to check that index/column names match
    repeat : int, optional
        How many times to repeat the test
    """
    write_kwargs = write_kwargs or {'compression': None}
    read_kwargs = read_kwargs or {}

    if expected is None:
        expected = df

    if engine:
        write_kwargs['engine'] = engine
        read_kwargs['engine'] = engine

    def compare(repeat):
        for _ in range(repeat):
            df.to_parquet(path, **write_kwargs)
            with catch_warnings(record=True):
                actual = read_parquet(path, **read_kwargs)
                tm.assert_frame_equal(expected, actual,
                                      check_names=check_names)

    if path is None:
        with tm.ensure_clean() as path:
            compare(repeat)
    else:
        compare(repeat)
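

# A minimal usage sketch for ``check_round_trip`` (illustrative only; the
# engine name and column subset below are arbitrary examples, not part of
# the suite):
#
#   check_round_trip(pd.DataFrame({'string': list('abc')}),
#                    engine='pyarrow',
#                    read_kwargs={'columns': ['string']})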


def test_invalid_engine(df_compat):
    with pytest.raises(ValueError):
        check_round_trip(df_compat, 'foo', 'bar')


def test_options_py(df_compat, pa):
    # use the set option
    with pd.option_context('io.parquet.engine', 'pyarrow'):
        check_round_trip(df_compat)


def test_options_fp(df_compat, fp):
    # use the set option
    with pd.option_context('io.parquet.engine', 'fastparquet'):
        check_round_trip(df_compat)


def test_options_auto(df_compat, fp, pa):
    # use the set option
    with pd.option_context('io.parquet.engine', 'auto'):
        check_round_trip(df_compat)


def test_options_get_engine(fp, pa):
    assert isinstance(get_engine('pyarrow'), PyArrowImpl)
    assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'pyarrow'):
        assert isinstance(get_engine('auto'), PyArrowImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'fastparquet'):
        assert isinstance(get_engine('auto'), FastParquetImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'auto'):
        assert isinstance(get_engine('auto'), PyArrowImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)
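

# Note: with ``io.parquet.engine`` left at 'auto' and both engines installed,
# 'auto' resolves to pyarrow first, which is what the final block of
# assertions above exercises.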


def test_cross_engine_pa_fp(df_cross_compat, pa, fp):
    # cross-compat with differing reading/writing engines
    df = df_cross_compat
    with tm.ensure_clean() as path:
        df.to_parquet(path, engine=pa, compression=None)

        result = read_parquet(path, engine=fp)
        tm.assert_frame_equal(result, df)

        result = read_parquet(path, engine=fp, columns=['a', 'd'])
        tm.assert_frame_equal(result, df[['a', 'd']])


def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
    # cross-compat with differing reading/writing engines
    df = df_cross_compat
    with tm.ensure_clean() as path:
        df.to_parquet(path, engine=fp, compression=None)

        with catch_warnings(record=True):
            result = read_parquet(path, engine=pa)
            tm.assert_frame_equal(result, df)

            result = read_parquet(path, engine=pa, columns=['a', 'd'])
            tm.assert_frame_equal(result, df[['a', 'd']])


class Base(object):

    def check_error_on_write(self, df, engine, exc):
        # check that we are raising the exception on writing
        with tm.ensure_clean() as path:
            with pytest.raises(exc):
                to_parquet(df, path, engine, compression=None)


class TestBasic(Base):

    def test_error(self, engine):
        for obj in [pd.Series([1, 2, 3]), 1, 'foo', pd.Timestamp('20130101'),
                    np.array([1, 2, 3])]:
            self.check_error_on_write(obj, engine, ValueError)

    def test_columns_dtypes(self, engine):
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        # unicode
        df.columns = [u'foo', u'bar']
        check_round_trip(df, engine)

    def test_columns_dtypes_invalid(self, engine):
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        # numeric
        df.columns = [0, 1]
        self.check_error_on_write(df, engine, ValueError)

        if PY3:
            # bytes on PY3, on PY2 these are str
            df.columns = [b'foo', b'bar']
            self.check_error_on_write(df, engine, ValueError)

        # python object
        df.columns = [datetime.datetime(2011, 1, 1, 0, 0),
                      datetime.datetime(2011, 1, 1, 1, 1)]
        self.check_error_on_write(df, engine, ValueError)

    @pytest.mark.parametrize('compression', [None, 'gzip', 'snappy', 'brotli'])
    def test_compression(self, engine, compression):

        if compression == 'snappy':
            pytest.importorskip('snappy')

        elif compression == 'brotli':
            pytest.importorskip('brotli')

        df = pd.DataFrame({'A': [1, 2, 3]})
        check_round_trip(df, engine, write_kwargs={'compression': compression})

    def test_read_columns(self, engine):
        # GH18154
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        expected = pd.DataFrame({'string': list('abc')})
        check_round_trip(df, engine, expected=expected,
                         read_kwargs={'columns': ['string']})

    def test_write_index(self, engine):
        check_names = engine != 'fastparquet'

        df = pd.DataFrame({'A': [1, 2, 3]})
        check_round_trip(df, engine)

        indexes = [
            [2, 3, 4],
            pd.date_range('20130101', periods=3),
            list('abc'),
            [1, 3, 4],
        ]
        # non-default index
        for index in indexes:
            df.index = index
            check_round_trip(df, engine, check_names=check_names)

        # index with meta-data
        df.index = [0, 1, 2]
        df.index.name = 'foo'
        check_round_trip(df, engine)

    def test_write_multiindex(self, pa):
        # Not supported in fastparquet as of 0.1.3 or older pyarrow versions
        engine = pa

        df = pd.DataFrame({'A': [1, 2, 3]})
        index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
        df.index = index
        check_round_trip(df, engine)

    def test_write_column_multiindex(self, engine):
        # column multi-index
        mi_columns = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
        df = pd.DataFrame(np.random.randn(4, 3), columns=mi_columns)
        self.check_error_on_write(df, engine, ValueError)

    def test_multiindex_with_columns(self, pa):
        engine = pa
        dates = pd.date_range('01-Jan-2018', '01-Dec-2018', freq='MS')
        df = pd.DataFrame(np.random.randn(2 * len(dates), 3),
                          columns=list('ABC'))
        index1 = pd.MultiIndex.from_product(
            [['Level1', 'Level2'], dates],
            names=['level', 'date'])
        index2 = index1.copy(names=None)
        for index in [index1, index2]:
            df.index = index

            check_round_trip(df, engine)
            check_round_trip(df, engine, read_kwargs={'columns': ['A', 'B']},
                             expected=df[['A', 'B']])

    def test_write_ignoring_index(self, engine):
        # ENH 20768
        # Ensure index=False omits the index from the written Parquet file.
        df = pd.DataFrame({'a': [1, 2, 3], 'b': ['q', 'r', 's']})

        write_kwargs = {
            'compression': None,
            'index': False,
        }

        # Because we're dropping the index, we expect the loaded dataframe to
        # have the default integer index.
        expected = df.reset_index(drop=True)

        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)

        # Ignore custom index
        df = pd.DataFrame({'a': [1, 2, 3], 'b': ['q', 'r', 's']},
                          index=['zyx', 'wvu', 'tsr'])

        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)

        # Ignore multi-indexes as well.
        arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
                  ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
        df = pd.DataFrame({'one': [i for i in range(8)],
                           'two': [-i for i in range(8)]},
                          index=arrays)

        expected = df.reset_index(drop=True)
        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)


class TestParquetPyArrow(Base):

    def test_basic(self, pa, df_full):
        df = df_full

        # additional supported types for pyarrow
        df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                          tz='Europe/Brussels')
        df['bool_with_none'] = [True, None, True]

        check_round_trip(df, pa)

    # TODO: This doesn't fail on all systems; track down which
    @pytest.mark.xfail(reason="pyarrow fails on this (ARROW-1883)",
                       strict=False)
    def test_basic_subset_columns(self, pa, df_full):
        # GH18628
        df = df_full

        # additional supported types for pyarrow
        df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                          tz='Europe/Brussels')

        check_round_trip(df, pa, expected=df[['string', 'int']],
                         read_kwargs={'columns': ['string', 'int']})

    def test_duplicate_columns(self, pa):
        # not currently able to handle duplicate columns
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_error_on_write(df, pa, ValueError)

    def test_unsupported(self, pa):
        # period
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        # pyarrow 0.11 raises ArrowTypeError
        # older pyarrows raise ArrowInvalid
        self.check_error_on_write(df, pa, Exception)

        # timedelta
        df = pd.DataFrame({'a': pd.timedelta_range('1 day',
                                                   periods=3)})
        self.check_error_on_write(df, pa, NotImplementedError)

        # mixed python objects
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        # pyarrow 0.11 raises ArrowTypeError
        # older pyarrows raise ArrowInvalid
        self.check_error_on_write(df, pa, Exception)

    def test_categorical(self, pa):
        # supported in >= 0.7.0
        df = pd.DataFrame({'a': pd.Categorical(list('abc'))})

        # de-serialized as object
        expected = df.assign(a=df.a.astype(object))
        check_round_trip(df, pa, expected=expected)

    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
        # GH #19134
        check_round_trip(df_compat, pa,
                         path='s3://pandas-test/pyarrow.parquet')

    def test_partition_cols_supported(self, pa, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, partition_cols=partition_cols,
                          compression=None)
            import pyarrow.parquet as pq
            dataset = pq.ParquetDataset(path, validate_schema=False)
            assert len(dataset.partitions.partition_names) == 2
            assert dataset.partitions.partition_names == set(partition_cols)
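
    # Illustrative note: ``partition_cols`` writes a hive-style directory
    # tree keyed on the partition values, roughly
    #
    #   <path>/bool=True/int=1/<part>.parquet
    #
    # which is why the assertions above inspect the dataset's partition
    # names rather than a single output file.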


class TestParquetFastParquet(Base):

    @td.skip_if_no('fastparquet', min_version="0.2.1")
    def test_basic(self, fp, df_full):
        df = df_full

        # additional supported types for fastparquet
        if LooseVersion(fastparquet.__version__) >= LooseVersion('0.1.4'):
            df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                              tz='US/Eastern')
        df['timedelta'] = pd.timedelta_range('1 day', periods=3)
        check_round_trip(df, fp)

    @pytest.mark.skip(reason="not supported")
    def test_duplicate_columns(self, fp):
        # not currently able to handle duplicate columns
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_error_on_write(df, fp, ValueError)

    def test_bool_with_none(self, fp):
        df = pd.DataFrame({'a': [True, None, False]})
        expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
        check_round_trip(df, fp, expected=expected)

    def test_unsupported(self, fp):
        # period
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        self.check_error_on_write(df, fp, ValueError)

        # mixed
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        self.check_error_on_write(df, fp, ValueError)

    def test_categorical(self, fp):
        if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
            pytest.skip("CategoricalDtype not supported for older fp")
        df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
        check_round_trip(df, fp)

    def test_filter_row_groups(self, fp):
        d = {'a': list(range(0, 3))}
        df = pd.DataFrame(d)
        with tm.ensure_clean() as path:
            df.to_parquet(path, fp, compression=None,
                          row_group_offsets=1)
            result = read_parquet(path, fp, filters=[('a', '==', 0)])
        assert len(result) == 1

    def test_s3_roundtrip(self, df_compat, s3_resource, fp):
        # GH #19134
        check_round_trip(df_compat, fp,
                         path='s3://pandas-test/fastparquet.parquet')

    def test_partition_cols_supported(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, engine="fastparquet",
                          partition_cols=partition_cols, compression=None)
            assert os.path.exists(path)
            import fastparquet  # noqa
            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
            assert len(actual_partition_cols) == 2

    def test_partition_on_supported(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, engine="fastparquet", compression=None,
                          partition_on=partition_cols)
            assert os.path.exists(path)
            import fastparquet  # noqa
            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
            assert len(actual_partition_cols) == 2

    def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with pytest.raises(ValueError):
            with tm.ensure_clean_dir() as path:
                df.to_parquet(path, engine="fastparquet", compression=None,
                              partition_on=partition_cols,
                              partition_cols=partition_cols)