# -*- coding: utf-8 -*-
# TODO: Needs a better name; too many modules are already called "concat"
from collections import defaultdict
import copy

import numpy as np

from pandas._libs import internals as libinternals, tslibs
from pandas.util._decorators import cache_readonly

from pandas.core.dtypes.cast import maybe_promote
from pandas.core.dtypes.common import (
    _get_dtype, is_categorical_dtype, is_datetime64_dtype,
    is_datetime64tz_dtype, is_extension_array_dtype, is_float_dtype,
    is_numeric_dtype, is_sparse, is_timedelta64_dtype)
import pandas.core.dtypes.concat as _concat
from pandas.core.dtypes.missing import isna

import pandas.core.algorithms as algos


def get_mgr_concatenation_plan(mgr, indexers):
    """
    Construct concatenation plan for given block manager and indexers.

    Parameters
    ----------
    mgr : BlockManager
    indexers : dict of {axis: indexer}

    Returns
    -------
    plan : list of (BlockPlacement, JoinUnit) tuples
    """
    # Calculate the post-reindex shape; the item-axis entry will be
    # overridden separately for each block anyway.
    mgr_shape = list(mgr.shape)
    for ax, indexer in indexers.items():
        mgr_shape[ax] = len(indexer)
    mgr_shape = tuple(mgr_shape)

    if 0 in indexers:
        ax0_indexer = indexers.pop(0)
        blknos = algos.take_1d(mgr._blknos, ax0_indexer, fill_value=-1)
        blklocs = algos.take_1d(mgr._blklocs, ax0_indexer, fill_value=-1)
    else:
        if mgr._is_single_block:
            blk = mgr.blocks[0]
            return [(blk.mgr_locs, JoinUnit(blk, mgr_shape, indexers))]

        ax0_indexer = None
        blknos = mgr._blknos
        blklocs = mgr._blklocs

    plan = []
    for blkno, placements in libinternals.get_blkno_placements(blknos,
                                                               mgr.nblocks,
                                                               group=False):

        assert placements.is_slice_like

        join_unit_indexers = indexers.copy()

        shape = list(mgr_shape)
        shape[0] = len(placements)
        shape = tuple(shape)

        if blkno == -1:
            unit = JoinUnit(None, shape)
        else:
            blk = mgr.blocks[blkno]
            ax0_blk_indexer = blklocs[placements.indexer]

            unit_no_ax0_reindexing = (len(placements) == len(blk.mgr_locs) and
                                      # Fastpath detection of join unit not
                                      # needing to reindex its block: no ax0
                                      # reindexing took place and block
                                      # placement was sequential before.
                                      ((ax0_indexer is None and
                                        blk.mgr_locs.is_slice_like and
                                        blk.mgr_locs.as_slice.step == 1) or
                                       # Slow-ish detection: all indexer locs
                                       # are sequential (and length match is
                                       # checked above).
                                       (np.diff(ax0_blk_indexer) == 1).all()))

            # Omit indexer if no item reindexing is required.
            if unit_no_ax0_reindexing:
                join_unit_indexers.pop(0, None)
            else:
                join_unit_indexers[0] = ax0_blk_indexer

            unit = JoinUnit(blk, shape, join_unit_indexers)

        plan.append((placements, unit))

    return plan
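
# A minimal sketch of how the planner is driven (illustrative; ``mgr`` is a
# BlockManager and the indexer comes from an upstream join/reindex step, with
# -1 marking positions to fill):
#
#   indexers = {0: np.array([0, 2, -1])}   # reindex the item axis
#   plan = get_mgr_concatenation_plan(mgr, indexers)
#   for placement, unit in plan:
#       ...  # one JoinUnit per block run; unit.block is None for the -1 run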


class JoinUnit(object):

    def __init__(self, block, shape, indexers=None):
        # Passing shape explicitly is required for cases when block is None.
        if indexers is None:
            indexers = {}
        self.block = block
        self.indexers = indexers
        self.shape = shape

    def __repr__(self):
        return '{name}({block!r}, {indexers})'.format(
            name=self.__class__.__name__, block=self.block,
            indexers=self.indexers)

    @cache_readonly
    def needs_filling(self):
        for indexer in self.indexers.values():
            # FIXME: cache results of indexer == -1 checks.
            if (indexer == -1).any():
                return True

        return False

    @cache_readonly
    def dtype(self):
        if self.block is None:
            raise AssertionError("Block is None, no dtype")

        if not self.needs_filling:
            return self.block.dtype
        else:
            return _get_dtype(maybe_promote(self.block.dtype,
                                            self.block.fill_value)[0])

    @cache_readonly
    def is_na(self):
        if self.block is None:
            return True

        if not self.block._can_hold_na:
            return False

        # Usually it's enough to check only a small fraction of the values to
        # see that a block is NOT null; scanning in chunks lets us stop early
        # in such cases. The chunk size of 1000 was chosen rather arbitrarily.
        values = self.block.values
        if self.block.is_categorical:
            values_flat = values.categories
        elif is_sparse(self.block.values.dtype):
            return False
        elif self.block.is_extension:
            values_flat = values
        else:
            values_flat = values.ravel(order='K')
        total_len = values_flat.shape[0]
        chunk_len = max(total_len // 40, 1000)
        for i in range(0, total_len, chunk_len):
            if not isna(values_flat[i:i + chunk_len]).all():
                return False

        return True
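
    # Worked example for the chunked scan in ``is_na`` above: with
    # total_len = 100_000, chunk_len = max(100_000 // 40, 1000) = 2500, so at
    # most 40 slices are inspected, and the loop exits at the first slice
    # containing a real (non-NA) value.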

    def get_reindexed_values(self, empty_dtype, upcasted_na):
        if upcasted_na is None:
            # No upcasting is necessary
            fill_value = self.block.fill_value
            values = self.block.get_values()
        else:
            fill_value = upcasted_na

            if self.is_na:
                if getattr(self.block, 'is_object', False):
                    # we want to avoid filling with np.nan if we are
                    # using None; we already know that we are all
                    # nulls
                    values = self.block.values.ravel(order='K')
                    if len(values) and values[0] is None:
                        fill_value = None

                if (getattr(self.block, 'is_datetimetz', False) or
                        is_datetime64tz_dtype(empty_dtype)):
                    if self.block is None:
                        array = empty_dtype.construct_array_type()
                        return array(np.full(self.shape[1], fill_value.value),
                                     dtype=empty_dtype)
                elif getattr(self.block, 'is_categorical', False):
                    pass
                elif getattr(self.block, 'is_sparse', False):
                    pass
                elif getattr(self.block, 'is_extension', False):
                    pass
                else:
                    missing_arr = np.empty(self.shape, dtype=empty_dtype)
                    missing_arr.fill(fill_value)
                    return missing_arr

        if not self.indexers:
            if not self.block._can_consolidate:
                # preserve these for validation in _concat_compat
                return self.block.values

        if self.block.is_bool and not self.block.is_categorical:
            # External code requested filling/upcasting, bool values must
            # be upcasted to object to avoid being upcasted to numeric.
            values = self.block.astype(np.object_).values
        elif self.block.is_extension:
            values = self.block.values
        else:
            # No dtype upcasting is done here, it will be performed during
            # concatenation itself.
            values = self.block.get_values()

        if not self.indexers:
            # If there's no indexing to be done, we want to signal outside
            # code that this array must be copied explicitly. This is done
            # by returning a view and checking `retval.base`.
            values = values.view()
        else:
            for ax, indexer in self.indexers.items():
                values = algos.take_nd(values, indexer, axis=ax,
                                       fill_value=fill_value)

        return values
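
    # Note on the view trick above: callers can detect the "copy me" signal
    # cheaply, e.g. (illustrative only):
    #
    #   vals = unit.get_reindexed_values(empty_dtype, upcasted_na)
    #   if isinstance(vals, np.ndarray) and vals.base is not None:
    #       vals = vals.copy()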


def concatenate_join_units(join_units, concat_axis, copy):
    """
    Concatenate values from several join units along selected axis.
    """
    if concat_axis == 0 and len(join_units) > 1:
        # Concatenating join units along ax0 is handled in _merge_blocks.
        raise AssertionError("Concatenating join units along axis0")

    empty_dtype, upcasted_na = get_empty_dtype_and_na(join_units)

    to_concat = [ju.get_reindexed_values(empty_dtype=empty_dtype,
                                         upcasted_na=upcasted_na)
                 for ju in join_units]

    if len(to_concat) == 1:
        # Only one block, nothing to concatenate.
        concat_values = to_concat[0]
        if copy:
            if isinstance(concat_values, np.ndarray):
                # non-reindexed (=not yet copied) arrays are made into a view
                # in JoinUnit.get_reindexed_values
                if concat_values.base is not None:
                    concat_values = concat_values.copy()
            else:
                concat_values = concat_values.copy()
    else:
        concat_values = _concat._concat_compat(to_concat, axis=concat_axis)

    return concat_values
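
# Minimal sketch of the intended call pattern (illustrative; the units are
# the JoinUnits collected for one placement row across the managers being
# concatenated):
#
#   values = concatenate_join_units(units, concat_axis=1, copy=True)
#
# With a single unit the base-vs-view check above avoids a redundant copy;
# with several units the dtype work is delegated to _concat._concat_compat.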


def get_empty_dtype_and_na(join_units):
    """
    Return dtype and N/A values to use when concatenating specified units.

    Returned N/A value may be None, which means there was no casting involved.

    Returns
    -------
    dtype
    na
    """
    if len(join_units) == 1:
        blk = join_units[0].block
        if blk is None:
            return np.float64, np.nan

    if is_uniform_reindex(join_units):
        # XXX: integrate property
        empty_dtype = join_units[0].block.dtype
        upcasted_na = join_units[0].block.fill_value
        return empty_dtype, upcasted_na

    has_none_blocks = False
    dtypes = [None] * len(join_units)
    for i, unit in enumerate(join_units):
        if unit.block is None:
            has_none_blocks = True
        else:
            dtypes[i] = unit.dtype

    upcast_classes = defaultdict(list)
    null_upcast_classes = defaultdict(list)
    for dtype, unit in zip(dtypes, join_units):
        if dtype is None:
            continue

        if is_categorical_dtype(dtype):
            upcast_cls = 'category'
        elif is_datetime64tz_dtype(dtype):
            upcast_cls = 'datetimetz'
        elif issubclass(dtype.type, np.bool_):
            upcast_cls = 'bool'
        elif issubclass(dtype.type, np.object_):
            upcast_cls = 'object'
        elif is_datetime64_dtype(dtype):
            upcast_cls = 'datetime'
        elif is_timedelta64_dtype(dtype):
            upcast_cls = 'timedelta'
        elif is_sparse(dtype):
            upcast_cls = dtype.subtype.name
        elif is_extension_array_dtype(dtype):
            upcast_cls = 'object'
        elif is_float_dtype(dtype) or is_numeric_dtype(dtype):
            upcast_cls = dtype.name
        else:
            upcast_cls = 'float'

        # Null blocks should not influence upcast class selection, unless
        # there are only null blocks, in which case the same upcasting rules
        # are applied to the null upcast classes.
        if unit.is_na:
            null_upcast_classes[upcast_cls].append(dtype)
        else:
            upcast_classes[upcast_cls].append(dtype)

    if not upcast_classes:
        upcast_classes = null_upcast_classes

    # create the result
    if 'object' in upcast_classes:
        return np.dtype(np.object_), np.nan
    elif 'bool' in upcast_classes:
        if has_none_blocks:
            return np.dtype(np.object_), np.nan
        else:
            return np.dtype(np.bool_), None
    elif 'category' in upcast_classes:
        return np.dtype(np.object_), np.nan
    elif 'datetimetz' in upcast_classes:
        # GH-25014. We use NaT instead of iNaT, since this eventually
        # ends up in DatetimeArray.take, which does not allow iNaT.
        dtype = upcast_classes['datetimetz']
        return dtype[0], tslibs.NaT
    elif 'datetime' in upcast_classes:
        return np.dtype('M8[ns]'), tslibs.iNaT
    elif 'timedelta' in upcast_classes:
        return np.dtype('m8[ns]'), tslibs.iNaT
    else:  # pragma: no cover
        try:
            g = np.find_common_type(upcast_classes, [])
        except TypeError:
            # At least one is an ExtensionArray
            return np.dtype(np.object_), np.nan
        else:
            if is_float_dtype(g):
                return g, g.type(np.nan)
            elif is_numeric_dtype(g):
                if has_none_blocks:
                    return np.float64, np.nan
                else:
                    return g, None

    msg = "invalid dtype determination in get_concat_dtype"
    raise AssertionError(msg)
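
# Worked examples of the resolution above (following the branch order):
#   int64 + float64            -> find_common_type gives float64, NA = nan
#   bool + an all-missing unit -> object, NA = nan (bool cannot hold NaN)
#   datetime64[ns] twice       -> np.dtype('M8[ns]'), NA = tslibs.iNaT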


def is_uniform_join_units(join_units):
    """
    Check if the join units consist of blocks of uniform type that can
    be concatenated using Block.concat_same_type instead of the generic
    concatenate_join_units (which uses `_concat._concat_compat`).
    """
    return (
        # all blocks need to have the same type
        all(type(ju.block) is type(join_units[0].block) for ju in join_units) and  # noqa
        # no blocks that would get missing values (can lead to type upcasts)
        # unless we're an extension dtype.
        all(not ju.is_na or ju.block.is_extension for ju in join_units) and
        # no blocks with indexers (as then the dimensions do not fit)
        all(not ju.indexers for ju in join_units) and
        # disregard Panels
        all(ju.block.ndim <= 2 for ju in join_units) and
        # only use this path when there is something to concatenate
        len(join_units) > 1)


def is_uniform_reindex(join_units):
    return (
        # TODO: should this be ju.block._can_hold_na?
        all(ju.block and ju.block.is_extension for ju in join_units) and
        len({ju.block.dtype.name for ju in join_units}) == 1
    )
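
# ``is_uniform_reindex`` gates the early exit in get_empty_dtype_and_na: when
# every unit holds an extension block of one dtype (e.g. two nullable-integer
# 'Int64' blocks), that dtype and its native fill value are reused instead of
# running the upcast machinery.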


def trim_join_unit(join_unit, length):
    """
    Reduce join_unit's shape along item axis to length.

    Extra items that didn't fit are returned as a separate block.
    """
    if 0 not in join_unit.indexers:
        extra_indexers = join_unit.indexers

        if join_unit.block is None:
            extra_block = None
        else:
            extra_block = join_unit.block.getitem_block(slice(length, None))
            join_unit.block = join_unit.block.getitem_block(slice(length))
    else:
        extra_block = join_unit.block

        extra_indexers = copy.copy(join_unit.indexers)
        extra_indexers[0] = extra_indexers[0][length:]
        join_unit.indexers[0] = join_unit.indexers[0][:length]

    extra_shape = (join_unit.shape[0] - length,) + join_unit.shape[1:]
    join_unit.shape = (length,) + join_unit.shape[1:]

    return JoinUnit(block=extra_block, indexers=extra_indexers,
                    shape=extra_shape)
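
# Sketch of the trimming semantics (illustrative): a join unit of shape
# (5, 3) trimmed with length=2 is mutated in place to shape (2, 3), and a new
# JoinUnit of shape (3, 3) covering the three leftover items is returned.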


def combine_concat_plans(plans, concat_axis):
    """
    Combine multiple concatenation plans into one.

    The input plans are consumed, and their join units may be trimmed
    in place.
    """
    if len(plans) == 1:
        for p in plans[0]:
            yield p[0], [p[1]]

    elif concat_axis == 0:
        offset = 0
        for plan in plans:
            last_plc = None

            for plc, unit in plan:
                yield plc.add(offset), [unit]
                last_plc = plc

            if last_plc is not None:
                offset += last_plc.as_slice.stop

    else:
        num_ended = [0]

        def _next_or_none(seq):
            retval = next(seq, None)
            if retval is None:
                num_ended[0] += 1
            return retval

        plans = list(map(iter, plans))
        next_items = list(map(_next_or_none, plans))

        while num_ended[0] != len(next_items):
            if num_ended[0] > 0:
                raise ValueError("Plan shapes are not aligned")

            placements, units = zip(*next_items)

            lengths = list(map(len, placements))
            min_len, max_len = min(lengths), max(lengths)

            if min_len == max_len:
                yield placements[0], units
                next_items[:] = map(_next_or_none, plans)
            else:
                yielded_placement = None
                yielded_units = [None] * len(next_items)
                for i, (plc, unit) in enumerate(next_items):
                    yielded_units[i] = unit
                    if len(plc) > min_len:
                        # trim_join_unit updates unit in place, so only
                        # placement needs to be sliced to skip min_len.
                        next_items[i] = (plc[min_len:],
                                         trim_join_unit(unit, min_len))
                    else:
                        yielded_placement = plc
                        next_items[i] = _next_or_none(plans[i])

                yield yielded_placement, yielded_units
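
# Illustrative walk-through of the merging loop above: given two plans whose
# current placements have lengths 3 and 5, min_len is 3, so the longer unit
# is trimmed in place to length 3 (its remaining two items stay queued on
# next_items) and one combined row is yielded over the common placement.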