From 407ce7ac03b6a2e097d9544295ca02df84a27775 Mon Sep 17 00:00:00 2001 From: Alexey Prutskov Date: Thu, 15 Oct 2020 16:06:38 +0300 Subject: [PATCH] FIX-#2133 #2265: Fix binary operations for modin frames in case when partitioning isn't aligned Signed-off-by: Alexey Prutskov --- .../functions/binary_function.py | 3 +- modin/engines/base/frame/data.py | 71 ++++++++++++++----- modin/pandas/series.py | 6 +- modin/pandas/test/dataframe/test_binary.py | 59 +++++++++++++++ 4 files changed, 120 insertions(+), 19 deletions(-) diff --git a/modin/data_management/functions/binary_function.py b/modin/data_management/functions/binary_function.py index b8edf8f608a..28539bf9add 100644 --- a/modin/data_management/functions/binary_function.py +++ b/modin/data_management/functions/binary_function.py @@ -23,6 +23,7 @@ def call(cls, func, *call_args, **call_kwds): def caller(query_compiler, other, *args, **kwargs): axis = kwargs.get("axis", 0) broadcast = kwargs.pop("broadcast", False) + join_type = call_kwds.get("join_type", "outer") if isinstance(other, type(query_compiler)): if broadcast: assert ( @@ -39,11 +40,11 @@ def caller(query_compiler, other, *args, **kwargs): axis, lambda l, r: func(l, r.squeeze(), *args, **kwargs), other._modin_frame, + join_type=join_type, preserve_labels=call_kwds.get("preserve_labels", False), ) ) else: - join_type = call_kwds.get("join_type", "outer") return query_compiler.__constructor__( query_compiler._modin_frame._binary_op( lambda x, y: func(x, y, *args, **kwargs), diff --git a/modin/engines/base/frame/data.py b/modin/engines/base/frame/data.py index ae2f0f935ba..c1b609bf3d5 100644 --- a/modin/engines/base/frame/data.py +++ b/modin/engines/base/frame/data.py @@ -1402,23 +1402,34 @@ def _apply_select_indices( self._column_widths_cache, ) - def broadcast_apply(self, axis, func, other, preserve_labels=True, dtypes=None): - """Broadcast partitions of other dataframe partitions and apply a function. + def broadcast_apply( + self, axis, func, other, join_type="left", preserve_labels=True, dtypes=None + ): + """ + Broadcast partitions of other dataframe partitions and apply a function. - Args: - axis: The axis to broadcast over. - func: The function to apply. - other: The Modin DataFrame to broadcast. - preserve_labels: Whether or not to keep labels from this Modin DataFrame. - dtypes: "copy" or None. Whether to keep old dtypes or infer new dtypes from - data. + Parameters + ---------- + axis: int, + The axis to broadcast over. + func: callable, + The function to apply. + other: BasePandasFrame + The Modin DataFrame to broadcast. + join_type: str (optional) + The type of join to apply. + preserve_labels: boolean (optional) + Whether or not to keep labels from this Modin DataFrame. + dtypes: "copy" or None (optional) + Whether to keep old dtypes or infer new dtypes from data. - Returns: - A new Modin DataFrame + Returns + ------- + BasePandasFrame """ # Only sort the indices if they do not match left_parts, right_parts, joined_index = self._copartition( - axis, other, "left", sort=not self.axes[axis].equals(other.axes[axis]) + axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis]) ) # unwrap list returned by `copartition`. right_parts = right_parts[0] @@ -1651,7 +1662,26 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): """ if isinstance(other, type(self)): other = [other] - if all(o.axes[axis].equals(self.axes[axis]) for o in other): + + is_aligning_applied = False + for i in range(len(other)): + if ( + len(self._partitions) != len(other[i]._partitions) + and len(self.axes[0]) == len(other[i].axes[0]) + and axis == 0 + ): + is_aligning_applied = True + self._partitions = self._frame_mgr_cls.map_axis_partitions( + axis, self._partitions, lambda df: df + ) + other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions( + axis, other[i]._partitions, lambda df: df + ) + + if ( + all(o.axes[axis].equals(self.axes[axis]) for o in other) + and not is_aligning_applied + ): return ( self._partitions, [self._simple_shuffle(axis, o) for o in other], @@ -1664,8 +1694,13 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): left_old_idx = self.axes[axis] right_old_idxes = index_other_obj + is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0 # Start with this and we'll repartition the first time, and then not again. - if not left_old_idx.equals(joined_index) or force_repartition: + if ( + not is_aligning_applied + and not is_avoid_reindex + and (force_repartition or not left_old_idx.equals(joined_index)) + ): reindexed_self = self._frame_mgr_cls.map_axis_partitions( axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis) ) @@ -1674,7 +1709,11 @@ def _copartition(self, axis, other, how, sort, force_repartition=False): reindexed_other_list = [] for i in range(len(other)): - if right_old_idxes[i].equals(joined_index) and not force_repartition: + if ( + is_aligning_applied + or is_avoid_reindex + or (not force_repartition and right_old_idxes[i].equals(joined_index)) + ): reindexed_other = other[i]._partitions else: reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions( @@ -1744,7 +1783,7 @@ def _binary_op(self, op, right_frame, join_type="outer"): 1, left_parts, lambda l, r: op(l, r), right_parts ) new_columns = self.columns.join(right_frame.columns, how=join_type) - return self.__constructor__(new_frame, self.index, new_columns, None, None) + return self.__constructor__(new_frame, joined_index, new_columns, None, None) def _concat(self, axis, others, how, sort): """Concatenate this dataframe with one or more others. diff --git a/modin/pandas/series.py b/modin/pandas/series.py index a7bb85cc4ac..6a1e11e4929 100644 --- a/modin/pandas/series.py +++ b/modin/pandas/series.py @@ -1637,9 +1637,11 @@ def _prepare_inter_op(self, other): """ if isinstance(other, Series): new_self = self.copy() - new_self.name = "__reduced__" new_other = other.copy() - new_other.name = "__reduced__" + if self.name == other.name: + new_self.name = new_other.name = self.name + else: + new_self.name = new_other.name = "__reduced__" else: new_self = self new_other = other diff --git a/modin/pandas/test/dataframe/test_binary.py b/modin/pandas/test/dataframe/test_binary.py index 20b14faecd5..5346c0e8932 100644 --- a/modin/pandas/test/dataframe/test_binary.py +++ b/modin/pandas/test/dataframe/test_binary.py @@ -197,3 +197,62 @@ def test_equals(): df_equals(modin_df3, modin_df2) assert modin_df1.equals(modin_df2._query_compiler.to_pandas()) + + +@pytest.mark.parametrize("is_more_other_partitions", [True, False]) +@pytest.mark.parametrize( + "op_type", ["df_ser", "df_df", "ser_ser_same_name", "ser_ser_different_name"] +) +@pytest.mark.parametrize( + "is_idx_aligned", [True, False], ids=["idx_aligned", "idx_not_aligned"] +) +def test_mismatched_row_partitions(is_idx_aligned, op_type, is_more_other_partitions): + data = [0, 1, 2, 3, 4, 5] + modin_df1, pandas_df1 = create_test_dfs({"a": data, "b": data}) + modin_df, pandas_df = modin_df1.loc[:2], pandas_df1.loc[:2] + + modin_df2 = modin_df.append(modin_df) + pandas_df2 = pandas_df.append(pandas_df) + if is_more_other_partitions: + modin_df2, modin_df1 = modin_df1, modin_df2 + pandas_df2, pandas_df1 = pandas_df1, pandas_df2 + + if is_idx_aligned: + if is_more_other_partitions: + modin_df1.index = pandas_df1.index = pandas_df2.index + else: + modin_df2.index = pandas_df2.index = pandas_df1.index + + # Pandas don't support this case because result will contain duplicate values by col axis. + if op_type == "df_ser" and not is_idx_aligned and is_more_other_partitions: + eval_general( + modin_df2, + pandas_df2, + lambda df: df / modin_df1.a + if isinstance(df, pd.DataFrame) + else df / pandas_df1.a, + ) + return + + if op_type == "df_ser": + modin_res = modin_df2 / modin_df1.a + pandas_res = pandas_df2 / pandas_df1.a + elif op_type == "df_df": + modin_res = modin_df2 / modin_df1 + pandas_res = pandas_df2 / pandas_df1 + elif op_type == "ser_ser_same_name": + modin_res = modin_df2.a / modin_df1.a + pandas_res = pandas_df2.a / pandas_df1.a + elif op_type == "ser_ser_different_name": + modin_res = modin_df2.a / modin_df1.b + pandas_res = pandas_df2.a / pandas_df1.b + df_equals(modin_res, pandas_res) + + +def test_duplicate_indexes(): + data = [0, 1, 2, 3, 4, 5] + modin_df1, pandas_df1 = create_test_dfs( + {"a": data, "b": data}, index=[0, 1, 2, 0, 1, 2] + ) + modin_df2, pandas_df2 = create_test_dfs({"a": data, "b": data}) + df_equals(modin_df1 / modin_df2, pandas_df1 / pandas_df2)