# pylint: disable=E1103

from collections import OrderedDict

import numpy as np
from numpy import nan
from numpy.random import randn
import pytest

import pandas as pd
from pandas import DataFrame, Index, MultiIndex, Series
from pandas.core.reshape.concat import concat
from pandas.core.reshape.merge import merge
import pandas.util.testing as tm


@pytest.fixture
def left():
    """left dataframe (not multi-indexed) for multi-index join tests"""
    # a little relevant example with NAs
    key1 = ['bar', 'bar', 'bar', 'foo', 'foo', 'baz', 'baz',
            'qux', 'qux', 'snap']
    key2 = ['two', 'one', 'three', 'one', 'two', 'one', 'two',
            'two', 'three', 'one']

    data = np.random.randn(len(key1))
    return DataFrame({'key1': key1, 'key2': key2, 'data': data})


@pytest.fixture
def right():
    """right dataframe (multi-indexed) for multi-index join tests"""
    index = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux'],
                               ['one', 'two', 'three']],
                       codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3],
                              [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
                       names=['key1', 'key2'])

    return DataFrame(np.random.randn(10, 3), index=index,
                     columns=['j_one', 'j_two', 'j_three'])


@pytest.fixture
def left_multi():
    """left frame indexed by Origin/Destination/Period/TripPurp"""
    return (
        DataFrame(
            dict(Origin=['A', 'A', 'B', 'B', 'C'],
                 Destination=['A', 'B', 'A', 'C', 'A'],
                 Period=['AM', 'AM', 'IP', 'AM', 'OP'],
                 TripPurp=['hbw', 'nhb', 'hbo', 'nhb', 'hbw'],
                 Trips=[1987, 3647, 2470, 4296, 4444]),
            columns=['Origin', 'Destination', 'Period',
                     'TripPurp', 'Trips'])
        .set_index(['Origin', 'Destination', 'Period', 'TripPurp']))


@pytest.fixture
def right_multi():
    """right frame indexed by Origin/Destination/Period/LinkType"""
    return (
        DataFrame(
            dict(Origin=['A', 'A', 'B', 'B', 'C', 'C', 'E'],
                 Destination=['A', 'B', 'A', 'B', 'A', 'B', 'F'],
                 Period=['AM', 'AM', 'IP', 'AM', 'OP', 'IP', 'AM'],
                 LinkType=['a', 'b', 'c', 'b', 'a', 'b', 'a'],
                 Distance=[100, 80, 90, 80, 75, 35, 55]),
            columns=['Origin', 'Destination', 'Period',
                     'LinkType', 'Distance'])
        .set_index(['Origin', 'Destination', 'Period', 'LinkType']))


@pytest.fixture
def on_cols_multi():
    """index levels shared by ``left_multi`` and ``right_multi``"""
    return ['Origin', 'Destination', 'Period']


@pytest.fixture
def idx_cols_multi():
    """union of the index levels of ``left_multi`` and ``right_multi``"""
    return ['Origin', 'Destination', 'Period', 'TripPurp', 'LinkType']


class TestMergeMulti(object):
    """Tests for merging / joining on multiple keys or a MultiIndex."""

    def setup_method(self):
        # NOTE(review): none of the tests visible in this class read
        # self.index / self.to_join / self.data (they use the ``left`` and
        # ``right`` fixtures instead); kept in case of external users.
        self.index = MultiIndex(levels=[['foo', 'bar', 'baz', 'qux'],
                                        ['one', 'two', 'three']],
                                codes=[[0, 0, 0, 1, 1, 2, 2, 3, 3, 3],
                                       [0, 1, 2, 0, 1, 1, 2, 0, 1, 2]],
                                names=['first', 'second'])
        self.to_join = DataFrame(np.random.randn(10, 3), index=self.index,
                                 columns=['j_one', 'j_two', 'j_three'])

        # a little relevant example with NAs
        key1 = ['bar', 'bar', 'bar', 'foo', 'foo', 'baz', 'baz',
                'qux', 'qux', 'snap']
        key2 = ['two', 'one', 'three', 'one', 'two', 'one', 'two',
                'two', 'three', 'one']

        data = np.random.randn(len(key1))
        self.data = DataFrame({'key1': key1, 'key2': key2,
                               'data': data})

    def test_merge_on_multikey(self, left, right, join_type):
        """``join`` on two key columns must equal the equivalent ``merge``,
        both unsorted and with ``sort=True``."""
        on_cols = ['key1', 'key2']
        result = (left.join(right, on=on_cols, how=join_type)
                  .reset_index(drop=True))

        expected = pd.merge(left, right.reset_index(),
                            on=on_cols, how=join_type)

        tm.assert_frame_equal(result, expected)

        result = (left.join(right, on=on_cols, how=join_type, sort=True)
                  .reset_index(drop=True))

        expected = pd.merge(left, right.reset_index(),
                            on=on_cols, how=join_type, sort=True)

        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("sort", [False, True])
    def test_left_join_multi_index(self, left, right, sort):
        """Left join of a large random frame against a permuted,
        three-level-indexed copy of itself, with and without nulls."""
        # NOTE: the ``left`` / ``right`` fixtures are shadowed by the
        # locally built frames below.
        icols = ['1st', '2nd', '3rd']

        def bind_cols(df):
            # Encode the three key columns of each row as a single number.
            def iord(a):
                # ``a != a`` is only true for NaN, which maps to 0
                return 0 if a != a else ord(a)

            def f(ts):
                return ts.map(iord) - ord('a')

            return (f(df['1st']) + f(df['3rd']) * 1e2 +
                    df['2nd'].fillna(0) * 1e4)

        def run_asserts(left, right, sort):
            res = left.join(right, on=icols, how='left', sort=sort)

            assert len(left) < len(res) + 1
            assert not res['4th'].isna().any()
            assert not res['5th'].isna().any()

            tm.assert_series_equal(
                res['4th'], - res['5th'], check_names=False)
            result = bind_cols(res.iloc[:, :-2])
            tm.assert_series_equal(res['4th'], result, check_names=False)
            assert result.name is None

            if sort:
                tm.assert_frame_equal(
                    res, res.sort_values(icols, kind='mergesort'))

            out = merge(left, right.reset_index(), on=icols,
                        sort=sort, how='left')

            res.index = np.arange(len(res))
            tm.assert_frame_equal(out, res)

        lc = list(map(chr, np.arange(ord('a'), ord('z') + 1)))
        left = DataFrame(np.random.choice(lc, (5000, 2)),
                         columns=['1st', '3rd'])
        left.insert(1, '2nd', np.random.randint(0, 1000, len(left)))

        i = np.random.permutation(len(left))
        right = left.iloc[i].copy()

        left['4th'] = bind_cols(left)
        right['5th'] = - bind_cols(right)
        right.set_index(icols, inplace=True)

        run_asserts(left, right, sort)

        # inject some nulls
        left.loc[1::23, '1st'] = np.nan
        left.loc[2::37, '2nd'] = np.nan
        left.loc[3::43, '3rd'] = np.nan
        left['4th'] = bind_cols(left)

        i = np.random.permutation(len(left))
        # copy so the mutations below act on an independent frame, as in
        # the first half of the test
        right = left.iloc[i, :-1].copy()
        right['5th'] = - bind_cols(right)
        right.set_index(icols, inplace=True)

        run_asserts(left, right, sort)

    @pytest.mark.parametrize("sort", [False, True])
    def test_merge_right_vs_left(self, left, right, sort):
        """A left merge and the mirrored right merge give the same frame."""
        # compare left vs right merge with multikey
        on_cols = ['key1', 'key2']
        merged_left_right = left.merge(right,
                                       left_on=on_cols, right_index=True,
                                       how='left', sort=sort)

        merge_right_left = right.merge(left,
                                       right_on=on_cols, left_index=True,
                                       how='right', sort=sort)

        # Reorder columns
        merge_right_left = merge_right_left[merged_left_right.columns]

        tm.assert_frame_equal(merged_left_right, merge_right_left)

    def test_compress_group_combinations(self):
        """Smoke test: outer merge on two high-cardinality keys runs."""
        # ~ 40000000 possible unique groups
        key1 = tm.rands_array(10, 10000)
        key1 = np.tile(key1, 2)
        key2 = key1[::-1]

        df = DataFrame({'key1': key1, 'key2': key2,
                        'value1': np.random.randn(20000)})

        df2 = DataFrame({'key1': key1[::2], 'key2': key2[::2],
                         'value2': np.random.randn(10000)})

        # just to hit the label compression code path
        merge(df, df2, how='outer')

    def test_left_join_index_preserve_order(self):
        """Joining on two columns keeps the left frame's row order."""
        on_cols = ['k1', 'k2']
        left = DataFrame({'k1': [0, 1, 2] * 8,
                          'k2': ['foo', 'bar'] * 12,
                          'v': np.array(np.arange(24), dtype=np.int64)})

        index = MultiIndex.from_tuples([(2, 'bar'), (1, 'foo')])
        right = DataFrame({'v2': [5, 7]}, index=index)

        result = left.join(right, on=on_cols)

        expected = left.copy()
        expected['v2'] = np.nan
        expected.loc[(expected.k1 == 2) & (expected.k2 == 'bar'), 'v2'] = 5
        expected.loc[(expected.k1 == 1) & (expected.k2 == 'foo'), 'v2'] = 7

        tm.assert_frame_equal(result, expected)

        result = result.sort_values(on_cols, kind='mergesort')
        expected = left.join(right, on=on_cols, sort=True)

        tm.assert_frame_equal(result, expected)

        # test join with multi dtypes blocks
        left = DataFrame({'k1': [0, 1, 2] * 8,
                          'k2': ['foo', 'bar'] * 12,
                          'k3': np.array([0, 1, 2] * 8, dtype=np.float32),
                          'v': np.array(np.arange(24), dtype=np.int32)})

        index = MultiIndex.from_tuples([(2, 'bar'), (1, 'foo')])
        right = DataFrame({'v2': [5, 7]}, index=index)

        result = left.join(right, on=on_cols)

        expected = left.copy()
        expected['v2'] = np.nan
        expected.loc[(expected.k1 == 2) & (expected.k2 == 'bar'), 'v2'] = 5
        expected.loc[(expected.k1 == 1) & (expected.k2 == 'foo'), 'v2'] = 7

        tm.assert_frame_equal(result, expected)

        result = result.sort_values(on_cols, kind='mergesort')
        expected = left.join(right, on=on_cols, sort=True)

        tm.assert_frame_equal(result, expected)

    def test_left_join_index_multi_match_multiindex(self):
        """Left join against a MultiIndex with duplicate keys repeats the
        matching left rows (and their index labels)."""
        left = DataFrame([
            ['X', 'Y', 'C', 'a'],
            ['W', 'Y', 'C', 'e'],
            ['V', 'Q', 'A', 'h'],
            ['V', 'R', 'D', 'i'],
            ['X', 'Y', 'D', 'b'],
            ['X', 'Y', 'A', 'c'],
            ['W', 'Q', 'B', 'f'],
            ['W', 'R', 'C', 'g'],
            ['V', 'Y', 'C', 'j'],
            ['X', 'Y', 'B', 'd']],
            columns=['cola', 'colb', 'colc', 'tag'],
            index=[3, 2, 0, 1, 7, 6, 4, 5, 9, 8])

        right = (DataFrame([
            ['W', 'R', 'C', 0],
            ['W', 'Q', 'B', 3],
            ['W', 'Q', 'B', 8],
            ['X', 'Y', 'A', 1],
            ['X', 'Y', 'A', 4],
            ['X', 'Y', 'B', 5],
            ['X', 'Y', 'C', 6],
            ['X', 'Y', 'C', 9],
            ['X', 'Q', 'C', -6],
            ['X', 'R', 'C', -9],
            ['V', 'Y', 'C', 7],
            ['V', 'R', 'D', 2],
            ['V', 'R', 'D', -1],
            ['V', 'Q', 'A', -3]],
            columns=['col1', 'col2', 'col3', 'val'])
            .set_index(['col1', 'col2', 'col3']))

        result = left.join(right, on=['cola', 'colb', 'colc'], how='left')

        expected = DataFrame([
            ['X', 'Y', 'C', 'a', 6],
            ['X', 'Y', 'C', 'a', 9],
            ['W', 'Y', 'C', 'e', nan],
            ['V', 'Q', 'A', 'h', -3],
            ['V', 'R', 'D', 'i', 2],
            ['V', 'R', 'D', 'i', -1],
            ['X', 'Y', 'D', 'b', nan],
            ['X', 'Y', 'A', 'c', 1],
            ['X', 'Y', 'A', 'c', 4],
            ['W', 'Q', 'B', 'f', 3],
            ['W', 'Q', 'B', 'f', 8],
            ['W', 'R', 'C', 'g', 0],
            ['V', 'Y', 'C', 'j', 7],
            ['X', 'Y', 'B', 'd', 5]],
            columns=['cola', 'colb', 'colc', 'tag', 'val'],
            index=[3, 3, 2, 0, 1, 1, 7, 6, 6, 4, 4, 5, 9, 8])

        tm.assert_frame_equal(result, expected)

        result = left.join(right, on=['cola', 'colb', 'colc'],
                           how='left', sort=True)

        expected = expected.sort_values(['cola', 'colb', 'colc'],
                                        kind='mergesort')

        tm.assert_frame_equal(result, expected)

    def test_left_join_index_multi_match(self):
        """Left join against a single-level index with duplicate keys."""
        left = DataFrame([
            ['c', 0],
            ['b', 1],
            ['a', 2],
            ['b', 3]],
            columns=['tag', 'val'],
            index=[2, 0, 1, 3])

        right = (DataFrame([
            ['a', 'v'],
            ['c', 'w'],
            ['c', 'x'],
            ['d', 'y'],
            ['a', 'z'],
            ['c', 'r'],
            ['e', 'q'],
            ['c', 's']],
            columns=['tag', 'char'])
            .set_index('tag'))

        result = left.join(right, on='tag', how='left')

        expected = DataFrame([
            ['c', 0, 'w'],
            ['c', 0, 'x'],
            ['c', 0, 'r'],
            ['c', 0, 's'],
            ['b', 1, nan],
            ['a', 2, 'v'],
            ['a', 2, 'z'],
            ['b', 3, nan]],
            columns=['tag', 'val', 'char'],
            index=[2, 2, 2, 2, 0, 1, 1, 3])

        tm.assert_frame_equal(result, expected)

        result = left.join(right, on='tag', how='left', sort=True)
        expected2 = expected.sort_values('tag', kind='mergesort')

        tm.assert_frame_equal(result, expected2)

        # GH7331 - maintain left frame order in left merge
        result = merge(left, right.reset_index(), how='left', on='tag')
        expected.index = np.arange(len(expected))
        tm.assert_frame_equal(result, expected)

    def test_left_merge_na_buglet(self):
        """Left merge where the right key column contains NaN entries."""
        left = DataFrame({'id': list('abcde'), 'v1': randn(5),
                          'v2': randn(5), 'dummy': list('abcde'),
                          'v3': randn(5)},
                         columns=['id', 'v1', 'v2', 'dummy', 'v3'])
        right = DataFrame({'id': ['a', 'b', np.nan, np.nan, np.nan],
                           'sv3': [1.234, 5.678, np.nan, np.nan, np.nan]})

        result = merge(left, right, on='id', how='left')

        rdf = right.drop(['id'], axis=1)
        expected = left.join(rdf)
        tm.assert_frame_equal(result, expected)

    def test_merge_na_keys(self):
        """Outer merge with NaN in the merge columns equals the same merge
        done with a -999 placeholder that is swapped back to NaN."""
        data = [[1950, "A", 1.5],
                [1950, "B", 1.5],
                [1955, "B", 1.5],
                [1960, "B", np.nan],
                [1970, "B", 4.],
                [1950, "C", 4.],
                [1960, "C", np.nan],
                [1965, "C", 3.],
                [1970, "C", 4.]]

        frame = DataFrame(data, columns=["year", "panel", "data"])

        other_data = [[1960, 'A', np.nan],
                      [1970, 'A', np.nan],
                      [1955, 'A', np.nan],
                      [1965, 'A', np.nan],
                      [1965, 'B', np.nan],
                      [1955, 'C', np.nan]]
        other = DataFrame(other_data, columns=['year', 'panel', 'data'])

        result = frame.merge(other, how='outer')

        expected = frame.fillna(-999).merge(other.fillna(-999), how='outer')
        expected = expected.replace(-999, np.nan)

        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("klass", [None, np.asarray, Series, Index])
    def test_merge_datetime_index(self, klass):
        """Merge ``on`` a key vector derived from a DatetimeIndex, boxed
        as ``klass`` when one is given."""
        # see gh-19038
        df = DataFrame([1, 2, 3],
                       ["2016-01-01", "2017-01-01", "2018-01-01"],
                       columns=["a"])
        df.index = pd.to_datetime(df.index)
        on_vector = df.index.year

        if klass is not None:
            on_vector = klass(on_vector)

        expected = DataFrame(
            OrderedDict([
                ("a", [1, 2, 3]),
                ("key_1", [2016, 2017, 2018]),
            ])
        )

        result = df.merge(df, on=["a", on_vector], how="inner")
        tm.assert_frame_equal(result, expected)

        expected = DataFrame(
            OrderedDict([
                ("key_0", [2016, 2017, 2018]),
                ("a_x", [1, 2, 3]),
                ("a_y", [1, 2, 3]),
            ])
        )

        result = df.merge(df, on=[df.index.year], how="inner")
        tm.assert_frame_equal(result, expected)

    def test_join_multi_levels(self):
        """Join a single-level-indexed frame with a MultiIndexed one."""
        # GH 3662
        # merge multi-levels
        household = (
            DataFrame(
                dict(household_id=[1, 2, 3],
                     male=[0, 1, 0],
                     wealth=[196087.3, 316478.7, 294750]),
                columns=['household_id', 'male', 'wealth'])
            .set_index('household_id'))
        portfolio = (
            DataFrame(
                dict(household_id=[1, 2, 2, 3, 3, 3, 4],
                     asset_id=["nl0000301109", "nl0000289783",
                               "gb00b03mlx29", "gb00b03mlx29",
                               "lu0197800237", "nl0000289965",
                               np.nan],
                     name=["ABN Amro", "Robeco", "Royal Dutch Shell",
                           "Royal Dutch Shell",
                           "AAB Eastern Europe Equity Fund",
                           "Postbank BioTech Fonds", np.nan],
                     share=[1.0, 0.4, 0.6, 0.15, 0.6, 0.25, 1.0]),
                columns=['household_id', 'asset_id', 'name', 'share'])
            .set_index(['household_id', 'asset_id']))
        result = household.join(portfolio, how='inner')
        expected = (
            DataFrame(
                dict(male=[0, 1, 1, 0, 0, 0],
                     wealth=[196087.3, 316478.7, 316478.7,
                             294750.0, 294750.0, 294750.0],
                     name=['ABN Amro', 'Robeco', 'Royal Dutch Shell',
                           'Royal Dutch Shell',
                           'AAB Eastern Europe Equity Fund',
                           'Postbank BioTech Fonds'],
                     share=[1.00, 0.40, 0.60, 0.15, 0.60, 0.25],
                     household_id=[1, 2, 2, 3, 3, 3],
                     asset_id=['nl0000301109', 'nl0000289783',
                               'gb00b03mlx29', 'gb00b03mlx29',
                               'lu0197800237', 'nl0000289965']))
            .set_index(['household_id', 'asset_id'])
            .reindex(columns=['male', 'wealth', 'name', 'share']))
        tm.assert_frame_equal(result, expected)

        # equivalency
        result = (merge(household.reset_index(), portfolio.reset_index(),
                        on=['household_id'], how='inner')
                  .set_index(['household_id', 'asset_id']))
        tm.assert_frame_equal(result, expected)

        result = household.join(portfolio, how='outer')
        expected = (concat([
            expected,
            (DataFrame(
                dict(share=[1.00]),
                index=MultiIndex.from_tuples(
                    [(4, np.nan)],
                    names=['household_id', 'asset_id'])))
        ], axis=0, sort=True).reindex(columns=expected.columns))
        tm.assert_frame_equal(result, expected)

        # invalid cases
        household.index.name = 'foo'

        with pytest.raises(ValueError):
            household.join(portfolio, how='inner')

        portfolio2 = portfolio.copy()
        # ``set_names`` returns a new index unless ``inplace=True``;
        # assign it back so the level is actually renamed.
        portfolio2.index = portfolio2.index.set_names(
            ['household_id', 'foo'])

        with pytest.raises(ValueError):
            portfolio2.join(portfolio, how='inner')

    def test_join_multi_levels2(self):
        """Inner and outer merges of two MultiIndexed frames on the one
        column they share (``asset_id``)."""
        # some more advanced merges
        # GH6360
        household = (
            DataFrame(
                dict(household_id=[1, 2, 2, 3, 3, 3, 4],
                     asset_id=["nl0000301109", "nl0000301109",
                               "gb00b03mlx29", "gb00b03mlx29",
                               "lu0197800237", "nl0000289965",
                               np.nan],
                     share=[1.0, 0.4, 0.6, 0.15, 0.6, 0.25, 1.0]),
                columns=['household_id', 'asset_id', 'share'])
            .set_index(['household_id', 'asset_id']))

        log_return = DataFrame(dict(
            asset_id=["gb00b03mlx29", "gb00b03mlx29",
                      "gb00b03mlx29", "lu0197800237", "lu0197800237"],
            t=[233, 234, 235, 180, 181],
            log_return=[.09604978, -.06524096, .03532373, .03025441, .036997]
        )).set_index(["asset_id", "t"])

        expected = (
            DataFrame(dict(
                household_id=[2, 2, 2, 3, 3, 3, 3, 3],
                asset_id=["gb00b03mlx29", "gb00b03mlx29",
                          "gb00b03mlx29", "gb00b03mlx29",
                          "gb00b03mlx29", "gb00b03mlx29",
                          "lu0197800237", "lu0197800237"],
                t=[233, 234, 235, 233, 234, 235, 180, 181],
                share=[0.6, 0.6, 0.6, 0.15, 0.15, 0.15, 0.6, 0.6],
                log_return=[.09604978, -.06524096, .03532373,
                            .09604978, -.06524096, .03532373,
                            .03025441, .036997]
            ))
            .set_index(["household_id", "asset_id", "t"])
            .reindex(columns=['share', 'log_return']))

        # this is the equivalency
        result = (merge(household.reset_index(), log_return.reset_index(),
                        on=['asset_id'], how='inner')
                  .set_index(['household_id', 'asset_id', 't']))
        tm.assert_frame_equal(result, expected)

        expected = (
            DataFrame(dict(
                household_id=[1, 2, 2, 2, 2, 3, 3, 3, 3, 3, 3, 4],
                asset_id=["nl0000301109", "nl0000301109", "gb00b03mlx29",
                          "gb00b03mlx29", "gb00b03mlx29",
                          "gb00b03mlx29", "gb00b03mlx29", "gb00b03mlx29",
                          "lu0197800237", "lu0197800237",
                          "nl0000289965", None],
                t=[None, None, 233, 234, 235, 233, 234,
                   235, 180, 181, None, None],
                share=[1.0, 0.4, 0.6, 0.6, 0.6, 0.15,
                       0.15, 0.15, 0.6, 0.6, 0.25, 1.0],
                log_return=[None, None, .09604978, -.06524096, .03532373,
                            .09604978, -.06524096, .03532373,
                            .03025441, .036997, None, None]
            ))
            .set_index(["household_id", "asset_id", "t"])
            .reindex(columns=['share', 'log_return']))

        result = (merge(household.reset_index(), log_return.reset_index(),
                        on=['asset_id'], how='outer')
                  .set_index(['household_id', 'asset_id', 't']))

        tm.assert_frame_equal(result, expected)


class TestJoinMultiMulti(object):
    """Tests for joining two MultiIndexed frames on shared levels."""

    def test_join_multi_multi(self, left_multi, right_multi, join_type,
                              on_cols_multi, idx_cols_multi):
        """``join`` of two MultiIndexed frames equals the equivalent
        ``merge`` on the shared levels."""
        # Multi-index join tests
        expected = (pd.merge(left_multi.reset_index(),
                             right_multi.reset_index(),
                             how=join_type, on=on_cols_multi)
                    .set_index(idx_cols_multi).sort_index())

        result = left_multi.join(right_multi, how=join_type).sort_index()
        tm.assert_frame_equal(result, expected)

    def test_join_multi_empty_frames(self, left_multi, right_multi,
                                     join_type, on_cols_multi,
                                     idx_cols_multi):
        """Same equivalence as above after dropping every column, so only
        the indexes take part in the join."""
        left_multi = left_multi.drop(columns=left_multi.columns)
        right_multi = right_multi.drop(columns=right_multi.columns)

        expected = (pd.merge(left_multi.reset_index(),
                             right_multi.reset_index(),
                             how=join_type, on=on_cols_multi)
                    .set_index(idx_cols_multi).sort_index())

        result = left_multi.join(right_multi, how=join_type).sort_index()
        tm.assert_frame_equal(result, expected)

    @pytest.mark.parametrize("box", [None, np.asarray, Series, Index])
    def test_merge_datetime_index(self, box):
        """Merge ``on`` a key vector derived from a DatetimeIndex, boxed
        as ``box`` when one is given."""
        # see gh-19038
        df = DataFrame([1, 2, 3],
                       ["2016-01-01", "2017-01-01", "2018-01-01"],
                       columns=["a"])
        df.index = pd.to_datetime(df.index)
        on_vector = df.index.year

        if box is not None:
            on_vector = box(on_vector)

        expected = DataFrame(
            OrderedDict([
                ("a", [1, 2, 3]),
                ("key_1", [2016, 2017, 2018]),
            ])
        )

        result = df.merge(df, on=["a", on_vector], how="inner")
        tm.assert_frame_equal(result, expected)

        expected = DataFrame(
            OrderedDict([
                ("key_0", [2016, 2017, 2018]),
                ("a_x", [1, 2, 3]),
                ("a_y", [1, 2, 3]),
            ])
        )

        result = df.merge(df, on=[df.index.year], how="inner")
        tm.assert_frame_equal(result, expected)

    def test_single_common_level(self):
        """Two MultiIndexed frames sharing only the ``key`` level join like
        an inner merge on ``key``."""
        index_left = pd.MultiIndex.from_tuples([('K0', 'X0'), ('K0', 'X1'),
                                                ('K1', 'X2')],
                                               names=['key', 'X'])

        left = pd.DataFrame({'A': ['A0', 'A1', 'A2'],
                             'B': ['B0', 'B1', 'B2']},
                            index=index_left)

        index_right = pd.MultiIndex.from_tuples([('K0', 'Y0'), ('K1', 'Y1'),
                                                 ('K2', 'Y2'), ('K2', 'Y3')],
                                                names=['key', 'Y'])

        right = pd.DataFrame({'C': ['C0', 'C1', 'C2', 'C3'],
                              'D': ['D0', 'D1', 'D2', 'D3']},
                             index=index_right)

        result = left.join(right)
        expected = (pd.merge(left.reset_index(), right.reset_index(),
                             on=['key'], how='inner')
                    .set_index(['key', 'X', 'Y']))

        tm.assert_frame_equal(result, expected)