test_sas7bdat.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. import io
  2. import os
  3. import numpy as np
  4. import pytest
  5. from pandas.compat import PY2
  6. from pandas.errors import EmptyDataError
  7. import pandas.util._test_decorators as td
  8. import pandas as pd
  9. import pandas.util.testing as tm
  10. # https://github.com/cython/cython/issues/1720
  @pytest.mark.filterwarnings("ignore:can't resolve package:ImportWarning")
  class TestSAS7BDAT(object):
      """Check ``pd.read_sas`` on ``test{k}.sas7bdat`` against CSV references.

      Files ``test1`` .. ``test15`` are compared with ``test_sas7bdat_1.csv``
      and ``test16`` with ``test_sas7bdat_2.csv``; ``test_ix`` holds that
      mapping.
      """

      @pytest.fixture(autouse=True)
      def setup_method(self, datapath):
          """Build the expected frames before every test.

          Parameters
          ----------
          datapath : callable
              Fixture used here to resolve the ``io/sas/data`` directory.

          Sets ``self.dirpath``, ``self.data`` (one expected frame per
          reference CSV) and ``self.test_ix`` (file numbers per frame).
          """
          self.dirpath = datapath("io", "sas", "data")
          self.data = []
          # test_ix[j] lists the file numbers k whose contents must equal
          # self.data[j].
          self.test_ix = [list(range(1, 16)), [16]]
          # Column4 / Column12 hold day offsets counted from this date.
          epoch = pd.Timestamp(1960, 1, 1)
          for j in 1, 2:
              fname = os.path.join(
                  self.dirpath, "test_sas7bdat_{j}.csv".format(j=j))
              df = pd.read_csv(fname)
              for name in "Column4", "Column12":
                  df[name] = epoch + pd.to_timedelta(df[name], unit='d')
              for k in range(df.shape[1]):
                  col = df.iloc[:, k]
                  if col.dtype == np.int64:
                      # The frames under test are compared as float64.
                      df.iloc[:, k] = col.astype(np.float64)
                  elif col.dtype == np.dtype('O') and PY2:
                      # Python 2 only: turn byte strings into unicode.
                      df.iloc[:, k] = col.apply(
                          lambda x: (x.decode('utf-8')
                                     if isinstance(x, str) else x))
              self.data.append(df)

      def _sas_path(self, k):
          # Path of the numbered SAS data file ``test{k}.sas7bdat``.
          return os.path.join(
              self.dirpath, "test{k}.sas7bdat".format(k=k))

      def _cases(self):
          # Yield (expected frame, SAS file path) for every numbered file.
          for expected, numbers in zip(self.data, self.test_ix):
              for k in numbers:
                  yield expected, self._sas_path(k)

      def test_from_file(self):
          """Reading from a path string yields the expected frame."""
          for df0, fname in self._cases():
              df = pd.read_sas(fname, encoding='utf-8')
              tm.assert_frame_equal(df, df0)

      def test_from_buffer(self):
          """Reading from an in-memory bytes buffer yields the expected frame."""
          for df0, fname in self._cases():
              with open(fname, 'rb') as f:
                  byts = f.read()
              buf = io.BytesIO(byts)
              rdr = pd.read_sas(buf, format="sas7bdat",
                                iterator=True, encoding='utf-8')
              try:
                  df = rdr.read()
                  tm.assert_frame_equal(df, df0, check_exact=False)
              finally:
                  # Close the reader even when the comparison fails.
                  rdr.close()

      def test_from_iterator(self):
          """Successive ``read(n)`` calls return consecutive row slices."""
          for df0, fname in self._cases():
              rdr = pd.read_sas(fname, iterator=True, encoding='utf-8')
              try:
                  df = rdr.read(2)
                  tm.assert_frame_equal(df, df0.iloc[0:2, :])
                  df = rdr.read(3)
                  tm.assert_frame_equal(df, df0.iloc[2:5, :])
              finally:
                  rdr.close()

      @td.skip_if_no('pathlib')
      def test_path_pathlib(self):
          """``pathlib.Path`` objects are accepted as the file argument."""
          from pathlib import Path
          for df0, fname in self._cases():
              df = pd.read_sas(Path(fname), encoding='utf-8')
              tm.assert_frame_equal(df, df0)

      @td.skip_if_no('py.path')
      def test_path_localpath(self):
          """``py.path.local`` objects are accepted as the file argument."""
          from py.path import local as LocalPath
          for df0, fname in self._cases():
              df = pd.read_sas(LocalPath(fname), encoding='utf-8')
              tm.assert_frame_equal(df, df0)

      def test_iterator_loop(self):
          """Chunked iteration visits exactly ``row_count`` rows."""
          # github #13654
          for _, fname in self._cases():
              for chunksize in 3, 5, 10, 11:
                  # Pass the loop variable so every chunk size is exercised.
                  rdr = pd.read_sas(fname, chunksize=chunksize,
                                    encoding='utf-8')
                  try:
                      y = 0
                      for x in rdr:
                          y += x.shape[0]
                      assert y == rdr.row_count
                  finally:
                      rdr.close()

      def test_iterator_read_too_much(self):
          """Asking for more rows than exist gives the same frame either way."""
          # github #14734
          fname = self._sas_path(self.test_ix[0][0])

          rdr = pd.read_sas(fname, format="sas7bdat",
                            iterator=True, encoding='utf-8')
          try:
              d1 = rdr.read(rdr.row_count + 20)
          finally:
              rdr.close()

          rdr = pd.read_sas(fname, iterator=True, encoding="utf-8")
          try:
              d2 = rdr.read(rdr.row_count + 20)
              tm.assert_frame_equal(d1, d2)
          finally:
              rdr.close()
  def test_encoding_options(datapath):
      """Decoding by hand must match ``encoding='utf-8'``.

      Also checks that a reader built with ``convert_header_text=False``
      returns column names that decode to the same names.
      """
      path = datapath("io", "sas", "data", "test1.sas7bdat")
      raw = pd.read_sas(path)
      decoded = pd.read_sas(path, encoding='utf-8')

      for name in raw.columns:
          try:
              raw[name] = raw[name].str.decode('utf-8')
          except AttributeError:
              # Columns without a ``.str`` accessor are left untouched.
              pass
      tm.assert_frame_equal(raw, decoded)

      from pandas.io.sas.sas7bdat import SAS7BDATReader
      reader = SAS7BDATReader(path, convert_header_text=False)
      undecoded_header = reader.read()
      reader.close()
      for text_name, byte_name in zip(raw.columns, undecoded_header.columns):
          assert text_name == byte_name.decode()
  def test_productsales(datapath):
      """``productsales.sas7bdat`` must equal ``productsales.csv``."""
      sas_path = datapath("io", "sas", "data", "productsales.sas7bdat")
      result = pd.read_sas(sas_path, encoding='utf-8')

      csv_path = datapath("io", "sas", "data", "productsales.csv")
      expected = pd.read_csv(csv_path, parse_dates=['MONTH'])
      # These columns are cast to float64 before the comparison.
      float_columns = ["ACTUAL", "PREDICT", "QUARTER", "YEAR"]
      expected[float_columns] = expected[float_columns].astype(np.float64)

      tm.assert_frame_equal(result, expected)
  def test_12659(datapath):
      """``test_12659.sas7bdat`` must equal its CSV, cast to float64."""
      sas_path = datapath("io", "sas", "data", "test_12659.sas7bdat")
      result = pd.read_sas(sas_path)

      csv_path = datapath("io", "sas", "data", "test_12659.csv")
      expected = pd.read_csv(csv_path).astype(np.float64)

      tm.assert_frame_equal(result, expected)
  def test_airline(datapath):
      """``airline.sas7bdat`` must match its CSV (inexact comparison)."""
      sas_path = datapath("io", "sas", "data", "airline.sas7bdat")
      result = pd.read_sas(sas_path)

      csv_path = datapath("io", "sas", "data", "airline.csv")
      expected = pd.read_csv(csv_path).astype(np.float64)

      tm.assert_frame_equal(result, expected, check_exact=False)
  def test_date_time(datapath):
      """``datetime.sas7bdat`` must equal ``datetime.csv`` with parsed dates."""
      # Support of different SAS date/datetime formats (PR #15871)
      sas_path = datapath("io", "sas", "data", "datetime.sas7bdat")
      result = pd.read_sas(sas_path)

      csv_path = datapath("io", "sas", "data", "datetime.csv")
      date_columns = ['Date1', 'Date2', 'DateTime', 'DateTimeHi', 'Taiw']
      expected = pd.read_csv(csv_path, parse_dates=date_columns)

      # GH 19732: Timestamps imported from sas will incur floating point errors
      rounded = result.iloc[:, 3].dt.round('us')
      result.iloc[:, 3] = rounded

      tm.assert_frame_equal(result, expected)
  def test_compact_numerical_values(datapath):
      """``WGT`` and ``CYL`` in ``cars.sas7bdat`` must equal their rounding."""
      # Regression test for #21616
      path = datapath("io", "sas", "data", "cars.sas7bdat")
      cars = pd.read_sas(path, encoding='latin-1')
      # The two columns CYL and WGT in cars.sas7bdat have column
      # width < 8 and only contain integral values.
      # Test that pandas doesn't corrupt the numbers by adding
      # decimals.
      for column in ('WGT', 'CYL'):
          result = cars[column]
          expected = cars[column].round()
          tm.assert_series_equal(result, expected, check_exact=True)
  def test_many_columns(datapath):
      """``many_columns.sas7bdat`` must equal ``many_columns.csv``."""
      # Test for looking for column information in more places (PR #22628)
      sas_path = datapath("io", "sas", "data", "many_columns.sas7bdat")
      result = pd.read_sas(sas_path, encoding='latin-1')

      csv_path = datapath("io", "sas", "data", "many_columns.csv")
      expected = pd.read_csv(csv_path, encoding='latin-1')

      tm.assert_frame_equal(result, expected)
  def test_inconsistent_number_of_rows(datapath):
      """Reading ``load_log.sas7bdat`` must return exactly 2097 rows."""
      # Regression test for issue #16615. (PR #22628)
      path = datapath("io", "sas", "data", "load_log.sas7bdat")
      load_log = pd.read_sas(path, encoding='latin-1')
      row_total = len(load_log)
      assert row_total == 2097
  def test_zero_variables(datapath):
      """Reading ``zero_variables.sas7bdat`` must raise ``EmptyDataError``."""
      # Check if the SAS file has zero variables (PR #18184)
      path = datapath("io", "sas", "data", "zero_variables.sas7bdat")
      with pytest.raises(EmptyDataError):
          pd.read_sas(path)