# -*- coding: utf-8 -*-
"""
Tests multithreading behaviour for reading and
parsing files for each parser defined in parsers.py
"""
from __future__ import division

from multiprocessing.pool import ThreadPool

import numpy as np

from pandas.compat import BytesIO, range

import pandas as pd
from pandas import DataFrame
import pandas.util.testing as tm


def _construct_dataframe(num_rows):
    """
    Construct a DataFrame for testing.

    Parameters
    ----------
    num_rows : int
        The number of rows for our DataFrame.

    Returns
    -------
    df : DataFrame
    """
    df = DataFrame(np.random.rand(num_rows, 5), columns=list("abcde"))
    df["foo"] = "foo"
    df["bar"] = "bar"
    df["baz"] = "baz"
    df["date"] = pd.date_range("20000101 09:00:00",
                               periods=num_rows,
                               freq="s")
    df["int"] = np.arange(num_rows, dtype="int64")
    return df
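

# For reference: the helper above returns a frame with five random float
# columns ("a".."e"), three constant string columns ("foo", "bar", "baz"),
# a one-second-frequency datetime column ("date") and an int64 counter
# ("int"), so the multi-threaded round trip below exercises several dtypes.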


def test_multi_thread_string_io_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        "\n".join(
            ["%d,%d,%d" % (i, i, i) for i in range(max_row_range)]
        ).encode() for _ in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # Read all files in many threads, making sure the pool is
    # always cleaned up afterwards.
    pool = ThreadPool(8)
    try:
        results = pool.map(parser.read_csv, files)
    finally:
        pool.close()
        pool.join()

    # Every file contains identical data, so every parsed frame must match.
    first_result = results[0]
    for result in results:
        tm.assert_frame_equal(first_result, result)
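

# A minimal standalone sketch of the same pattern outside the pytest harness,
# using pd.read_csv directly instead of the ``all_parsers`` fixture. The
# function name and defaults are illustrative assumptions; it is not
# collected by pytest because its name does not start with "test_".
def _demo_threaded_string_io_read_csv(num_files=4, num_rows=100):
    data = "\n".join("%d,%d,%d" % (i, i, i)
                     for i in range(num_rows)).encode()
    files = [BytesIO(data) for _ in range(num_files)]

    pool = ThreadPool(num_files)
    try:
        frames = pool.map(pd.read_csv, files)
    finally:
        pool.close()
        pool.join()

    # Every thread parsed identical bytes, so every frame must be equal.
    return all(frames[0].equals(frame) for frame in frames[1:])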


def _generate_multi_thread_dataframe(parser, path, num_rows, num_tasks):
    """
    Generate a DataFrame via multi-threaded reads.

    Parameters
    ----------
    parser : BaseParser
        The parser object to use for reading the data.
    path : str
        The location of the CSV file to read.
    num_rows : int
        The total number of rows in the CSV file.
    num_tasks : int
        The number of tasks to use for reading this DataFrame.

    Returns
    -------
    df : DataFrame
    """
    def reader(arg):
        """
        Create a reader for part of the CSV.

        Parameters
        ----------
        arg : tuple
            A tuple of the following:

            * start : int
                The row at which to start parsing the CSV.
            * nrows : int
                The number of rows to read.

        Returns
        -------
        df : DataFrame
        """
        start, nrows = arg

        if not start:
            return parser.read_csv(path, index_col=0, header=0,
                                   nrows=nrows, parse_dates=["date"])

        # Later chunks have no header row: skip the header plus all rows
        # already read, and refer to the date column by its position in the
        # raw file (field 9, counting the unnamed index column as field 0).
        return parser.read_csv(path, index_col=0, header=None,
                               skiprows=int(start) + 1,
                               nrows=nrows, parse_dates=[9])

    # Split the file into num_tasks chunks of num_rows // num_tasks rows
    # each, described as (start, nrows) pairs.
    tasks = [
        (num_rows * i // num_tasks,
         num_rows // num_tasks) for i in range(num_tasks)
    ]

    pool = ThreadPool(processes=num_tasks)
    try:
        results = pool.map(reader, tasks)
    finally:
        pool.close()
        pool.join()

    # Only the first chunk was read with a header, so propagate its column
    # names to the headerless chunks before concatenating.
    header = results[0].columns
    for r in results[1:]:
        r.columns = header

    final_dataframe = pd.concat(results)
    return final_dataframe
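

# Worked example: with the values used below (num_rows=100000, num_tasks=4)
# the task list evaluates to
# [(0, 25000), (25000, 25000), (50000, 25000), (75000, 25000)],
# i.e. one header-aware chunk followed by three headerless slices.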


def test_multi_thread_path_multipart_read_csv(all_parsers):
    # see gh-11786
    parser = all_parsers
    num_tasks = 4
    num_rows = 100000

    file_name = "__thread_pool_reader__.csv"
    df = _construct_dataframe(num_rows)

    with tm.ensure_clean(file_name) as path:
        df.to_csv(path)

        final_dataframe = _generate_multi_thread_dataframe(parser, path,
                                                           num_rows,
                                                           num_tasks)
        tm.assert_frame_equal(df, final_dataframe)
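

# These tests depend on the ``all_parsers`` fixture supplied by the parser
# tests' conftest.py, so run them through pytest from a pandas checkout,
# e.g. (the path below assumes the usual repo layout):
#
#     pytest pandas/tests/io/parser/test_multi_thread.py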