# test_s3.py
  1. import pytest
  2. from pandas.compat import BytesIO
  3. from pandas import read_csv
  4. from pandas.io.common import is_s3_url
  5. class TestS3URL(object):
  6. def test_is_s3_url(self):
  7. assert is_s3_url("s3://pandas/somethingelse.com")
  8. assert not is_s3_url("s4://pandas/somethingelse.com")
  9. def test_streaming_s3_objects():
  10. # GH17135
  11. # botocore gained iteration support in 1.10.47, can now be used in read_*
  12. pytest.importorskip('botocore', minversion='1.10.47')
  13. from botocore.response import StreamingBody
  14. data = [
  15. b'foo,bar,baz\n1,2,3\n4,5,6\n',
  16. b'just,the,header\n',
  17. ]
  18. for el in data:
  19. body = StreamingBody(BytesIO(el), content_length=len(el))
  20. read_csv(body)