# -*- coding: utf-8 -*-

"""
Tests the TextReader class in parsers.pyx, which
is integral to the C engine in parsers.py
"""

import os

import numpy as np
from numpy import nan
import pytest

import pandas._libs.parsers as parser
from pandas._libs.parsers import TextReader
import pandas.compat as compat
from pandas.compat import BytesIO, StringIO, map

from pandas import DataFrame
import pandas.util.testing as tm
from pandas.util.testing import assert_frame_equal

from pandas.io.parsers import TextFileReader, read_csv


class TestTextReader(object):
    """Tests driving ``TextReader`` (and a few ``read_csv`` calls)."""

    @pytest.fixture(autouse=True)
    def setup_method(self, datapath):
        """Resolve the paths of the data files used by the tests."""
        self.dirpath = datapath('io', 'parser', 'data')
        self.csv1 = os.path.join(self.dirpath, 'test1.csv')
        self.csv2 = os.path.join(self.dirpath, 'test2.csv')
        self.xls1 = os.path.join(self.dirpath, 'test.xls')

    def test_file_handle(self):
        """Reading from an open binary file handle does not raise."""
        with open(self.csv1, 'rb') as f:
            reader = TextReader(f)
            reader.read()

    def test_string_filename(self):
        """Reading from a path given as a string does not raise."""
        reader = TextReader(self.csv1, header=None)
        reader.read()

    def test_file_handle_mmap(self):
        """Reading from a file handle with ``memory_map=True`` works."""
        with open(self.csv1, 'rb') as f:
            reader = TextReader(f, memory_map=True, header=None)
            reader.read()

    def test_StringIO(self):
        """Reading from an in-memory ``BytesIO`` buffer does not raise."""
        with open(self.csv1, 'rb') as f:
            text = f.read()
        src = BytesIO(text)
        reader = TextReader(src, header=None)
        reader.read()

    def test_string_factorize(self):
        """Repeated string values are represented by shared objects."""
        # should this be optional?
        data = 'a\nb\na\nb\na'
        reader = TextReader(StringIO(data), header=None)
        result = reader.read()
        # Five rows but only two distinct values -> two distinct objects.
        assert len(set(map(id, result[0]))) == 2

    def test_skipinitialspace(self):
        """``skipinitialspace=True`` drops blanks after the delimiter."""
        data = ('a, b\n'
                'a, b\n'
                'a, b\n'
                'a, b')

        reader = TextReader(StringIO(data), skipinitialspace=True,
                            header=None)
        result = reader.read()

        tm.assert_numpy_array_equal(result[0], np.array(['a', 'a', 'a', 'a'],
                                                        dtype=np.object_))
        tm.assert_numpy_array_equal(result[1], np.array(['b', 'b', 'b', 'b'],
                                                        dtype=np.object_))

    def test_parse_booleans(self):
        """A column of ``True``/``False`` literals is parsed as bool."""
        data = 'True\nFalse\nTrue\nTrue'

        reader = TextReader(StringIO(data), header=None)
        result = reader.read()

        assert result[0].dtype == np.bool_

    def test_delimit_whitespace(self):
        """``delim_whitespace=True`` splits on runs of spaces and tabs."""
        data = 'a b\na\t\t "b"\n"a"\t \t b'

        reader = TextReader(StringIO(data), delim_whitespace=True,
                            header=None)
        result = reader.read()

        tm.assert_numpy_array_equal(result[0], np.array(['a', 'a', 'a'],
                                                        dtype=np.object_))
        tm.assert_numpy_array_equal(result[1], np.array(['b', 'b', 'b'],
                                                        dtype=np.object_))

    def test_embedded_newline(self):
        """A newline inside a quoted field stays part of that field."""
        data = 'a\n"hello\nthere"\nthis'

        reader = TextReader(StringIO(data), header=None)
        result = reader.read()

        expected = np.array(['a', 'hello\nthere', 'this'], dtype=np.object_)
        tm.assert_numpy_array_equal(result[0], expected)

    def test_euro_decimal(self):
        """``decimal=','`` parses comma-decimal numbers as floats."""
        data = '12345,67\n345,678'

        # ':' is the delimiter, so the comma is free to act as the decimal.
        reader = TextReader(StringIO(data), delimiter=':',
                            decimal=',', header=None)
        result = reader.read()

        expected = np.array([12345.67, 345.678])
        tm.assert_almost_equal(result[0], expected)

    def test_integer_thousands(self):
        """``thousands=','`` strips the separator and yields int64."""
        data = '123,456\n12,500'

        reader = TextReader(StringIO(data), delimiter=':',
                            thousands=',', header=None)
        result = reader.read()

        expected = np.array([123456, 12500], dtype=np.int64)
        tm.assert_almost_equal(result[0], expected)

    def test_integer_thousands_alt(self):
        """``thousands='.'`` is honoured when using ``TextFileReader``."""
        data = '123.456\n12.500'

        reader = TextFileReader(StringIO(data), delimiter=':',
                                thousands='.', header=None)
        result = reader.read()

        expected = DataFrame([123456, 12500])
        tm.assert_frame_equal(result, expected)

    def test_skip_bad_lines(self, capsys):
        """Lines with too many fields raise, or are skipped on request."""
        # too many lines, see #2430 for why
        data = ('a:b:c\n'
                'd:e:f\n'
                'g:h:i\n'
                'j:k:l:m\n'
                'l:m:n\n'
                'o:p:q:r')

        # Default behaviour: the first over-long line is an error.
        reader = TextReader(StringIO(data), delimiter=':',
                            header=None)
        msg = (r"Error tokenizing data\. C error: Expected 3 fields in"
               " line 4, saw 4")
        with pytest.raises(parser.ParserError, match=msg):
            reader.read()

        # error_bad_lines=False: over-long lines are dropped silently.
        reader = TextReader(StringIO(data), delimiter=':',
                            header=None,
                            error_bad_lines=False,
                            warn_bad_lines=False)
        result = reader.read()
        expected = {0: np.array(['a', 'd', 'g', 'l'], dtype=object),
                    1: np.array(['b', 'e', 'h', 'm'], dtype=object),
                    2: np.array(['c', 'f', 'i', 'n'], dtype=object)}
        assert_array_dicts_equal(result, expected)

        # warn_bad_lines=True: each dropped line is reported on stderr.
        reader = TextReader(StringIO(data), delimiter=':',
                            header=None,
                            error_bad_lines=False,
                            warn_bad_lines=True)
        reader.read()
        captured = capsys.readouterr()

        assert 'Skipping line 4' in captured.err
        assert 'Skipping line 6' in captured.err

    def test_header_not_enough_lines(self):
        """``header=2`` takes the names from the third input line."""
        data = ('skip this\n'
                'skip this\n'
                'a,b,c\n'
                '1,2,3\n'
                '4,5,6')

        reader = TextReader(StringIO(data), delimiter=',', header=2)
        header = reader.header
        expected = [['a', 'b', 'c']]
        assert header == expected

        recs = reader.read()
        expected = {0: np.array([1, 4], dtype=np.int64),
                    1: np.array([2, 5], dtype=np.int64),
                    2: np.array([3, 6], dtype=np.int64)}
        assert_array_dicts_equal(recs, expected)

    def test_escapechar(self):
        """An escaped leading quote is kept as a literal character."""
        data = ('\\"hello world\"\n'
                '\\"hello world\"\n'
                '\\"hello world\"')

        reader = TextReader(StringIO(data), delimiter=',', header=None,
                            escapechar='\\')
        result = reader.read()
        expected = {0: np.array(['"hello world"'] * 3, dtype=object)}
        assert_array_dicts_equal(result, expected)

    def test_eof_has_eol(self):
        # handling of new line at EOF
        # TODO: placeholder, nothing is exercised yet.
        pass

    def test_na_substitution(self):
        # TODO: placeholder, nothing is exercised yet.
        pass

    def test_numpy_string_dtype(self):
        """Fixed-width string dtypes are applied (and truncate values)."""
        data = """\
a,1
aa,2
aaa,3
aaaa,4
aaaaa,5"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', header=None,
                              **kwds)

        reader = _make_reader(dtype='S5,i4')
        result = reader.read()

        assert result[0].dtype == 'S5'

        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaaa'], dtype='S5')
        assert (result[0] == ex_values).all()
        assert result[1].dtype == 'i4'

        # A single dtype applies to every column; 'aaaaa' is cut to 'aaaa'.
        reader = _make_reader(dtype='S4')
        result = reader.read()
        assert result[0].dtype == 'S4'
        ex_values = np.array(['a', 'aa', 'aaa', 'aaaa', 'aaaa'], dtype='S4')
        assert (result[0] == ex_values).all()
        assert result[1].dtype == 'S4'

    def test_pass_dtype(self):
        """A dtype dict may be keyed by column name or by position."""
        data = """\
one,two
1,a
2,b
3,c
4,d"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', **kwds)

        # dtypes given as strings
        reader = _make_reader(dtype={'one': 'u1', 1: 'S1'})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'S1'

        # dtypes given as types
        reader = _make_reader(dtype={'one': np.uint8, 1: object})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'O'

        # dtypes given as np.dtype instances
        reader = _make_reader(dtype={'one': np.dtype('u1'),
                                     1: np.dtype('O')})
        result = reader.read()
        assert result[0].dtype == 'u1'
        assert result[1].dtype == 'O'

    def test_usecols(self):
        """``usecols`` limits the result to the requested positions."""
        data = """\
a,b,c
1,2,3
4,5,6
7,8,9
10,11,12"""

        def _make_reader(**kwds):
            return TextReader(StringIO(data), delimiter=',', **kwds)

        reader = _make_reader(usecols=(1, 2))
        result = reader.read()

        # Compare against a full read of the same data.
        exp = _make_reader().read()
        assert len(result) == 2
        assert (result[1] == exp[1]).all()
        assert (result[2] == exp[2]).all()

    def test_cr_delimited(self):
        """Bare ``\\r`` line endings parse the same as ``\\r\\n``."""
        def _test(text, **kwargs):
            # Parse the text as given and with every '\r' turned into
            # '\r\n'; both parses must agree.
            nice_text = text.replace('\r', '\r\n')
            result = TextReader(StringIO(text), **kwargs).read()
            expected = TextReader(StringIO(nice_text), **kwargs).read()
            assert_array_dicts_equal(result, expected)

        data = 'a,b,c\r1,2,3\r4,5,6\r7,8,9\r10,11,12'
        _test(data, delimiter=',')

        data = 'a b c\r1 2 3\r4 5 6\r7 8 9\r10 11 12'
        _test(data, delim_whitespace=True)

        data = 'a,b,c\r1,2,3\r4,5,6\r,88,9\r10,11,12'
        _test(data, delimiter=',')

        sample = ('A,B,C,D,E,F,G,H,I,J,K,L,M,N,O\r'
                  'AAAAA,BBBBB,0,0,0,0,0,0,0,0,0,0,0,0,0\r'
                  ',BBBBB,0,0,0,0,0,0,0,0,0,0,0,0,0')
        _test(sample, delimiter=',')

        data = 'A B C\r 2 3\r4 5 6'
        _test(data, delim_whitespace=True)

        data = 'A B C\r2 3\r4 5 6'
        _test(data, delim_whitespace=True)

    def test_empty_field_eof(self):
        """Empty trailing fields at end of input are handled."""
        data = 'a,b,c\n1,2,3\n4,,'

        result = TextReader(StringIO(data), delimiter=',').read()

        expected = {0: np.array([1, 4], dtype=np.int64),
                    1: np.array(['2', ''], dtype=object),
                    2: np.array(['3', ''], dtype=object)}
        assert_array_dicts_equal(result, expected)

        # GH5664
        a = DataFrame([['b'], [nan]], columns=['a'], index=['a', 'c'])
        b = DataFrame([[1, 1, 1, 0], [1, 1, 1, 0]],
                      columns=list('abcd'),
                      index=[1, 1])
        c = DataFrame([[1, 2, 3, 4], [6, nan, nan, nan],
                       [8, 9, 10, 11], [13, 14, nan, nan]],
                      columns=list('abcd'),
                      index=[0, 5, 7, 12])

        # The same inputs are parsed repeatedly and must give the same
        # frames on every iteration.
        for _ in range(100):
            df = read_csv(StringIO('a,b\nc\n'), skiprows=0,
                          names=['a'], engine='c')
            assert_frame_equal(df, a)

            df = read_csv(StringIO('1,1,1,1,0\n' * 2 + '\n' * 2),
                          names=list("abcd"), engine='c')
            assert_frame_equal(df, b)

            df = read_csv(StringIO('0,1,2,3,4\n5,6\n7,8,9,10,11\n12,13,14'),
                          names=list('abcd'), engine='c')
            assert_frame_equal(df, c)

    def test_empty_csv_input(self):
        """``read_csv`` with ``chunksize`` returns a ``TextFileReader``."""
        # GH14867
        df = read_csv(StringIO(), chunksize=20, header=None,
                      names=['a', 'b', 'c'])
        assert isinstance(df, TextFileReader)


def assert_array_dicts_equal(left, right):
    """
    Assert that each array in ``left`` equals its counterpart in ``right``.

    Parameters
    ----------
    left, right : dict
        Mappings from key to array-like. Only the keys of ``left`` are
        visited; each value is converted with ``np.asarray`` first.

    Raises
    ------
    AssertionError
        If a pair of arrays differs.
    KeyError
        If a key of ``left`` is missing from ``right``.
    """
    for k, v in compat.iteritems(left):
        # assert_numpy_array_equal signals a mismatch by raising, so its
        # return value must not be asserted on: where it returns None,
        # ``assert <None>`` would fail even for equal arrays.
        tm.assert_numpy_array_equal(np.asarray(v), np.asarray(right[k]))