import io
import os

import numpy as np
import pytest

from pandas.compat import PY2
from pandas.errors import EmptyDataError
import pandas.util._test_decorators as td

import pandas as pd
import pandas.util.testing as tm
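
# The tests below read .sas7bdat files from the ("io", "sas", "data") test-data
# directory via the datapath fixture and compare the output of pd.read_sas
# against a CSV copy of the same data.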


# https://github.com/cython/cython/issues/1720
@pytest.mark.filterwarnings("ignore:can't resolve package:ImportWarning")
class TestSAS7BDAT(object):

    @pytest.fixture(autouse=True)
    def setup_method(self, datapath):
        self.dirpath = datapath("io", "sas", "data")
        self.data = []
        self.test_ix = [list(range(1, 16)), [16]]
        for j in 1, 2:
            fname = os.path.join(
                self.dirpath, "test_sas7bdat_{j}.csv".format(j=j))
            df = pd.read_csv(fname)
            # The CSV stores Column4 and Column12 as day counts relative to
            # the SAS epoch (1960-01-01); convert them to datetimes to match
            # what read_sas returns.
            epoch = pd.datetime(1960, 1, 1)
            t1 = pd.to_timedelta(df["Column4"], unit='d')
            df["Column4"] = epoch + t1
            t2 = pd.to_timedelta(df["Column12"], unit='d')
            df["Column12"] = epoch + t2
            for k in range(df.shape[1]):
                col = df.iloc[:, k]
                if col.dtype == np.int64:
                    # read_sas returns all numerics as floats, so cast the
                    # expected integer columns to match.
                    df.iloc[:, k] = df.iloc[:, k].astype(np.float64)
                elif col.dtype == np.dtype('O'):
                    if PY2:
                        f = lambda x: (x.decode('utf-8') if
                                       isinstance(x, str) else x)
                        df.iloc[:, k] = df.iloc[:, k].apply(f)
            self.data.append(df)

    def test_from_file(self):
        for j in 0, 1:
            df0 = self.data[j]
            for k in self.test_ix[j]:
                fname = os.path.join(
                    self.dirpath, "test{k}.sas7bdat".format(k=k))
                df = pd.read_sas(fname, encoding='utf-8')
                tm.assert_frame_equal(df, df0)

    def test_from_buffer(self):
        for j in 0, 1:
            df0 = self.data[j]
            for k in self.test_ix[j]:
                fname = os.path.join(
                    self.dirpath, "test{k}.sas7bdat".format(k=k))
                with open(fname, 'rb') as f:
                    byts = f.read()
                buf = io.BytesIO(byts)
                rdr = pd.read_sas(buf, format="sas7bdat",
                                  iterator=True, encoding='utf-8')
                df = rdr.read()
                tm.assert_frame_equal(df, df0, check_exact=False)
                rdr.close()

    def test_from_iterator(self):
        for j in 0, 1:
            df0 = self.data[j]
            for k in self.test_ix[j]:
                fname = os.path.join(
                    self.dirpath, "test{k}.sas7bdat".format(k=k))
                rdr = pd.read_sas(fname, iterator=True, encoding='utf-8')
                df = rdr.read(2)
                tm.assert_frame_equal(df, df0.iloc[0:2, :])
                df = rdr.read(3)
                tm.assert_frame_equal(df, df0.iloc[2:5, :])
                rdr.close()

    @td.skip_if_no('pathlib')
    def test_path_pathlib(self):
        from pathlib import Path
        for j in 0, 1:
            df0 = self.data[j]
            for k in self.test_ix[j]:
                fname = Path(os.path.join(
                    self.dirpath, "test{k}.sas7bdat".format(k=k)))
                df = pd.read_sas(fname, encoding='utf-8')
                tm.assert_frame_equal(df, df0)

    @td.skip_if_no('py.path')
    def test_path_localpath(self):
        from py.path import local as LocalPath
        for j in 0, 1:
            df0 = self.data[j]
            for k in self.test_ix[j]:
                fname = LocalPath(os.path.join(
                    self.dirpath, "test{k}.sas7bdat".format(k=k)))
                df = pd.read_sas(fname, encoding='utf-8')
                tm.assert_frame_equal(df, df0)

    def test_iterator_loop(self):
        # github #13654
        for j in 0, 1:
            for k in self.test_ix[j]:
                for chunksize in 3, 5, 10, 11:
                    fname = os.path.join(
                        self.dirpath, "test{k}.sas7bdat".format(k=k))
                    rdr = pd.read_sas(fname, chunksize=chunksize,
                                      encoding='utf-8')
                    y = 0
                    for x in rdr:
                        y += x.shape[0]
                    assert y == rdr.row_count
                    rdr.close()

    def test_iterator_read_too_much(self):
        # github #14734
        k = self.test_ix[0][0]
        fname = os.path.join(self.dirpath, "test{k}.sas7bdat".format(k=k))
        rdr = pd.read_sas(fname, format="sas7bdat",
                          iterator=True, encoding='utf-8')
        d1 = rdr.read(rdr.row_count + 20)
        rdr.close()

        rdr = pd.read_sas(fname, iterator=True, encoding="utf-8")
        d2 = rdr.read(rdr.row_count + 20)
        tm.assert_frame_equal(d1, d2)
        rdr.close()
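
# The module-level tests below read standalone sample files via the datapath
# fixture, covering encoding options, date/time handling and regression cases.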


def test_encoding_options(datapath):
    fname = datapath("io", "sas", "data", "test1.sas7bdat")
    df1 = pd.read_sas(fname)
    df2 = pd.read_sas(fname, encoding='utf-8')
    # Without an encoding, string data comes back as bytes; decoding it
    # should reproduce the frame read with encoding='utf-8'.
    for col in df1.columns:
        try:
            df1[col] = df1[col].str.decode('utf-8')
        except AttributeError:
            pass
    tm.assert_frame_equal(df1, df2)

    from pandas.io.sas.sas7bdat import SAS7BDATReader
    rdr = SAS7BDATReader(fname, convert_header_text=False)
    df3 = rdr.read()
    rdr.close()
    # With convert_header_text=False the column names stay as raw bytes.
    for x, y in zip(df1.columns, df3.columns):
        assert x == y.decode()


def test_productsales(datapath):
    fname = datapath("io", "sas", "data", "productsales.sas7bdat")
    df = pd.read_sas(fname, encoding='utf-8')
    fname = datapath("io", "sas", "data", "productsales.csv")
    df0 = pd.read_csv(fname, parse_dates=['MONTH'])
    vn = ["ACTUAL", "PREDICT", "QUARTER", "YEAR"]
    df0[vn] = df0[vn].astype(np.float64)
    tm.assert_frame_equal(df, df0)


def test_12659(datapath):
    fname = datapath("io", "sas", "data", "test_12659.sas7bdat")
    df = pd.read_sas(fname)
    fname = datapath("io", "sas", "data", "test_12659.csv")
    df0 = pd.read_csv(fname)
    df0 = df0.astype(np.float64)
    tm.assert_frame_equal(df, df0)


def test_airline(datapath):
    fname = datapath("io", "sas", "data", "airline.sas7bdat")
    df = pd.read_sas(fname)
    fname = datapath("io", "sas", "data", "airline.csv")
    df0 = pd.read_csv(fname)
    df0 = df0.astype(np.float64)
    tm.assert_frame_equal(df, df0, check_exact=False)


def test_date_time(datapath):
    # Support of different SAS date/datetime formats (PR #15871)
    fname = datapath("io", "sas", "data", "datetime.sas7bdat")
    df = pd.read_sas(fname)
    fname = datapath("io", "sas", "data", "datetime.csv")
    df0 = pd.read_csv(fname, parse_dates=['Date1', 'Date2', 'DateTime',
                                          'DateTimeHi', 'Taiw'])
    # GH 19732: Timestamps imported from SAS will incur floating point errors
    df.iloc[:, 3] = df.iloc[:, 3].dt.round('us')
    tm.assert_frame_equal(df, df0)


def test_compact_numerical_values(datapath):
    # Regression test for #21616
    fname = datapath("io", "sas", "data", "cars.sas7bdat")
    df = pd.read_sas(fname, encoding='latin-1')
    # The two columns CYL and WGT in cars.sas7bdat have column width < 8 and
    # only contain integral values. Test that pandas doesn't corrupt the
    # numbers by adding decimals.
    result = df['WGT']
    expected = df['WGT'].round()
    tm.assert_series_equal(result, expected, check_exact=True)
    result = df['CYL']
    expected = df['CYL'].round()
    tm.assert_series_equal(result, expected, check_exact=True)


def test_many_columns(datapath):
    # Test for looking for column information in more places (PR #22628)
    fname = datapath("io", "sas", "data", "many_columns.sas7bdat")
    df = pd.read_sas(fname, encoding='latin-1')
    fname = datapath("io", "sas", "data", "many_columns.csv")
    df0 = pd.read_csv(fname, encoding='latin-1')
    tm.assert_frame_equal(df, df0)


def test_inconsistent_number_of_rows(datapath):
    # Regression test for issue #16615 (PR #22628)
    fname = datapath("io", "sas", "data", "load_log.sas7bdat")
    df = pd.read_sas(fname, encoding='latin-1')
    assert len(df) == 2097


def test_zero_variables(datapath):
    # A SAS file with zero variables should raise EmptyDataError (PR #18184)
    fname = datapath("io", "sas", "data", "zero_variables.sas7bdat")
    with pytest.raises(EmptyDataError):
        pd.read_sas(fname)