diff --git a/modin/core/dataframe/pandas/dataframe/dataframe.py b/modin/core/dataframe/pandas/dataframe/dataframe.py
index df0e5c46e3a..4ad7fe38ad4 100644
--- a/modin/core/dataframe/pandas/dataframe/dataframe.py
+++ b/modin/core/dataframe/pandas/dataframe/dataframe.py
@@ -2446,6 +2446,7 @@ def _apply_func_to_range_partitioning(
         preserve_columns=False,
         data=None,
         data_key_columns=None,
+        level=None,
         **kwargs,
     ):
         """
@@ -2454,7 +2455,7 @@ def _apply_func_to_range_partitioning(
         Parameters
         ----------
         key_columns : list of hashables
-            Columns to build the range partitioning for.
+            Columns to build the range partitioning for. Can't be specified along with `level`.
         func : callable(pandas.DataFrame) -> pandas.DataFrame
             Function to apply against partitions.
         ascending : bool, default: True
@@ -2467,6 +2468,8 @@ def _apply_func_to_range_partitioning(
             ``df["grouper"] # self`` and ``df["data"] # data``.
         data_key_columns : list of hashables, optional
             Additional key columns from `data`. Will be combined with `key_columns`.
+        level : list of ints or labels, optional
+            Index level(s) to build the range partitioning for. Can't be specified along with `key_columns`.
         **kwargs : dict
             Additional arguments to forward to the range builder function.
 
@@ -2575,14 +2578,21 @@ def _apply_func_to_range_partitioning(
             key_columns,
             ascending[0] if is_list_like(ascending) else ascending,
             ideal_num_new_partitions,
+            level=level,
             **kwargs,
         )
 
-        # here we want to get indices of those partitions that hold the key columns
-        key_indices = grouper.columns.get_indexer_for(key_columns)
-        partition_indices = np.unique(
-            np.digitize(key_indices, np.cumsum(grouper.column_widths))
-        )
+        if key_columns:
+            # here we want to get indices of those partitions that hold the key columns
+            key_indices = grouper.columns.get_indexer_for(key_columns)
+            partition_indices = np.unique(
+                np.digitize(key_indices, np.cumsum(grouper.column_widths))
+            )
+        elif level is not None:
+            # every partition carries the full index, so it's enough to take the first one
+            partition_indices = [0]
+        else:
+            raise ValueError("Must specify either 'level' or 'key_columns'")
 
         new_partitions = grouper._partition_mgr_cls.shuffle_partitions(
             new_partitions,
@@ -4038,6 +4048,10 @@ def groupby(
         duplicated_suffix = "__duplicated_suffix__"
         duplicated_pattern = r"_[\d]*__duplicated_suffix__"
         kwargs["observed"] = True
+        level = kwargs.get("level")
+
+        if level is not None and not isinstance(level, list):
+            level = [level]
 
         def apply_func(df):  # pragma: no cover
             if has_external_grouper:
@@ -4081,6 +4095,12 @@ def apply_func(df):  # pragma: no cover
 
             if series_groupby:
                 df = df.squeeze(axis=1)
+
+            if kwargs.get("level") is not None:
+                assert len(by) == 0
+                # pandas raises an error if 'by' is an empty list, so pass None instead
+                by = None
+
             result = operator(df.groupby(by, **kwargs))
 
             if align_result_columns and df.empty and result.empty:
@@ -4135,6 +4155,7 @@ def apply_func(df):  # pragma: no cover
             func=apply_func,
             data=data,
             data_key_columns=data_key_columns,
+            level=level,
         )
         # no need aligning columns if there's only one row partition
         if add_missing_cats or align_result_columns and result._partitions.shape[0] > 1:
@@ -4293,7 +4314,11 @@ def join_cols(df, *cols):
                 row_lengths=result._row_lengths_cache,
             )
 
-        if not result.has_materialized_index and not has_external_grouper:
+        if (
+            not result.has_materialized_index
+            and not has_external_grouper
+            and level is None
+        ):
             by_dtypes = ModinDtypes(self._dtypes).lazy_get(internal_by)
             if by_dtypes.is_materialized:
                 new_index = ModinIndex(value=result, axis=0, dtypes=by_dtypes)
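For orientation, a minimal usage sketch (not part of the patch) of what the new `level` path enables. `RangePartitioningGroupby` comes from `modin.config` (the tests below reference it); the frame and level names here are made up:

```python
import numpy as np
import pandas

import modin.pandas as pd
from modin.config import RangePartitioningGroupby

RangePartitioningGroupby.put(True)  # opt in to range-partitioning groupby

index = pandas.MultiIndex.from_product([["a", "b"], range(4)], names=["grp", "pos"])
df = pd.DataFrame(np.arange(16).reshape(8, 2), index=index, columns=["x", "y"])

# Previously raised NotImplementedError under range-partitioning groupby;
# with this patch the frame is range-partitioned by the "grp" index level.
print(df.groupby(level="grp").sum())
```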
diff --git a/modin/core/dataframe/pandas/dataframe/utils.py b/modin/core/dataframe/pandas/dataframe/utils.py
index 08bbd6894de..d9dc23f0c33 100644
--- a/modin/core/dataframe/pandas/dataframe/utils.py
+++ b/modin/core/dataframe/pandas/dataframe/utils.py
@@ -114,12 +114,14 @@ class ShuffleSortFunctions(ShuffleFunctions):
     ----------
     modin_frame : PandasDataframe
         The frame to build the range-partitioning for.
-    columns : str or list of strings
-        The column/columns to use as a key.
+    columns : str, list of strings or None
+        The column/columns to use as a key. Can't be specified along with `level`.
     ascending : bool
         Whether the ranges should be in ascending or descending order.
     ideal_num_new_partitions : int
         The ideal number of new partitions.
+    level : list of strings or ints, or None
+        Index level(s) to use as a key. Can't be specified along with `columns`.
     **kwargs : dict
         Additional keyword arguments.
     """
@@ -127,9 +129,10 @@ class ShuffleSortFunctions(ShuffleFunctions):
     def __init__(
         self,
         modin_frame: "PandasDataframe",
-        columns: Union[str, list],
+        columns: Optional[Union[str, list]],
         ascending: Union[list, bool],
         ideal_num_new_partitions: int,
+        level: Optional[list[Union[str, int]]] = None,
         **kwargs: dict,
     ):
         self.frame_len = len(modin_frame)
@@ -137,11 +140,16 @@ def __init__(
         self.columns = columns if is_list_like(columns) else [columns]
         self.ascending = ascending
         self.kwargs = kwargs.copy()
+        self.level = level
         self.columns_info = None
 
     def sample_fn(self, partition: pandas.DataFrame) -> pandas.DataFrame:
+        if self.level is not None:
+            partition = self._index_to_df_zero_copy(partition, self.level)
+        else:
+            partition = partition[self.columns]
         return self.pick_samples_for_quantiles(
-            partition[self.columns], self.ideal_num_new_partitions, self.frame_len
+            partition, self.ideal_num_new_partitions, self.frame_len
         )
 
     def pivot_fn(self, samples: "list[pandas.DataFrame]") -> int:
@@ -181,6 +189,7 @@ def split_fn(
             partition,
             self.columns_info,
             self.ascending,
+            keys_are_index_levels=self.level is not None,
             **self.kwargs,
         )
 
@@ -312,6 +321,7 @@ def split_partitions_using_pivots_for_sort(
         df: pandas.DataFrame,
         columns_info: "list[ColumnInfo]",
         ascending: bool,
+        keys_are_index_levels: bool = False,
         **kwargs: dict,
     ) -> "tuple[pandas.DataFrame, ...]":
         """
@@ -330,6 +340,8 @@ def split_partitions_using_pivots_for_sort(
             Information regarding keys and pivots for range partitioning.
         ascending : bool
             The ascending flag.
+        keys_are_index_levels : bool, default: False
+            Whether `columns_info` describes index levels or actual columns from `df`.
         **kwargs : dict
             Additional keyword arguments.
 
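The `sample_fn` change above now draws the quantile samples either from regular columns or from index levels. A plain-pandas sketch of that dispatch; the helper name `extract_key_data` and its signature are illustrative, not part of the patch:

```python
import pandas


def extract_key_data(partition, columns=None, level=None):
    """Return the frame whose quantiles define the range-partitioning pivots."""
    if level is not None:
        # pull the requested index levels out as columns, without copying the data columns
        return pandas.DataFrame(
            {lvl: partition.index.get_level_values(lvl) for lvl in level},
            index=partition.index,
            copy=False,
        )
    # the pre-existing path: key data comes from regular columns
    return partition[columns]
```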
@@ -342,9 +354,14 @@ def split_partitions_using_pivots_for_sort(
             # We can return the dataframe with zero changes if there were no pivots passed
             return (df,)
 
-        na_index = (
-            df[[col_info.name for col_info in columns_info]].isna().squeeze(axis=1)
+        key_data = (
+            ShuffleSortFunctions._index_to_df_zero_copy(
+                df, [col_info.name for col_info in columns_info]
+            )
+            if keys_are_index_levels
+            else df[[col_info.name for col_info in columns_info]]
         )
+        na_index = key_data.isna().squeeze(axis=1)
         if na_index.ndim == 2:
             na_index = na_index.any(axis=1)
         na_rows = df[na_index]
@@ -373,12 +390,19 @@ def get_group(grp, key, df):
                 pivots = pivots[::-1]
             group_keys.append(range(len(pivots) + 1))
             key = kwargs.pop("key", None)
-            cols_to_digitize = non_na_rows[col_info.name]
+            cols_to_digitize = (
+                non_na_rows.index.get_level_values(col_info.name)
+                if keys_are_index_levels
+                else non_na_rows[col_info.name]
+            )
             if key is not None:
                 cols_to_digitize = key(cols_to_digitize)
+
+            if cols_to_digitize.ndim == 2:
+                cols_to_digitize = cols_to_digitize.squeeze()
 
             if col_info.is_numeric:
-                groupby_col = np.digitize(cols_to_digitize.squeeze(), pivots)
+                groupby_col = np.digitize(cols_to_digitize, pivots)
                 # `np.digitize` returns results based off of the sort order of the pivots it is passed.
                 # When we only have one unique value in our pivots, `np.digitize` assumes that the pivots
                 # are sorted in ascending order, and gives us results based off of that assumption - so if
@@ -386,9 +410,7 @@ def get_group(grp, key, df):
                 if not ascending and len(np.unique(pivots)) == 1:
                     groupby_col = len(pivots) - groupby_col
             else:
-                groupby_col = np.searchsorted(
-                    pivots, cols_to_digitize.squeeze(), side="right"
-                )
+                groupby_col = np.searchsorted(pivots, cols_to_digitize, side="right")
                 # Since np.searchsorted requires the pivots to be in ascending order, if we want to sort
                 # in descending order, we need to swap the new indices.
                 if not ascending:
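Both binning calls above assign each non-NA key value to a range partition. A standalone NumPy illustration with made-up pivots and values: for ascending pivots, `np.digitize(values, pivots)` (default `right=False`) agrees with `np.searchsorted(pivots, values, side="right")`, so the numeric and non-numeric branches produce consistent partition ids:

```python
import numpy as np

pivots = np.array([10, 20, 30])
values = np.array([5, 10, 25, 40])

print(np.digitize(values, pivots))                    # [0 1 2 3]
print(np.searchsorted(pivots, values, side="right"))  # [0 1 2 3]
```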
+ """ + # calling 'df.index.to_frame()' creates a copy of the index, so doing the conversion manually + # to avoid the copy + data = { + ( + df.index.names[lvl] if isinstance(lvl, int) else lvl + ): df.index.get_level_values(lvl) + for lvl in levels + } + index_data = pandas.DataFrame(data, index=df.index, copy=False) + return index_data + def lazy_metadata_decorator(apply_axis=None, axis_arg=-1, transpose=False): """ diff --git a/modin/core/storage_formats/pandas/query_compiler.py b/modin/core/storage_formats/pandas/query_compiler.py index 60306307b42..e02ca80e915 100644 --- a/modin/core/storage_formats/pandas/query_compiler.py +++ b/modin/core/storage_formats/pandas/query_compiler.py @@ -3701,12 +3701,7 @@ def _groupby_shuffle( by, agg_func, axis, groupby_kwargs, agg_args, agg_kwargs, how, drop ) - if groupby_kwargs.get("level") is not None: - raise NotImplementedError( - "Grouping on an index level is not yet supported by range-partitioning groupby implementation: " - + "https://github.com/modin-project/modin/issues/5926" - ) - + grouping_on_level = groupby_kwargs.get("level") is not None if any( isinstance(obj, pandas.Grouper) for obj in (by if isinstance(by, list) else [by]) @@ -3716,7 +3711,10 @@ def _groupby_shuffle( + "https://github.com/modin-project/modin/issues/5926" ) - external_by, internal_by, by_positions = self._groupby_separate_by(by, drop) + if grouping_on_level: + external_by, internal_by, by_positions = [], [], [] + else: + external_by, internal_by, by_positions = self._groupby_separate_by(by, drop) all_external_are_qcs = all(isinstance(obj, type(self)) for obj in external_by) if not all_external_are_qcs: diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index a381eb3cb7c..8a6a00a252d 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -42,6 +42,7 @@ ) from .utils import ( + assert_set_of_rows_identical, check_df_columns_have_nans, create_test_dfs, create_test_series, @@ -259,9 +260,7 @@ def test_mixed_dtypes_groupby(as_index): md_sorted_grpby, pd_sorted_grpby, lambda df: df.ffill(), - comparator=lambda *dfs: df_equals( - *sort_index_if_experimental_groupby(*dfs) - ), + comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)), ) # FIXME: https://github.com/modin-project/modin/issues/7032 eval_general( @@ -282,7 +281,19 @@ def test_mixed_dtypes_groupby(as_index): "Cannot aggregate non-numeric type: object" ), ) - eval_shift(modin_groupby, pandas_groupby) + eval_shift( + modin_groupby, + pandas_groupby, + comparator=( + # We should sort the result before comparison for transform functions + # in case of range-partitioning groupby (https://github.com/modin-project/modin/issues/5924). 
diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py
index a381eb3cb7c..8a6a00a252d 100644
--- a/modin/pandas/test/test_groupby.py
+++ b/modin/pandas/test/test_groupby.py
@@ -42,6 +42,7 @@
 )
 
 from .utils import (
+    assert_set_of_rows_identical,
    check_df_columns_have_nans,
     create_test_dfs,
     create_test_series,
@@ -259,9 +260,7 @@ def test_mixed_dtypes_groupby(as_index):
             md_sorted_grpby,
             pd_sorted_grpby,
             lambda df: df.ffill(),
-            comparator=lambda *dfs: df_equals(
-                *sort_index_if_experimental_groupby(*dfs)
-            ),
+            comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)),
         )
         # FIXME: https://github.com/modin-project/modin/issues/7032
         eval_general(
@@ -282,7 +281,19 @@ def test_mixed_dtypes_groupby(as_index):
                 "Cannot aggregate non-numeric type: object"
             ),
         )
-        eval_shift(modin_groupby, pandas_groupby)
+        eval_shift(
+            modin_groupby,
+            pandas_groupby,
+            comparator=(
+                # We should sort the result before comparison for transform functions
+                # in case of range-partitioning groupby (https://github.com/modin-project/modin/issues/5924).
+                # This test, however, produces so many NaN values in the result that sorting
+                # is infeasible, so we compare sets of rows manually instead.
+                assert_set_of_rows_identical
+                if RangePartitioningGroupby.get()
+                else None
+            ),
+        )
         eval_mean(modin_groupby, pandas_groupby, numeric_only=True)
         eval_any(modin_groupby, pandas_groupby)
         eval_min(modin_groupby, pandas_groupby)
@@ -309,18 +320,14 @@ def test_mixed_dtypes_groupby(as_index):
             modin_groupby,
             pandas_groupby,
             lambda df: df.first(),
-            comparator=lambda *dfs: df_equals(
-                *sort_index_if_experimental_groupby(*dfs)
-            ),
+            comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)),
         )
         eval_cummin(modin_groupby, pandas_groupby, numeric_only=True)
         eval_general(
             md_sorted_grpby,
             pd_sorted_grpby,
             lambda df: df.bfill(),
-            comparator=lambda *dfs: df_equals(
-                *sort_index_if_experimental_groupby(*dfs)
-            ),
+            comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)),
         )
         # numeric_only=False doesn't work
         eval_general(
@@ -366,9 +373,7 @@ def test_mixed_dtypes_groupby(as_index):
             modin_groupby,
             pandas_groupby,
             lambda df: df.head(n),
-            comparator=lambda *dfs: df_equals(
-                *sort_index_if_experimental_groupby(*dfs)
-            ),
+            comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)),
         )
         eval_cumprod(modin_groupby, pandas_groupby, numeric_only=True)
         # numeric_only=False doesn't work
@@ -399,9 +404,7 @@ def test_mixed_dtypes_groupby(as_index):
             modin_groupby,
             pandas_groupby,
             lambda df: df.tail(n),
-            comparator=lambda *dfs: df_equals(
-                *sort_index_if_experimental_groupby(*dfs)
-            ),
+            comparator=lambda *dfs: df_equals(*sort_if_experimental_groupby(*dfs)),
         )
         eval_quantile(modin_groupby, pandas_groupby)
         eval_general(modin_groupby, pandas_groupby, lambda df: df.take([0]))
@@ -1346,15 +1349,30 @@ def test_multi_column_groupby():
         modin_df.groupby(by, axis=1).count()
 
 
-def sort_index_if_experimental_groupby(*dfs):
+def sort_if_experimental_groupby(*dfs):
     """
     This method should be applied before comparing results of ``groupby.transform``
     as the experimental implementation changes the order of rows for that:
     https://github.com/modin-project/modin/issues/5924
     """
+    result = dfs
     if RangePartitioningGroupby.get():
-        return tuple(df.sort_index() for df in dfs)
-    return dfs
+        dfs = try_cast_to_pandas(dfs)
+        result = []
+        for df in dfs:
+            if df.ndim == 1:
+                # Series case
+                result.append(df.sort_index())
+                continue
+
+            # filtering out index names in order to avoid:
+            # ValueError: 'col' is both an index level and a column label, which is ambiguous.
+            cols_no_idx_names = df.columns.difference(
+                df.index.names, sort=False
+            ).tolist()
+            df = df.sort_values(cols_no_idx_names)
+            result.append(df)
+    return result
 
 
 def eval_ngroups(modin_groupby, pandas_groupby):
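A plain-pandas reproduction of the ambiguity that the `columns.difference(df.index.names, ...)` filter above guards against; the data is made up:

```python
import pandas

df = pandas.DataFrame({"col": [2, 1, 2], "val": [30, 20, 10]})
df.index.name = "col"  # an index level and a column now share the name "col"

try:
    df.sort_values(["col", "val"])
except ValueError as err:
    print(err)  # 'col' is both an index level and a column label, which is ambiguous.

# the helper's workaround: drop labels that also name index levels before sorting
cols_no_idx_names = df.columns.difference(df.index.names, sort=False).tolist()
print(df.sort_values(cols_no_idx_names))
```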
@@ -1389,7 +1407,7 @@ def eval_ndim(modin_groupby, pandas_groupby):
 def eval_cumsum(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only=False):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.cumsum(axis=axis, numeric_only=numeric_only),
             pandas_groupby.cumsum(axis=axis, numeric_only=numeric_only),
         )
@@ -1398,7 +1416,7 @@ def eval_cumsum(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only
 def eval_cummax(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only=False):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.cummax(axis=axis, numeric_only=numeric_only),
             pandas_groupby.cummax(axis=axis, numeric_only=numeric_only),
         )
@@ -1407,7 +1425,7 @@ def eval_cummax(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only
 def eval_cummin(modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only=False):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.cummin(axis=axis, numeric_only=numeric_only),
             pandas_groupby.cummin(axis=axis, numeric_only=numeric_only),
         )
@@ -1486,13 +1504,13 @@ def eval_cumprod(
     modin_groupby, pandas_groupby, axis=lib.no_default, numeric_only=False
 ):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.cumprod(numeric_only=numeric_only),
             pandas_groupby.cumprod(numeric_only=numeric_only),
         )
     )
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.cumprod(axis=axis, numeric_only=numeric_only),
             pandas_groupby.cumprod(axis=axis, numeric_only=numeric_only),
         )
@@ -1501,7 +1519,7 @@ def eval_cumprod(
 def eval_transform(modin_groupby, pandas_groupby, func):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.transform(func), pandas_groupby.transform(func)
         )
     )
@@ -1509,7 +1527,7 @@ def eval_transform(modin_groupby, pandas_groupby, func):
 def eval_fillna(modin_groupby, pandas_groupby):
     df_equals(
-        *sort_index_if_experimental_groupby(
+        *sort_if_experimental_groupby(
             modin_groupby.fillna(method="ffill"), pandas_groupby.fillna(method="ffill")
         )
     )
@@ -1624,9 +1642,11 @@ def eval_groups(modin_groupby, pandas_groupby):
         df_equals(modin_groupby.get_group(name), pandas_groupby.get_group(name))
 
 
-def eval_shift(modin_groupby, pandas_groupby):
-    def comparator(df1, df2):
-        df_equals(*sort_index_if_experimental_groupby(df1, df2))
+def eval_shift(modin_groupby, pandas_groupby, comparator=None):
+    if comparator is None:
+
+        def comparator(df1, df2):
+            df_equals(*sort_if_experimental_groupby(df1, df2))
 
     eval_general(
         modin_groupby,
@@ -1726,7 +1746,7 @@ def test_groupby_getitem_preserves_key_order_issue_6154():
     ],
 )
 def test_groupby_multiindex(groupby_kwargs):
-    frame_data = np.random.randint(0, 100, size=(2**6, 2**4))
+    frame_data = np.random.randint(0, 100, size=(2**6, 2**6))
     modin_df = pd.DataFrame(frame_data)
     pandas_df = pandas.DataFrame(frame_data)
 
@@ -2047,7 +2067,7 @@ def comparator(df1, df2):
         from modin.core.dataframe.algebra.default2pandas.groupby import GroupBy
 
         if GroupBy.is_transformation_kernel(operation):
-            df1, df2 = sort_index_if_experimental_groupby(df1, df2)
+            df1, df2 = sort_if_experimental_groupby(df1, df2)
         df_equals(df1, df2)
 
diff --git a/modin/pandas/test/utils.py b/modin/pandas/test/utils.py
index d87ee007077..08c976427e5 100644
--- a/modin/pandas/test/utils.py
+++ b/modin/pandas/test/utils.py
@@ -665,6 +665,23 @@ def assert_dtypes_equal(df1, df2):
             break
 
 
+def assert_set_of_rows_identical(df1, df2):
+    """
+    Assert that the set of rows of the passed dataframes is identical.
+
+    Works much slower than ``df1.equals(df2)``, so it's recommended to use this
+    function only in exceptional cases.
+    """
+    # replacing NaN with None to make the comparison work: 'NaN == NaN' is False, while 'None == None' is True
+    df1, df2 = map(
+        lambda df: (df.to_frame() if df.ndim == 1 else df).replace({np.nan: None}),
+        (df1, df2),
+    )
+    rows1 = set((idx, *row.tolist()) for idx, row in df1.iterrows())
+    rows2 = set((idx, *row.tolist()) for idx, row in df2.iterrows())
+    assert rows1 == rows2
+
+
 def sort_data(data):
     """Sort the passed sequence."""
     if isinstance(data, (pandas.DataFrame, pd.DataFrame)):
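A quick illustration of what the new helper tolerates: identical rows in a different order, NaN cells included. This sketch assumes the patch is applied and the test helpers are importable; the frames are made up:

```python
import numpy as np
import pandas

from modin.pandas.test.utils import assert_set_of_rows_identical

df1 = pandas.DataFrame({"a": [1.0, np.nan], "b": [2.0, 3.0]}, index=[10, 11])
df2 = df1.iloc[::-1]  # the same rows, in reverse order

assert_set_of_rows_identical(df1, df2)  # passes: row order is ignored
```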