# -*- coding: utf-8 -*-
"""
Tests the parsers' ability to read and parse non-local files,
which therefore require a network connection to be read.
"""
import logging

import numpy as np
import pytest

from pandas.compat import BytesIO, StringIO
import pandas.util._test_decorators as td

from pandas import DataFrame
import pandas.util.testing as tm

from pandas.io.parsers import read_csv


@pytest.mark.network
@pytest.mark.parametrize(
    "compress_type, extension", [
        ('gzip', '.gz'), ('bz2', '.bz2'), ('zip', '.zip'),
        pytest.param('xz', '.xz', marks=td.skip_if_no_lzma)
    ]
)
@pytest.mark.parametrize('mode', ['explicit', 'infer'])
@pytest.mark.parametrize('engine', ['python', 'c'])
def test_compressed_urls(salaries_table, compress_type, extension, mode,
                         engine):
    check_compressed_urls(salaries_table, compress_type, extension, mode,
                          engine)
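

# ``test_compressed_urls`` delegates to the helper below so that the
# ``tm.network`` decorator, which skips (rather than fails) the test when
# the network is unavailable, wraps the body that actually hits the URL.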
@tm.network
def check_compressed_urls(salaries_table, compression, extension, mode,
                          engine):
    # test reading compressed urls with various engines and
    # extension inference
    base_url = ('https://github.com/pandas-dev/pandas/raw/master/'
                'pandas/tests/io/parser/data/salaries.csv')
    url = base_url + extension

    if mode != 'explicit':
        compression = mode

    url_table = read_csv(url, sep='\t', compression=compression,
                         engine=engine)
    tm.assert_frame_equal(url_table, salaries_table)
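

# A note on inference: with ``compression='infer'`` pandas derives the codec
# from the URL's extension, so for the '.gz' case, for example, these two
# calls are expected to be equivalent (sketch using ``base_url`` from above):
#
#     read_csv(base_url + '.gz', sep='\t', compression='gzip')
#     read_csv(base_url + '.gz', sep='\t', compression='infer')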


@pytest.fixture
def tips_df(datapath):
    """DataFrame with the tips dataset."""
    return read_csv(datapath('io', 'parser', 'data', 'tips.csv'))


@pytest.mark.usefixtures("s3_resource")
@td.skip_if_not_us_locale()
class TestS3(object):
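    """
    Tests reading CSV files from public and not-public S3 buckets.

    The ``s3_resource`` fixture (from the suite's conftest) is expected to
    provide the ``pandas-test`` and ``cant_get_it`` buckets used below,
    presumably backed by moto's mocked S3 rather than live AWS.
    """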

    def test_parse_public_s3_bucket(self, tips_df):
        pytest.importorskip('s3fs')

        # more of an integration test due to the not-public contents
        # portion; can probably mock this though.
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

        # Read public file from bucket with not-public contents
        df = read_csv('s3://cant_get_it/tips.csv')
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(df, tips_df)

    def test_parse_public_s3n_bucket(self, tips_df):
        # Read from AWS s3 as "s3n" URL
        df = read_csv('s3n://pandas-test/tips.csv', nrows=10)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3a_bucket(self, tips_df):
        # Read from AWS s3 as "s3a" URL
        df = read_csv('s3a://pandas-test/tips.csv', nrows=10)
        assert isinstance(df, DataFrame)
        assert not df.empty
        tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_nrows(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          nrows=10, compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_parse_public_s3_bucket_chunked(self, tips_df):
        # Read with a chunksize
        chunksize = 5
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df_reader = read_csv('s3://pandas-test/tips.csv' + ext,
                                 chunksize=chunksize, compression=comp)
            assert df_reader.chunksize == chunksize
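            # get_chunk() returns the next ``chunksize`` rows on each call;
            # iterating the reader would be equivalent, but explicit calls
            # keep the expected slice bounds obvious.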
            for i_chunk in [0, 1, 2]:
                # Read a couple of chunks and make sure we see them
                # properly.
                df = df_reader.get_chunk()
                assert isinstance(df, DataFrame)
                assert not df.empty
                true_df = tips_df.iloc[
                    chunksize * i_chunk: chunksize * (i_chunk + 1)]
                tm.assert_frame_equal(true_df, df)

    def test_parse_public_s3_bucket_chunked_python(self, tips_df):
        # Read with a chunksize using the Python parser
        chunksize = 5
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df_reader = read_csv('s3://pandas-test/tips.csv' + ext,
                                 chunksize=chunksize, compression=comp,
                                 engine='python')
            assert df_reader.chunksize == chunksize

            for i_chunk in [0, 1, 2]:
                # Read a couple of chunks and make sure we see them properly.
                df = df_reader.get_chunk()
                assert isinstance(df, DataFrame)
                assert not df.empty
                true_df = tips_df.iloc[
                    chunksize * i_chunk: chunksize * (i_chunk + 1)]
                tm.assert_frame_equal(true_df, df)

    def test_parse_public_s3_bucket_python(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          engine='python', compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_infer_s3_compression(self, tips_df):
        for ext in ['', '.gz', '.bz2']:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          engine='python', compression='infer')
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(df, tips_df)

    def test_parse_public_s3_bucket_nrows_python(self, tips_df):
        for ext, comp in [('', None), ('.gz', 'gzip'), ('.bz2', 'bz2')]:
            df = read_csv('s3://pandas-test/tips.csv' + ext,
                          engine='python', nrows=10, compression=comp)
            assert isinstance(df, DataFrame)
            assert not df.empty
            tm.assert_frame_equal(tips_df.iloc[:10], df)

    def test_s3_fails(self):
        with pytest.raises(IOError):
            read_csv('s3://nyqpug/asdf.csv')

        # Receive a permission error when trying to read a private bucket.
        # It's irrelevant here that this isn't actually a table.
        with pytest.raises(IOError):
            read_csv('s3://cant_get_it/')

    def test_read_csv_handles_boto_s3_object(self,
                                             s3_resource,
                                             tips_file):
        # see gh-16135
        s3_object = s3_resource.meta.client.get_object(
            Bucket='pandas-test',
            Key='tips.csv')
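
        # get_object returns a dict whose 'Body' is a botocore
        # StreamingBody; wrapping its bytes in BytesIO hands read_csv a
        # standard file-like object.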
        result = read_csv(BytesIO(s3_object["Body"].read()),
                          encoding='utf8')
        assert isinstance(result, DataFrame)
        assert not result.empty

        expected = read_csv(tips_file)
        tm.assert_frame_equal(result, expected)

    def test_read_csv_chunked_download(self, s3_resource, caplog):
        # ~8 MB of data; s3fs uses 5 MB chunks
        df = DataFrame(np.random.randn(100000, 4), columns=list('abcd'))
        str_buf = StringIO()

        df.to_csv(str_buf)

        buf = BytesIO(str_buf.getvalue().encode('utf-8'))

        s3_resource.Bucket("pandas-test").put_object(
            Key="large-file.csv",
            Body=buf)

        with caplog.at_level(logging.DEBUG, logger='s3fs.core'):
            read_csv("s3://pandas-test/large-file.csv", nrows=5)
            # log of fetch_range (start, stop)
            assert ((0, 5505024) in {x.args[-2:] for x in caplog.records})