- """ test parquet compat """
- import datetime
- from distutils.version import LooseVersion
- import os
- from warnings import catch_warnings
- import numpy as np
- import pytest
- from pandas.compat import PY3
- import pandas.util._test_decorators as td
- import pandas as pd
- from pandas.util import testing as tm
- from pandas.io.parquet import (
- FastParquetImpl, PyArrowImpl, get_engine, read_parquet, to_parquet)
- try:
- import pyarrow # noqa
- _HAVE_PYARROW = True
- except ImportError:
- _HAVE_PYARROW = False
- try:
- import fastparquet # noqa
- _HAVE_FASTPARQUET = True
- except ImportError:
- _HAVE_FASTPARQUET = False


# setup engines & skips
@pytest.fixture(params=[
    pytest.param('fastparquet',
                 marks=pytest.mark.skipif(not _HAVE_FASTPARQUET,
                                          reason='fastparquet is '
                                                 'not installed')),
    pytest.param('pyarrow',
                 marks=pytest.mark.skipif(not _HAVE_PYARROW,
                                          reason='pyarrow is '
                                                 'not installed'))])
def engine(request):
    return request.param
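

# Tests that request the ``engine`` fixture are run once per installed
# engine. An illustrative sketch (``some_df`` is a hypothetical frame,
# not part of this module):
#
#     def test_something(engine):
#         check_round_trip(some_df, engine)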


@pytest.fixture
def pa():
    if not _HAVE_PYARROW:
        pytest.skip("pyarrow is not installed")
    return 'pyarrow'


@pytest.fixture
def fp():
    if not _HAVE_FASTPARQUET:
        pytest.skip("fastparquet is not installed")
    return 'fastparquet'


@pytest.fixture
def df_compat():
    return pd.DataFrame({'A': [1, 2, 3], 'B': 'foo'})


@pytest.fixture
def df_cross_compat():
    df = pd.DataFrame({'a': list('abc'),
                       'b': list(range(1, 4)),
                       # 'c': np.arange(3, 6).astype('u1'),
                       'd': np.arange(4.0, 7.0, dtype='float64'),
                       'e': [True, False, True],
                       'f': pd.date_range('20130101', periods=3),
                       # 'g': pd.date_range('20130101', periods=3,
                       #                    tz='US/Eastern'),
                       # 'h': pd.date_range('20130101', periods=3, freq='ns')
                       })
    return df


@pytest.fixture
def df_full():
    return pd.DataFrame(
        {'string': list('abc'),
         'string_with_nan': ['a', np.nan, 'c'],
         'string_with_none': ['a', None, 'c'],
         'bytes': [b'foo', b'bar', b'baz'],
         'unicode': [u'foo', u'bar', u'baz'],
         'int': list(range(1, 4)),
         'uint': np.arange(3, 6).astype('u1'),
         'float': np.arange(4.0, 7.0, dtype='float64'),
         'float_with_nan': [2., np.nan, 3.],
         'bool': [True, False, True],
         'datetime': pd.date_range('20130101', periods=3),
         'datetime_with_nat': [pd.Timestamp('20130101'),
                               pd.NaT,
                               pd.Timestamp('20130103')]})


def check_round_trip(df, engine=None, path=None,
                     write_kwargs=None, read_kwargs=None,
                     expected=None, check_names=True,
                     repeat=2):
    """Verify parquet serializer and deserializer produce the same results.

    Performs a pandas to disk and disk to pandas round trip,
    then compares the two resulting DataFrames to verify equality.

    Parameters
    ----------
    df: DataFrame
    engine: str, optional
        'pyarrow' or 'fastparquet'
    path: str, optional
    write_kwargs: dict of str:str, optional
    read_kwargs: dict of str:str, optional
    expected: DataFrame, optional
        Expected deserialization result, otherwise will be equal to `df`
    check_names: bool, optional
        Whether to check that the column names of the round-tripped
        result match the expected DataFrame
    repeat: int, optional
        How many times to repeat the test
    """
    write_kwargs = write_kwargs or {'compression': None}
    read_kwargs = read_kwargs or {}

    if expected is None:
        expected = df

    if engine:
        write_kwargs['engine'] = engine
        read_kwargs['engine'] = engine

    def compare(repeat):
        for _ in range(repeat):
            df.to_parquet(path, **write_kwargs)
            with catch_warnings(record=True):
                actual = read_parquet(path, **read_kwargs)
            tm.assert_frame_equal(expected, actual,
                                  check_names=check_names)

    if path is None:
        with tm.ensure_clean() as path:
            compare(repeat)
    else:
        compare(repeat)
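

# Illustrative usage of ``check_round_trip`` (a sketch, not an actual
# test; ``demo`` is a hypothetical frame):
#
#     demo = pd.DataFrame({'x': [1, 2, 3], 'y': list('abc')})
#     check_round_trip(demo, engine='pyarrow',
#                      read_kwargs={'columns': ['x']},
#                      expected=demo[['x']])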


def test_invalid_engine(df_compat):
    with pytest.raises(ValueError):
        check_round_trip(df_compat, 'foo', 'bar')


def test_options_py(df_compat, pa):
    # use the set option
    with pd.option_context('io.parquet.engine', 'pyarrow'):
        check_round_trip(df_compat)


def test_options_fp(df_compat, fp):
    # use the set option
    with pd.option_context('io.parquet.engine', 'fastparquet'):
        check_round_trip(df_compat)


def test_options_auto(df_compat, fp, pa):
    # use the set option
    with pd.option_context('io.parquet.engine', 'auto'):
        check_round_trip(df_compat)


def test_options_get_engine(fp, pa):
    assert isinstance(get_engine('pyarrow'), PyArrowImpl)
    assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'pyarrow'):
        assert isinstance(get_engine('auto'), PyArrowImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'fastparquet'):
        assert isinstance(get_engine('auto'), FastParquetImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)

    with pd.option_context('io.parquet.engine', 'auto'):
        assert isinstance(get_engine('auto'), PyArrowImpl)
        assert isinstance(get_engine('pyarrow'), PyArrowImpl)
        assert isinstance(get_engine('fastparquet'), FastParquetImpl)


def test_cross_engine_pa_fp(df_cross_compat, pa, fp):
    # cross-compat with differing reading/writing engines
    df = df_cross_compat
    with tm.ensure_clean() as path:
        df.to_parquet(path, engine=pa, compression=None)

        result = read_parquet(path, engine=fp)
        tm.assert_frame_equal(result, df)

        result = read_parquet(path, engine=fp, columns=['a', 'd'])
        tm.assert_frame_equal(result, df[['a', 'd']])


def test_cross_engine_fp_pa(df_cross_compat, pa, fp):
    # cross-compat with differing reading/writing engines
    df = df_cross_compat
    with tm.ensure_clean() as path:
        df.to_parquet(path, engine=fp, compression=None)

        with catch_warnings(record=True):
            result = read_parquet(path, engine=pa)
            tm.assert_frame_equal(result, df)

            result = read_parquet(path, engine=pa, columns=['a', 'd'])
            tm.assert_frame_equal(result, df[['a', 'd']])


class Base(object):

    def check_error_on_write(self, df, engine, exc):
        # check that we are raising the exception on writing
        with tm.ensure_clean() as path:
            with pytest.raises(exc):
                to_parquet(df, path, engine, compression=None)


class TestBasic(Base):

    def test_error(self, engine):
        for obj in [pd.Series([1, 2, 3]), 1, 'foo', pd.Timestamp('20130101'),
                    np.array([1, 2, 3])]:
            self.check_error_on_write(obj, engine, ValueError)

    def test_columns_dtypes(self, engine):
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        # unicode
        df.columns = [u'foo', u'bar']
        check_round_trip(df, engine)

    def test_columns_dtypes_invalid(self, engine):
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        # numeric
        df.columns = [0, 1]
        self.check_error_on_write(df, engine, ValueError)

        if PY3:
            # bytes on PY3, on PY2 these are str
            df.columns = [b'foo', b'bar']
            self.check_error_on_write(df, engine, ValueError)

        # python object
        df.columns = [datetime.datetime(2011, 1, 1, 0, 0),
                      datetime.datetime(2011, 1, 1, 1, 1)]
        self.check_error_on_write(df, engine, ValueError)

    @pytest.mark.parametrize('compression', [None, 'gzip', 'snappy', 'brotli'])
    def test_compression(self, engine, compression):

        if compression == 'snappy':
            pytest.importorskip('snappy')

        elif compression == 'brotli':
            pytest.importorskip('brotli')

        df = pd.DataFrame({'A': [1, 2, 3]})
        check_round_trip(df, engine, write_kwargs={'compression': compression})

    def test_read_columns(self, engine):
        # GH18154
        df = pd.DataFrame({'string': list('abc'),
                           'int': list(range(1, 4))})

        expected = pd.DataFrame({'string': list('abc')})
        check_round_trip(df, engine, expected=expected,
                         read_kwargs={'columns': ['string']})

    def test_write_index(self, engine):
        check_names = engine != 'fastparquet'

        df = pd.DataFrame({'A': [1, 2, 3]})
        check_round_trip(df, engine)

        indexes = [
            [2, 3, 4],
            pd.date_range('20130101', periods=3),
            list('abc'),
            [1, 3, 4],
        ]
        # non-default index
        for index in indexes:
            df.index = index
            check_round_trip(df, engine, check_names=check_names)

        # index with meta-data
        df.index = [0, 1, 2]
        df.index.name = 'foo'
        check_round_trip(df, engine)

    def test_write_multiindex(self, pa):
        # Not supported in fastparquet as of 0.1.3, or in older pyarrow
        # versions
        engine = pa

        df = pd.DataFrame({'A': [1, 2, 3]})
        index = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
        df.index = index
        check_round_trip(df, engine)

    def test_write_column_multiindex(self, engine):
        # column multi-index
        mi_columns = pd.MultiIndex.from_tuples([('a', 1), ('a', 2), ('b', 1)])
        df = pd.DataFrame(np.random.randn(4, 3), columns=mi_columns)
        self.check_error_on_write(df, engine, ValueError)

    def test_multiindex_with_columns(self, pa):
        engine = pa
        dates = pd.date_range('01-Jan-2018', '01-Dec-2018', freq='MS')
        df = pd.DataFrame(np.random.randn(2 * len(dates), 3),
                          columns=list('ABC'))
        index1 = pd.MultiIndex.from_product(
            [['Level1', 'Level2'], dates],
            names=['level', 'date'])
        index2 = index1.copy(names=None)
        for index in [index1, index2]:
            df.index = index

            check_round_trip(df, engine)
            check_round_trip(df, engine, read_kwargs={'columns': ['A', 'B']},
                             expected=df[['A', 'B']])

    def test_write_ignoring_index(self, engine):
        # ENH 20768
        # Ensure index=False omits the index from the written Parquet file.
        df = pd.DataFrame({'a': [1, 2, 3], 'b': ['q', 'r', 's']})

        write_kwargs = {
            'compression': None,
            'index': False,
        }

        # Because we're dropping the index, we expect the loaded dataframe to
        # have the default integer index.
        expected = df.reset_index(drop=True)

        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)

        # Ignore custom index
        df = pd.DataFrame({'a': [1, 2, 3], 'b': ['q', 'r', 's']},
                          index=['zyx', 'wvu', 'tsr'])

        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)

        # Ignore multi-indexes as well.
        arrays = [['bar', 'bar', 'baz', 'baz', 'foo', 'foo', 'qux', 'qux'],
                  ['one', 'two', 'one', 'two', 'one', 'two', 'one', 'two']]
        df = pd.DataFrame({'one': [i for i in range(8)],
                           'two': [-i for i in range(8)]},
                          index=arrays)

        expected = df.reset_index(drop=True)
        check_round_trip(df, engine, write_kwargs=write_kwargs,
                         expected=expected)


class TestParquetPyArrow(Base):

    def test_basic(self, pa, df_full):

        df = df_full

        # additional supported types for pyarrow
        df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                          tz='Europe/Brussels')
        df['bool_with_none'] = [True, None, True]

        check_round_trip(df, pa)

    # TODO: This doesn't fail on all systems; track down which
    @pytest.mark.xfail(reason="pyarrow fails on this (ARROW-1883)",
                       strict=False)
    def test_basic_subset_columns(self, pa, df_full):
        # GH18628

        df = df_full
        # additional supported types for pyarrow
        df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                          tz='Europe/Brussels')

        check_round_trip(df, pa, expected=df[['string', 'int']],
                         read_kwargs={'columns': ['string', 'int']})

    def test_duplicate_columns(self, pa):
        # not currently able to handle duplicate columns
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_error_on_write(df, pa, ValueError)

    def test_unsupported(self, pa):
        # period
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        # pyarrow 0.11 raises ArrowTypeError
        # older pyarrows raise ArrowInvalid
        self.check_error_on_write(df, pa, Exception)

        # timedelta
        df = pd.DataFrame({'a': pd.timedelta_range('1 day',
                                                   periods=3)})
        self.check_error_on_write(df, pa, NotImplementedError)

        # mixed python objects
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        # pyarrow 0.11 raises ArrowTypeError
        # older pyarrows raise ArrowInvalid
        self.check_error_on_write(df, pa, Exception)

    def test_categorical(self, pa):
        # supported in >= 0.7.0
        df = pd.DataFrame({'a': pd.Categorical(list('abc'))})

        # de-serialized as object
        expected = df.assign(a=df.a.astype(object))
        check_round_trip(df, pa, expected=expected)

    def test_s3_roundtrip(self, df_compat, s3_resource, pa):
        # GH #19134
        check_round_trip(df_compat, pa,
                         path='s3://pandas-test/pyarrow.parquet')

    def test_partition_cols_supported(self, pa, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, partition_cols=partition_cols,
                          compression=None)
            import pyarrow.parquet as pq
            dataset = pq.ParquetDataset(path, validate_schema=False)
            assert len(dataset.partitions.partition_names) == 2
            assert dataset.partitions.partition_names == set(partition_cols)
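

# ``partition_cols`` with the pyarrow engine produces a hive-style dataset
# layout with one directory level per partition column, roughly (the part
# file names are illustrative):
#
#     <path>/bool=True/int=1/<part>.parquet
#     <path>/bool=False/int=2/<part>.parquet
#     ...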


class TestParquetFastParquet(Base):

    @td.skip_if_no('fastparquet', min_version="0.2.1")
    def test_basic(self, fp, df_full):
        df = df_full

        # additional supported types for fastparquet
        if LooseVersion(fastparquet.__version__) >= LooseVersion('0.1.4'):
            df['datetime_tz'] = pd.date_range('20130101', periods=3,
                                              tz='US/Eastern')
        df['timedelta'] = pd.timedelta_range('1 day', periods=3)
        check_round_trip(df, fp)

    @pytest.mark.skip(reason="not supported")
    def test_duplicate_columns(self, fp):
        # not currently able to handle duplicate columns
        df = pd.DataFrame(np.arange(12).reshape(4, 3),
                          columns=list('aaa')).copy()
        self.check_error_on_write(df, fp, ValueError)

    def test_bool_with_none(self, fp):
        df = pd.DataFrame({'a': [True, None, False]})
        # fastparquet round-trips a boolean column containing None as
        # float16, with NaN standing in for the missing value
        expected = pd.DataFrame({'a': [1.0, np.nan, 0.0]}, dtype='float16')
        check_round_trip(df, fp, expected=expected)

    def test_unsupported(self, fp):
        # period
        df = pd.DataFrame({'a': pd.period_range('2013', freq='M', periods=3)})
        self.check_error_on_write(df, fp, ValueError)

        # mixed
        df = pd.DataFrame({'a': ['a', 1, 2.0]})
        self.check_error_on_write(df, fp, ValueError)

    def test_categorical(self, fp):
        if LooseVersion(fastparquet.__version__) < LooseVersion("0.1.3"):
            pytest.skip("CategoricalDtype not supported for older fp")
        df = pd.DataFrame({'a': pd.Categorical(list('abc'))})
        check_round_trip(df, fp)

    def test_filter_row_groups(self, fp):
        d = {'a': list(range(0, 3))}
        df = pd.DataFrame(d)
        with tm.ensure_clean() as path:
            df.to_parquet(path, fp, compression=None,
                          row_group_offsets=1)
            result = read_parquet(path, fp, filters=[('a', '==', 0)])
            assert len(result) == 1

    def test_s3_roundtrip(self, df_compat, s3_resource, fp):
        # GH #19134
        check_round_trip(df_compat, fp,
                         path='s3://pandas-test/fastparquet.parquet')

    def test_partition_cols_supported(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, engine="fastparquet",
                          partition_cols=partition_cols, compression=None)
            assert os.path.exists(path)
            import fastparquet
            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
            assert len(actual_partition_cols) == 2

    def test_partition_on_supported(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with tm.ensure_clean_dir() as path:
            df.to_parquet(path, engine="fastparquet", compression=None,
                          partition_on=partition_cols)
            assert os.path.exists(path)
            import fastparquet
            actual_partition_cols = fastparquet.ParquetFile(path, False).cats
            assert len(actual_partition_cols) == 2

    def test_error_on_using_partition_cols_and_partition_on(self, fp, df_full):
        # GH #23283
        partition_cols = ['bool', 'int']
        df = df_full
        with pytest.raises(ValueError):
            with tm.ensure_clean_dir() as path:
                df.to_parquet(path, engine="fastparquet", compression=None,
                              partition_on=partition_cols,
                              partition_cols=partition_cols)
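

# For reference: with the fastparquet engine, partitioning can be requested
# either via pandas' cross-engine ``partition_cols`` keyword or fastparquet's
# native ``partition_on``; passing both raises ValueError, as asserted above.
# A minimal sketch (``some_df`` and ``some_dir`` are hypothetical):
#
#     some_df.to_parquet(some_dir, engine='fastparquet', compression=None,
#                        partition_on=['bool', 'int'])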