Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FIX-#2133 #2265: Fix binary operations in case when partitioning isn't aligned #2256

Merged
merged 1 commit into from
Oct 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion modin/data_management/functions/binary_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ def call(cls, func, *call_args, **call_kwds):
def caller(query_compiler, other, *args, **kwargs):
axis = kwargs.get("axis", 0)
broadcast = kwargs.pop("broadcast", False)
join_type = call_kwds.get("join_type", "outer")
if isinstance(other, type(query_compiler)):
if broadcast:
assert (
Expand All @@ -39,11 +40,11 @@ def caller(query_compiler, other, *args, **kwargs):
axis,
lambda l, r: func(l, r.squeeze(), *args, **kwargs),
other._modin_frame,
join_type=join_type,
preserve_labels=call_kwds.get("preserve_labels", False),
)
)
else:
join_type = call_kwds.get("join_type", "outer")
return query_compiler.__constructor__(
query_compiler._modin_frame._binary_op(
lambda x, y: func(x, y, *args, **kwargs),
Expand Down
71 changes: 55 additions & 16 deletions modin/engines/base/frame/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -1402,23 +1402,34 @@ def _apply_select_indices(
self._column_widths_cache,
)

def broadcast_apply(self, axis, func, other, preserve_labels=True, dtypes=None):
"""Broadcast partitions of other dataframe partitions and apply a function.
def broadcast_apply(
self, axis, func, other, join_type="left", preserve_labels=True, dtypes=None
):
"""
Broadcast partitions of other dataframe partitions and apply a function.

Args:
axis: The axis to broadcast over.
func: The function to apply.
other: The Modin DataFrame to broadcast.
preserve_labels: Whether or not to keep labels from this Modin DataFrame.
dtypes: "copy" or None. Whether to keep old dtypes or infer new dtypes from
data.
Parameters
----------
axis: int,
The axis to broadcast over.
func: callable,
The function to apply.
other: BasePandasFrame
The Modin DataFrame to broadcast.
join_type: str (optional)
The type of join to apply.
preserve_labels: boolean (optional)
Whether or not to keep labels from this Modin DataFrame.
dtypes: "copy" or None (optional)
Whether to keep old dtypes or infer new dtypes from data.

Returns:
A new Modin DataFrame
Returns
-------
BasePandasFrame
"""
# Only sort the indices if they do not match
left_parts, right_parts, joined_index = self._copartition(
axis, other, "left", sort=not self.axes[axis].equals(other.axes[axis])
axis, other, join_type, sort=not self.axes[axis].equals(other.axes[axis])
YarShev marked this conversation as resolved.
Show resolved Hide resolved
)
# unwrap list returned by `copartition`.
right_parts = right_parts[0]
Expand Down Expand Up @@ -1651,7 +1662,26 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
"""
if isinstance(other, type(self)):
other = [other]
if all(o.axes[axis].equals(self.axes[axis]) for o in other):

is_aligning_applied = False
for i in range(len(other)):
if (
len(self._partitions) != len(other[i]._partitions)
and len(self.axes[0]) == len(other[i].axes[0])
and axis == 0
):
is_aligning_applied = True
self._partitions = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df
)
other[i]._partitions = other[i]._frame_mgr_cls.map_axis_partitions(
axis, other[i]._partitions, lambda df: df
)

if (
all(o.axes[axis].equals(self.axes[axis]) for o in other)
and not is_aligning_applied
):
return (
self._partitions,
[self._simple_shuffle(axis, o) for o in other],
Expand All @@ -1664,8 +1694,13 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
left_old_idx = self.axes[axis]
right_old_idxes = index_other_obj

is_avoid_reindex = len(joined_index) != len(joined_index.unique()) and axis == 0
# Start with this and we'll repartition the first time, and then not again.
if not left_old_idx.equals(joined_index) or force_repartition:
if (
not is_aligning_applied
and not is_avoid_reindex
and (force_repartition or not left_old_idx.equals(joined_index))
):
reindexed_self = self._frame_mgr_cls.map_axis_partitions(
axis, self._partitions, lambda df: df.reindex(joined_index, axis=axis)
)
Expand All @@ -1674,7 +1709,11 @@ def _copartition(self, axis, other, how, sort, force_repartition=False):
reindexed_other_list = []

for i in range(len(other)):
if right_old_idxes[i].equals(joined_index) and not force_repartition:
if (
is_aligning_applied
or is_avoid_reindex
or (not force_repartition and right_old_idxes[i].equals(joined_index))
):
reindexed_other = other[i]._partitions
else:
reindexed_other = other[i]._frame_mgr_cls.map_axis_partitions(
Expand Down Expand Up @@ -1744,7 +1783,7 @@ def _binary_op(self, op, right_frame, join_type="outer"):
1, left_parts, lambda l, r: op(l, r), right_parts
)
new_columns = self.columns.join(right_frame.columns, how=join_type)
return self.__constructor__(new_frame, self.index, new_columns, None, None)
return self.__constructor__(new_frame, joined_index, new_columns, None, None)

def _concat(self, axis, others, how, sort):
"""Concatenate this dataframe with one or more others.
Expand Down
6 changes: 4 additions & 2 deletions modin/pandas/series.py
Original file line number Diff line number Diff line change
Expand Up @@ -1637,9 +1637,11 @@ def _prepare_inter_op(self, other):
"""
if isinstance(other, Series):
new_self = self.copy()
new_self.name = "__reduced__"
new_other = other.copy()
new_other.name = "__reduced__"
if self.name == other.name:
YarShev marked this conversation as resolved.
Show resolved Hide resolved
new_self.name = new_other.name = self.name
else:
new_self.name = new_other.name = "__reduced__"
else:
new_self = self
new_other = other
Expand Down
59 changes: 59 additions & 0 deletions modin/pandas/test/dataframe/test_binary.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,3 +197,62 @@ def test_equals():
df_equals(modin_df3, modin_df2)

assert modin_df1.equals(modin_df2._query_compiler.to_pandas())


@pytest.mark.parametrize("is_more_other_partitions", [True, False])
@pytest.mark.parametrize(
    "op_type", ["df_ser", "df_df", "ser_ser_same_name", "ser_ser_different_name"]
)
@pytest.mark.parametrize(
    "is_idx_aligned", [True, False], ids=["idx_aligned", "idx_not_aligned"]
)
def test_mismatched_row_partitions(is_idx_aligned, op_type, is_more_other_partitions):
    """
    Compare Modin and pandas division for two differently built operands.

    One operand is a six-row frame; the other is the rows selected by
    ``.loc[:2]`` appended to themselves.
    NOTE(review): presumably the appended frame ends up with a different row
    partitioning in Modin than the original one (per the test name) - confirm.

    Parameters
    ----------
    is_idx_aligned: bool
        When True, both operands are given the same index before dividing.
    op_type: str
        Which operand kinds are divided: frame by series ("df_ser"), frame by
        frame ("df_df"), or series by series with the same or different names.
    is_more_other_partitions: bool
        When True, the two operands (left and right) are swapped.
    """
    values = [0, 1, 2, 3, 4, 5]
    modin_df1, pandas_df1 = create_test_dfs({"a": values, "b": values})
    modin_head = modin_df1.loc[:2]
    pandas_head = pandas_df1.loc[:2]

    modin_df2 = modin_head.append(modin_head)
    pandas_df2 = pandas_head.append(pandas_head)
    if is_more_other_partitions:
        # Exchange the roles of the original and the appended frames.
        modin_df1, modin_df2 = modin_df2, modin_df1
        pandas_df1, pandas_df2 = pandas_df2, pandas_df1

    if is_idx_aligned and is_more_other_partitions:
        shared_index = pandas_df2.index
        modin_df1.index = shared_index
        pandas_df1.index = shared_index
    elif is_idx_aligned:
        shared_index = pandas_df1.index
        modin_df2.index = shared_index
        pandas_df2.index = shared_index

    # pandas does not support this case because the result would contain
    # duplicate values along the column axis.
    if op_type == "df_ser" and not is_idx_aligned and is_more_other_partitions:

        def divide_by_column_a(df):
            # Pick the divisor from the same library the frame belongs to.
            if isinstance(df, pd.DataFrame):
                return df / modin_df1.a
            return df / pandas_df1.a

        # NOTE(review): eval_general is presumably expected to verify that both
        # libraries behave alike here - confirm against its definition.
        eval_general(modin_df2, pandas_df2, divide_by_column_a)
        return

    # Select the operands first, then run the division once per library.
    if op_type == "df_ser":
        modin_lhs, modin_rhs = modin_df2, modin_df1.a
        pandas_lhs, pandas_rhs = pandas_df2, pandas_df1.a
    elif op_type == "df_df":
        modin_lhs, modin_rhs = modin_df2, modin_df1
        pandas_lhs, pandas_rhs = pandas_df2, pandas_df1
    elif op_type == "ser_ser_same_name":
        modin_lhs, modin_rhs = modin_df2.a, modin_df1.a
        pandas_lhs, pandas_rhs = pandas_df2.a, pandas_df1.a
    elif op_type == "ser_ser_different_name":
        modin_lhs, modin_rhs = modin_df2.a, modin_df1.b
        pandas_lhs, pandas_rhs = pandas_df2.a, pandas_df1.b

    modin_res = modin_lhs / modin_rhs
    pandas_res = pandas_lhs / pandas_rhs
    df_equals(modin_res, pandas_res)


def test_duplicate_indexes():
    """
    Compare Modin and pandas division when the left operand has duplicate labels.

    The left frame uses the index ``[0, 1, 2, 0, 1, 2]``; the right frame is
    built from the same data without an explicit index.
    """
    values = list(range(6))
    columns = {"a": values, "b": values}
    repeated_labels = [0, 1, 2] * 2

    modin_df1, pandas_df1 = create_test_dfs(columns, index=repeated_labels)
    modin_df2, pandas_df2 = create_test_dfs(columns)

    modin_result = modin_df1 / modin_df2
    pandas_result = pandas_df1 / pandas_df2
    df_equals(modin_result, pandas_result)