Skip to content

Commit

Permalink
FIX-modin-project#2133 modin-project#2265: Fix binary operations for …
Browse files Browse the repository at this point in the history
…modin frames in case

when partitioning isn't aligned

Signed-off-by: Alexey Prutskov <[email protected]>
  • Loading branch information
prutskov committed Oct 27, 2020
1 parent a571e10 commit 407ce7a
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 19 deletions.
3 changes: 2 additions & 1 deletion modin/data_management/functions/binary_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def call(cls, func, *call_args, **call_kwds):
def caller(query_compiler, other, *args, **kwargs):
axis = kwargs.get("axis", 0)
broadcast = kwargs.pop("broadcast", False)
join_type = call_kwds.get("join_type", "outer")
if isinstance(other, type(query_compiler)):
if broadcast:
assert (
Expand All @@ -39,11 +40,11 @@ def caller(query_compiler, other, *args, **kwargs):
axis,
lambda l, r: func(l, r.squeeze(), *args, **kwargs),
other._modin_frame,
join_type=join_type,
preserve_labels=call_kwds.get("preserve_labels", False),
)
)
else:
join_type = call_kwds.get("join_type", "outer")
return query_compiler.__constructor__(
query_compiler._modin_frame._binary_op(
lambda x, y: func(x, y, *args, **kwargs),
Expand Down
71 changes: 55 additions & 16 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,23 +1402,34 @@ def _apply_select_indices(
self._column_widths_cache,
)

def broadcast_apply(self, axis, func, other, preserve_labels=True, dtypes=None):
"""Broadcast partitions of other dataframe partitions and apply a function.
def broadcast_apply(
self, axis, func, other, join_type="left", preserve_labels=True, dtypes=None
):
"""
Broadcast partitions of other dataframe partitions and apply a function.
Args:
axis: The axis to broadcast over.
func: The function to apply.
other: The Modin DataFrame to broadcast.
preserve_labels: Whether or not to keep labels from this Modin DataFrame.
dtypes: "copy" or None. Whether to keep old dtypes or infer new dtypes from
data.
Parameters
----------
axis: int,
The axis to broadcast over.
func: callable,
The function to apply.
other: BasePandasFrame
The Modin DataFrame to broadcast.
join_type: str (optional)
The type of join to apply.
preserve_labels: boolean (optional)
Whether or not to keep labels from this Modin DataFrame.
dtypes: "copy" or None (optional)
Whether to keep old dtypes or infer new dtypes from data.
Returns:
A new Modin DataFrame
Returns
-------
BasePandasFrame
"""
# Only sort the indices if they do not match
left_parts, right_parts, joined_index = self._copartition(
axis, other, "left", sort=not self.axes[axis].equals(other.axes[axis])
axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis])
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
Expand Down Expand Up @@ -1651,7 +1662,26 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
"""
if isinstance(other, type(self)):
other = [other]
if all(o.axes[axis].equals(self.axes[axis]) for o in other):

is_aligning_applied = False
for i in range(len(other)):
if (
len(self._partitions) != len(other[i]._partitions)
and len(self.axes[0]) == len(other[i].axes[0])
and axis == 0
):
is_aligning_applied = True
self._partitions = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df
)
other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions(
axis, other[i]._partitions, lambda df: df
)

if (
all(o.axes[axis].equals(self.axes[axis]) for o in other)
and not is_aligning_applied
):
return (
self._partitions,
[self._simple_shuffle(axis, o) for o in other],
Expand All @@ -1664,8 +1694,13 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0
# Start with this and we'll repartition the first time, and then not again.
if not left_old_idx.equals(joined_index) or force_repartition:
if (
not is_aligning_applied
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
):
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
)
Expand All @@ -1674,7 +1709,11 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
reindexed_other_list = []

for i in range(len(other)):
if right_old_idxes[i].equals(joined_index) and not force_repartition:
if (
is_aligning_applied
or is_avoid_reindex
or (not force_repartition and right_old_idxes[i].equals(joined_index))
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
Expand Down Expand Up @@ -1744,7 +1783,7 @@ def _binary_op(self, op, right_frame, join_type="outer"):
1, left_parts, lambda l, r: op(l, r), right_parts
)
new_columns = self.columns.join(right_frame.columns, how=join_type)
return self.__constructor__(new_frame, self.index, new_columns, None, None)
return self.__constructor__(new_frame, joined_index, new_columns, None, None)

def _concat(self, axis, others, how, sort):
"""Concatenate this dataframe with one or more others.
Expand Down
6 changes: 4 additions & 2 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,9 +1637,11 @@ def _prepare_inter_op(self, other):
"""
if isinstance(other, Series):
new_self = self.copy()
new_self.name = "__reduced__"
new_other = other.copy()
new_other.name = "__reduced__"
if self.name == other.name:
new_self.name = new_other.name = self.name
else:
new_self.name = new_other.name = "__reduced__"
else:
new_self = self
new_other = other
Expand Down
59 changes: 59 additions & 0 deletions modin/pandas/test/dataframe/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,62 @@ def test_equals():
df_equals(modin_df3, modin_df2)

assert modin_df1.equals(modin_df2._query_compiler.to_pandas())


@pytest.mark.parametrize("is_more_other_partitions", [True, False])
@pytest.mark.parametrize(
    "op_type", ["df_ser", "df_df", "ser_ser_same_name", "ser_ser_different_name"]
)
@pytest.mark.parametrize(
    "is_idx_aligned", [True, False], ids=["idx_aligned", "idx_not_aligned"]
)
def test_mismatched_row_partitions(is_idx_aligned, op_type, is_more_other_partitions):
    """
    Compare Modin and pandas division results for differently built operands.

    One operand is a six-row frame; the other is its ``.loc[:2]`` slice
    appended to itself.
    NOTE(review): presumably the appended frame ends up with a different row
    partitioning than the original (as the test name suggests) - confirm.

    Parameters
    ----------
    is_idx_aligned: bool
        When True, the index of one operand is overwritten with the index of
        the other before the operation.
    op_type: str
        Which operand kinds to divide: "df_ser", "df_df", "ser_ser_same_name"
        or "ser_ser_different_name".
    is_more_other_partitions: bool
        When True, the two operands swap roles (left <-> right).
    """
    values = [0, 1, 2, 3, 4, 5]
    modin_right, pandas_right = create_test_dfs({"a": values, "b": values})
    modin_head = modin_right.loc[:2]
    pandas_head = pandas_right.loc[:2]

    # Left operand: the sliced frame stacked on top of itself.
    modin_left = modin_head.append(modin_head)
    pandas_left = pandas_head.append(pandas_head)
    if is_more_other_partitions:
        modin_left, modin_right = modin_right, modin_left
        pandas_left, pandas_right = pandas_right, pandas_left

    if is_idx_aligned:
        # Give both operands the same index object, taken from the pandas frame
        # on the opposite side.
        if is_more_other_partitions:
            shared_index = pandas_left.index
            modin_right.index = shared_index
            pandas_right.index = shared_index
        else:
            shared_index = pandas_right.index
            modin_left.index = shared_index
            pandas_left.index = shared_index

    # According to the original author, pandas does not support this case
    # because the result would contain duplicate values along the column axis,
    # so the comparison is delegated to `eval_general` instead of `df_equals`.
    if op_type == "df_ser" and not is_idx_aligned and is_more_other_partitions:

        def divide_by_column_a(df):
            # Pick the divisor that matches the library of the frame passed in.
            if isinstance(df, pd.DataFrame):
                return df / modin_right.a
            return df / pandas_right.a

        eval_general(modin_left, pandas_left, divide_by_column_a)
        return

    if op_type == "df_ser":
        modin_res = modin_left / modin_right.a
        pandas_res = pandas_left / pandas_right.a
    elif op_type == "df_df":
        modin_res = modin_left / modin_right
        pandas_res = pandas_left / pandas_right
    elif op_type == "ser_ser_same_name":
        modin_res = modin_left.a / modin_right.a
        pandas_res = pandas_left.a / pandas_right.a
    elif op_type == "ser_ser_different_name":
        modin_res = modin_left.a / modin_right.b
        pandas_res = pandas_left.a / pandas_right.b
    df_equals(modin_res, pandas_res)


def test_duplicate_indexes():
    """
    Compare Modin and pandas frame division when one operand has repeated labels.

    The left operand is created with ``index=[0, 1, 2, 0, 1, 2]``; the right
    operand is created from the same data without an explicit index.
    """
    values = [0, 1, 2, 3, 4, 5]
    repeated_labels = [0, 1, 2, 0, 1, 2]
    modin_dup, pandas_dup = create_test_dfs(
        {"a": values, "b": values}, index=repeated_labels
    )
    modin_plain, pandas_plain = create_test_dfs({"a": values, "b": values})

    modin_quotient = modin_dup / modin_plain
    pandas_quotient = pandas_dup / pandas_plain
    df_equals(modin_quotient, pandas_quotient)

0 comments on commit 407ce7a

Please sign in to comment.