import textwrap
import warnings

from pandas._libs import NaT, lib

import pandas.core.common as com
from pandas.core.indexes.base import (
    Index, _new_Index, ensure_index, ensure_index_from_sequences)
from pandas.core.indexes.base import InvalidIndexError  # noqa:F401
from pandas.core.indexes.category import CategoricalIndex  # noqa:F401
from pandas.core.indexes.datetimes import DatetimeIndex
from pandas.core.indexes.interval import IntervalIndex  # noqa:F401
from pandas.core.indexes.multi import MultiIndex  # noqa:F401
from pandas.core.indexes.numeric import (  # noqa:F401
    Float64Index, Int64Index, NumericIndex, UInt64Index)
from pandas.core.indexes.period import PeriodIndex
from pandas.core.indexes.range import RangeIndex  # noqa:F401
from pandas.core.indexes.timedeltas import TimedeltaIndex

# Warning text emitted by ``_union_indexes`` when ``sort is None`` and the
# indexes being combined are not all equal.
_sort_msg = textwrap.dedent("""\
Sorting because non-concatenation axis is not aligned. A future version
of pandas will change to not sort by default.

To accept the future behavior, pass 'sort=False'.

To retain the current behavior and silence the warning, pass 'sort=True'.
""")


# TODO: there are many places that rely on these private methods existing in
# pandas.core.index
__all__ = ['Index', 'MultiIndex', 'NumericIndex', 'Float64Index', 'Int64Index',
           'CategoricalIndex', 'IntervalIndex', 'RangeIndex', 'UInt64Index',
           'InvalidIndexError', 'TimedeltaIndex',
           'PeriodIndex', 'DatetimeIndex', '_new_Index', 'NaT',
           'ensure_index', 'ensure_index_from_sequences',
           '_get_combined_index', '_get_objs_combined_axis', '_union_indexes',
           '_get_consensus_names', '_all_indexes_same']


def _get_objs_combined_axis(objs, intersect=False, axis=0, sort=True):
    """
    Extract combined index: return intersection or union (depending on the
    value of "intersect") of indexes on given axis, or None if all objects
    lack indexes (e.g. they are numpy arrays).

    Parameters
    ----------
    objs : list of objects
        Each object will only be considered if it has a _get_axis
        attribute.
    intersect : bool, default False
        If True, calculate the intersection between indexes. Otherwise,
        calculate the union.
    axis : {0 or 'index', 1 or 'outer'}, default 0
        The axis to extract indexes from.
    sort : bool, default True
        Whether the result index should come out sorted or not.

    Returns
    -------
    Index or None
        None when no element of ``objs`` has a ``_get_axis`` attribute.
    """
    obs_idxes = [obj._get_axis(axis) for obj in objs
                 if hasattr(obj, '_get_axis')]
    if obs_idxes:
        return _get_combined_index(obs_idxes, intersect=intersect, sort=sort)
    return None


def _get_distinct_objs(objs):
    """
    Return a list with distinct elements of "objs" (different ids).
    Preserves order.

    Parameters
    ----------
    objs : iterable of objects

    Returns
    -------
    list
        The first occurrence of each object, compared by identity
        (``id``), not by equality.
    """
    ids = set()
    res = []
    for obj in objs:
        if id(obj) not in ids:
            ids.add(id(obj))
            res.append(obj)
    return res


def _get_combined_index(indexes, intersect=False, sort=False):
    """
    Return the union or intersection of indexes.

    Parameters
    ----------
    indexes : list of Index or list objects
        When intersect=True, do not accept list of lists.
    intersect : bool, default False
        If True, calculate the intersection between indexes. Otherwise,
        calculate the union.
    sort : bool, default False
        Whether the result index should come out sorted or not.

    Returns
    -------
    Index
    """

    # TODO: handle index names!
    # Drop repeated references to the very same object before combining.
    indexes = _get_distinct_objs(indexes)
    if not indexes:
        index = Index([])
    elif len(indexes) == 1:
        index = indexes[0]
    elif intersect:
        index = indexes[0]
        for other in indexes[1:]:
            index = index.intersection(other)
    else:
        index = _union_indexes(indexes, sort=sort)
        index = ensure_index(index)

    if sort:
        try:
            index = index.sort_values()
        except TypeError:
            # Best effort: if sorting raises TypeError, return the index
            # unsorted rather than failing.
            pass
    return index


def _union_indexes(indexes, sort=True):
    """
    Return the union of indexes.

    The behavior of sort and names is not consistent.

    Parameters
    ----------
    indexes : list of Index or list objects
    sort : bool, default True
        Whether the result index should come out sorted or not.

    Returns
    -------
    Index

    Raises
    ------
    AssertionError
        If ``indexes`` is empty.
    """
    if len(indexes) == 0:
        raise AssertionError('Must have at least 1 Index to union')
    if len(indexes) == 1:
        result = indexes[0]
        if isinstance(result, list):
            # A lone plain list is always sorted, regardless of ``sort``.
            result = Index(sorted(result))
        return result

    indexes, kind = _sanitize_and_check(indexes)

    def _unique_indices(inds):
        """
        Convert indexes to lists and concatenate them, removing duplicates.

        The final dtype is inferred.

        Parameters
        ----------
        inds : list of Index or list objects

        Returns
        -------
        Index
        """
        def conv(i):
            if isinstance(i, Index):
                i = i.tolist()
            return i

        # ``sort`` is read from the enclosing scope at call time, so the
        # reassignment in the 'array' branch below is honoured here.
        return Index(
            lib.fast_unique_multiple_list([conv(i) for i in inds], sort=sort))

    if kind == 'special':
        result = indexes[0]

        if hasattr(result, 'union_many'):
            return result.union_many(indexes[1:])
        else:
            for other in indexes[1:]:
                result = result.union(other)
            return result
    elif kind == 'array':
        index = indexes[0]
        for other in indexes[1:]:
            if not index.equals(other):

                if sort is None:
                    # TODO: remove once pd.concat sort default changes
                    warnings.warn(_sort_msg, FutureWarning, stacklevel=8)
                    sort = True

                return _unique_indices(indexes)

        # All indexes are equal: reuse the first one, adjusting only its
        # name to the consensus name.
        name = _get_consensus_names(indexes)[0]
        if name != index.name:
            index = index._shallow_copy(name=name)
        return index
    else:  # kind='list'
        return _unique_indices(indexes)


def _sanitize_and_check(indexes):
    """
    Verify the type of indexes and convert lists to Index.

    Cases:

    - [list, list, ...]: Return ([list, list, ...], 'list')
    - [list, Index, ...]: Return _sanitize_and_check([Index, Index, ...])
        Lists are sorted and converted to Index.
    - [Index, Index, ...]: Return ([Index, Index, ...], TYPE)
        TYPE = 'special' if at least one special type, 'array' otherwise.

    Parameters
    ----------
    indexes : list of Index or list objects

    Returns
    -------
    sanitized_indexes : list of Index or list objects
    type : {'list', 'array', 'special'}
    """
    kinds = list({type(index) for index in indexes})

    if list in kinds:
        if len(kinds) > 1:
            # Mixed lists and Index objects: promote every non-Index
            # element to an Index.
            indexes = [x if isinstance(x, Index) else Index(com.try_sort(x))
                       for x in indexes]
            kinds.remove(list)
        else:
            return indexes, 'list'

    if len(kinds) > 1 or Index not in kinds:
        return indexes, 'special'
    else:
        return indexes, 'array'


def _get_consensus_names(indexes):
    """
    Give a consensus 'names' to indexes.

    If there's exactly one non-empty 'names', return this,
    otherwise, return empty.

    Parameters
    ----------
    indexes : list of Index objects

    Returns
    -------
    list
        A list representing the consensus 'names' found.
    """

    # find the non-none names, need to tupleify to make
    # the set hashable, then convert back to a list on return
    consensus_names = {tuple(i.names) for i in indexes
                       if com._any_not_none(*i.names)}
    if len(consensus_names) == 1:
        return list(next(iter(consensus_names)))
    return [None] * indexes[0].nlevels


def _all_indexes_same(indexes):
    """
    Determine if all indexes contain the same elements.

    Parameters
    ----------
    indexes : list of Index objects
        Must contain at least one element.

    Returns
    -------
    bool
        True if all indexes contain the same elements, False otherwise.
    """
    first = indexes[0]
    return all(first.equals(index) for index in indexes[1:])