# -*- coding: utf-8 -*-

"""
Tests multithreading behaviour for reading and
parsing files for each parser defined in parsers.py
"""

from __future__ import division

from multiprocessing.pool import ThreadPool

import numpy as np

from pandas.compat import BytesIO, range

import pandas as pd
from pandas import DataFrame
import pandas.util.testing as tm


def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.

    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.

    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"

    df["date"] = pd.date_range("20000101 09:00:00",
                               periods=num_rows,
                               freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df
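
# The helper above yields five float columns ("a"-"e"), three string
# columns, a per-second "date" column and an "int" column -- ten data
# columns in all. Once to_csv() writes the index as column 0, "date"
# lands at position 9, which the multipart reader below relies on.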


def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers

    max_row_range = 10000
    num_files = 100
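
    # Build num_files identical in-memory CSV payloads of three integer
    # columns; every thread should therefore parse the same frame.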
    bytes_to_df = [
        "\n".join(
            ["%d,%d,%d" % (i, i, i) for i in range(max_row_range)]
        ).encode() for _ in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # Read all files in many threads.
    pool = ThreadPool(8)

    results = pool.map(parser.read_csv, files)
    pool.close()
    pool.join()

    first_result = results[0]

    for result in results:
        tm.assert_frame_equal(first_result, result)


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame via multi-thread.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The total number of rows to read, split across the tasks.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """
    def reader(arg):
        """
        Create a reader for part of the CSV.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The row at which to start parsing the CSV.
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg
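
        # Only the first chunk (start == 0) still sees the header row;
        # later chunks must skip it plus the rows already claimed by
        # earlier tasks.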
        if not start:
            return parser.read_csv(path, index_col=0, header=0,
                                   nrows=nrows, parse_dates=["date"])
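
        # With header=None the columns come back labelled 0-10; column 0
        # holds the index, so the "date" column sits at label 9 -- hence
        # parse_dates=[9].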
        return parser.read_csv(path, index_col=0, header=None,
                               skiprows=int(start) + 1,
                               nrows=nrows, parse_dates=[9])

    tasks = [
        (num_rows * i // num_tasks,
         num_rows // num_tasks) for i in range(num_tasks)
    ]
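    # NB: each task is a (start, nrows) slice; integer division means any
    # remainder rows (when num_rows is not a multiple of num_tasks) would
    # be silently dropped.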

    pool = ThreadPool(processes=num_tasks)
    results = pool.map(reader, tasks)
    pool.close()
    pool.join()
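
    # Chunks parsed with header=None carry integer column labels; realign
    # them to the header parsed by the first chunk before concatenating.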
    header = results[0].columns

    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe


def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 100000

    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)
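
        # Read the file back in num_tasks parallel slices and check that
        # the reassembled frame matches what was written out.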
        final_dataframe = _generate_multi_thread_dataframe(parser, path,
                                                           num_rows,
                                                           num_tasks)
        tm.assert_frame_equal(df, final_dataframe)
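

# Both tests are driven by the pytest ``all_parsers`` fixture, which
# supplies each parser defined in parsers.py; run them with pytest rather
# than executing this module directly.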