Skip to content

Commit

Permalink
FIX-#7250: Revert "PERF-#6666: Avoid internal reset_index for left merge" (#7251)
Browse files Browse the repository at this point in the history

Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev authored May 13, 2024
1 parent 78dd171 commit 1f06e70
Show file tree
Hide file tree
Showing 4 changed files with 42 additions and 45 deletions.
27 changes: 3 additions & 24 deletions modin/core/storage_formats/pandas/merge.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,23 +144,8 @@ def should_keep_index(left, right):
)
return keep_index

def map_func(
left, right, *axis_lengths, kwargs=kwargs, **service_kwargs
): # pragma: no cover
df = pandas.merge(left, right, **kwargs)

if kwargs["how"] == "left":
partition_idx = service_kwargs["partition_idx"]
if len(axis_lengths):
if not should_keep_index(left, right):
# Doesn't work for "inner" case, since the partition sizes of the
# left dataframe may change
start = sum(axis_lengths[:partition_idx])
stop = sum(axis_lengths[: partition_idx + 1])

df.index = pandas.RangeIndex(start, stop)

return df
def map_func(left, right): # pragma: no cover
return pandas.merge(left, right, **kwargs)

# Want to ensure that these are python lists
if left_on is not None and right_on is not None:
Expand Down Expand Up @@ -188,7 +173,6 @@ def map_func(
left._modin_frame.broadcast_apply_full_axis(
axis=1,
func=map_func,
enumerate_partitions=how == "left",
other=right_to_broadcast,
# We're going to explicitly change the shape across the 1-axis,
# so we want for partitioning to adapt as well
Expand All @@ -199,7 +183,6 @@ def map_func(
new_columns=new_columns,
sync_labels=False,
dtypes=new_dtypes,
pass_axis_lengths_to_partitions=how == "left",
)
)

Expand Down Expand Up @@ -238,11 +221,7 @@ def map_func(
else new_left.sort_rows_by_column_values(on)
)

return (
new_left.reset_index(drop=True)
if not keep_index and (kwargs["how"] != "left" or sort)
else new_left
)
return new_left if keep_index else new_left.reset_index(drop=True)
else:
return left.default_to_pandas(pandas.DataFrame.merge, right, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1994,8 +1994,6 @@ def sort_rows(self, columns, ascending, ignore_index, na_position):
drop_index_cols_after = [
col for col in base._index_cols if col in columns
]
if not drop_index_cols_after:
drop_index_cols_after = None

if drop_index_cols_before:
exprs = dict()
Expand Down
54 changes: 37 additions & 17 deletions modin/tests/pandas/dataframe/test_join_sort.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,20 +230,20 @@ def test_join_6602():
"test_data, test_data2",
[
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.randint(0, 100, size=(64, 64)),
np.random.randint(0, 100, size=(128, 64)),
),
(
np.random.uniform(0, 100, size=(2**7, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(128, 64)),
np.random.randint(0, 100, size=(64, 64)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.randint(0, 100, size=(64, 64)),
np.random.randint(0, 100, size=(64, 128)),
),
(
np.random.uniform(0, 100, size=(2**6, 2**7)),
np.random.uniform(0, 100, size=(2**6, 2**6)),
np.random.randint(0, 100, size=(64, 128)),
np.random.randint(0, 100, size=(64, 64)),
),
],
)
Expand Down Expand Up @@ -280,7 +280,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=hows[i], on=ons[j], sort=sorts[j]
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

modin_result = modin_df.merge(
modin_df2,
Expand All @@ -296,7 +298,9 @@ def test_merge(test_data, test_data2):
right_on="key",
sort=sorts[j],
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# Test for issue #1771
modin_df = pd.DataFrame({"name": np.arange(40)})
Expand All @@ -305,7 +309,9 @@ def test_merge(test_data, test_data2):
pandas_df2 = pandas.DataFrame({"name": [39], "position": [0]})
modin_result = modin_df.merge(modin_df2, on="name", how="inner")
pandas_result = pandas_df.merge(pandas_df2, on="name", how="inner")
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

frame_data = {
"col1": [0, 1, 2, 3],
Expand All @@ -326,7 +332,9 @@ def test_merge(test_data, test_data2):
# Defaults
modin_result = modin_df.merge(modin_df2, how=how)
pandas_result = pandas_df.merge(pandas_df2, how=how)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# left_on and right_index
modin_result = modin_df.merge(
Expand All @@ -335,7 +343,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_index=True
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# left_index and right_on
modin_result = modin_df.merge(
Expand All @@ -344,7 +354,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_on="col1"
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# left_on and right_on col1
modin_result = modin_df.merge(
Expand All @@ -353,7 +365,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col1", right_on="col1"
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# left_on and right_on col2
modin_result = modin_df.merge(
Expand All @@ -362,7 +376,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_on="col2", right_on="col2"
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# left_index and right_index
modin_result = modin_df.merge(
Expand All @@ -371,7 +387,9 @@ def test_merge(test_data, test_data2):
pandas_result = pandas_df.merge(
pandas_df2, how=how, left_index=True, right_index=True
)
sort_if_range_partitioning(modin_result, pandas_result)
sort_if_range_partitioning(
modin_result, pandas_result, force=StorageFormat.get() == "Hdk"
)

# Cannot merge a Series without a name
ps = pandas.Series(frame_data2.get("col1"))
Expand All @@ -382,6 +400,7 @@ def test_merge(test_data, test_data2):
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=sort_if_range_partitioning,
expected_exception=ValueError("Cannot merge a Series without a name"),
comparator_kwargs={"force": StorageFormat.get() == "Hdk"},
)

# merge a Series with a name
Expand All @@ -392,6 +411,7 @@ def test_merge(test_data, test_data2):
pandas_df,
lambda df: df.merge(ms if isinstance(df, pd.DataFrame) else ps),
comparator=sort_if_range_partitioning,
comparator_kwargs={"force": StorageFormat.get() == "Hdk"},
)

with pytest.raises(TypeError):
Expand Down
4 changes: 2 additions & 2 deletions modin/tests/pandas/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -697,12 +697,12 @@ def sort_data(data):
return np.sort(data)


def sort_if_range_partitioning(df1, df2, comparator=None):
def sort_if_range_partitioning(df1, df2, comparator=None, force=False):
"""Sort the passed objects if 'RangePartitioning' is enabled and compare the sorted results."""
if comparator is None:
comparator = df_equals

if RangePartitioning.get() or use_range_partitioning_groupby():
if force or (RangePartitioning.get() or use_range_partitioning_groupby()):
df1, df2 = sort_data(df1), sort_data(df2)

comparator(df1, df2)
Expand Down

0 comments on commit 1f06e70

Please sign in to comment.