# -*- coding: utf-8 -*-

"""
Tests multithreading behaviour for reading and
parsing files for each parser defined in parsers.py
"""

from __future__ import division

from multiprocessing.pool import ThreadPool

import numpy as np

from pandas.compat import BytesIO, range

import pandas as pd
from pandas import DataFrame
import pandas.util.testing as tm


def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.

    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.

    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"
    df["date"] = pd.date_range("20000101 09:00:00",
                               periods=num_rows,
                               freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df


def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        "\n".join(
            ["%d,%d,%d" % (i, i, i) for i in range(max_row_range)]
        ).encode() for _ in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # Read all files in many threads.
    pool = ThreadPool(8)

    results = pool.map(parser.read_csv, files)
    first_result = results[0]

    # Every thread parsed identical input, so every result must match.
    for result in results:
        tm.assert_frame_equal(first_result, result)


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame via multi-thread.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The total number of rows to read; each task reads
        ``num_rows // num_tasks`` of them.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """
    def reader(arg):
        """
        Create a reader for part of the CSV.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The row at which to start parsing the CSV.
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg

        if not start:
            # The first chunk carries the header row and named date column.
            return parser.read_csv(path, index_col=0, header=0,
                                   nrows=nrows, parse_dates=["date"])

        # Later chunks have no header: skip past the header row plus the
        # rows already consumed, and refer to the date column by position.
        return parser.read_csv(path, index_col=0, header=None,
                               skiprows=int(start) + 1, nrows=nrows,
                               parse_dates=[9])

    # One (start, nrows) pair per task, partitioning the file evenly.
    tasks = [
        (num_rows * i // num_tasks,
         num_rows // num_tasks) for i in range(num_tasks)
    ]

    pool = ThreadPool(processes=num_tasks)

    results = pool.map(reader, tasks)

    # Headerless chunks came back with integer column labels; reuse the
    # header from the first chunk so the pieces concatenate cleanly.
    header = results[0].columns

    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe


def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    num_tasks = 4
    num_rows = 100000

    parser = all_parsers
    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)

        final_dataframe = _generate_multi_thread_dataframe(parser, path,
                                                           num_rows,
                                                           num_tasks)
        tm.assert_frame_equal(df, final_dataframe)
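

if __name__ == "__main__":
    # A minimal standalone sketch, not part of the pandas test suite: the
    # ``all_parsers`` fixture supplies parser objects, but the helper above
    # only requires something exposing a ``read_csv`` method, so ``pd``
    # itself can drive it. ``__smoke__.csv`` is an illustrative temp-file
    # name, not one used by the real tests.
    num_rows, num_tasks = 10000, 4

    with tm.ensure_clean("__smoke__.csv") as path:
        expected = _construct_dataframe(num_rows)
        expected.to_csv(path)

        result = _generate_multi_thread_dataframe(pd, path,
                                                  num_rows, num_tasks)
        tm.assert_frame_equal(expected, result)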