Skip to content

Commit

Permalink
PERF-#6464: Improve reshuffling for multi-column groupby in low-cardi…
Browse files Browse the repository at this point in the history
…nality cases (#6533)

Signed-off-by: Dmitry Chigarev <[email protected]>
  • Loading branch information
dchigarev authored Sep 8, 2023
1 parent f4abfc8 commit 0e42667
Show file tree
Hide file tree
Showing 4 changed files with 466 additions and 343 deletions.
87 changes: 40 additions & 47 deletions modin/core/dataframe/pandas/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
from pandas._libs.lib import no_default
from typing import List, Hashable, Optional, Callable, Union, Dict, TYPE_CHECKING

from modin.config import Engine, IsRayCluster
from modin.config import Engine, IsRayCluster, NPartitions
from modin.core.storage_formats.pandas.query_compiler import PandasQueryCompiler
from modin.core.storage_formats.pandas.utils import get_length_list
from modin.error_message import ErrorMessage
Expand All @@ -43,7 +43,7 @@
JoinType,
)
from modin.core.dataframe.pandas.dataframe.utils import (
build_sort_functions,
ShuffleSortFunctions,
lazy_metadata_decorator,
)
from modin.core.dataframe.pandas.metadata import (
Expand Down Expand Up @@ -2385,15 +2385,15 @@ def combine_and_apply(
)

def _apply_func_to_range_partitioning(
self, key_column, func, ascending=True, **kwargs
self, key_columns, func, ascending=True, **kwargs
):
"""
Reshuffle data so it would be range partitioned and then apply the passed function row-wise.
Parameters
----------
key_column : hashable
Column name to build the range partitioning for.
key_columns : list of hashables
Columns to build the range partitioning for.
func : callable(pandas.DataFrame) -> pandas.DataFrame
Function to apply against partitions.
ascending : bool, default: True
Expand All @@ -2410,7 +2410,8 @@ def _apply_func_to_range_partitioning(
if self._partitions.shape[0] == 1:
return self.apply_full_axis(axis=1, func=func)

ideal_num_new_partitions = len(self._partitions)
# don't want to inherit over-partitioning so doing this 'min' check
ideal_num_new_partitions = min(len(self._partitions), NPartitions.get())
m = len(self.index) / ideal_num_new_partitions
sampling_probability = (1 / m) * np.log(
ideal_num_new_partitions * len(self.index)
Expand All @@ -2431,56 +2432,48 @@ def _apply_func_to_range_partitioning(
# simply combine all partitions and apply the sorting to the whole dataframe
return self.combine_and_apply(func=func)

if is_numeric_dtype(self.dtypes[key_column]):
method = "linear"
if ideal_num_new_partitions < len(self._partitions):
if len(self._partitions) % ideal_num_new_partitions == 0:
joining_partitions = np.split(
self._partitions, ideal_num_new_partitions
)
else:
step = round(len(self._partitions) / ideal_num_new_partitions)
joining_partitions = np.split(
self._partitions,
range(step, len(self._partitions), step),
)

new_partitions = np.array(
[
self._partition_mgr_cls.column_partitions(
ptn_grp, full_axis=False
)
for ptn_grp in joining_partitions
]
)
else:
new_partitions = self._partitions
else:
# This means we are not sorting numbers, so we need our quantiles to not try
# arithmetic on the values.
method = "inverted_cdf"
new_partitions = self._partitions

shuffling_functions = build_sort_functions(
shuffling_functions = ShuffleSortFunctions(
self,
key_column,
method,
key_columns,
ascending[0] if is_list_like(ascending) else ascending,
ideal_num_new_partitions,
**kwargs,
)
if ideal_num_new_partitions < len(self._partitions):
if len(self._partitions) % ideal_num_new_partitions == 0:
joining_partitions = np.split(
self._partitions, ideal_num_new_partitions
)
else:
joining_partitions = np.split(
self._partitions,
range(
0,
len(self._partitions),
round(len(self._partitions) / ideal_num_new_partitions),
)[1:],
)

new_partitions = np.array(
[
self._partition_mgr_cls.column_partitions(ptn_grp, full_axis=False)
for ptn_grp in joining_partitions
]
)
else:
new_partitions = self._partitions
# here we want to get indices of those partitions that hold the key columns
key_indices = self.columns.get_indexer_for(key_columns)
partition_indices = np.unique(
np.digitize(key_indices, np.cumsum(self.column_widths))
)

major_col_partition_index = self.columns.get_loc(key_column)
cols_seen = 0
index = -1
for i, length in enumerate(self.column_widths):
cols_seen += length
if major_col_partition_index < cols_seen:
index = i
break
new_partitions = self._partition_mgr_cls.shuffle_partitions(
new_partitions,
index,
partition_indices,
shuffling_functions,
func,
)
Expand Down Expand Up @@ -2542,7 +2535,7 @@ def sort_function(df): # pragma: no cover
)

result = self._apply_func_to_range_partitioning(
key_column=columns[0], func=sort_function, ascending=ascending, **kwargs
key_columns=[columns[0]], func=sort_function, ascending=ascending, **kwargs
)

result.set_axis_cache(self.copy_axis_cache(axis.value ^ 1), axis=axis.value ^ 1)
Expand Down Expand Up @@ -3754,7 +3747,7 @@ def apply_func(df): # pragma: no cover
return result

result = self._apply_func_to_range_partitioning(
key_column=by[0],
key_columns=by,
func=apply_func,
)

Expand Down
Loading

0 comments on commit 0e42667

Please sign in to comment.