diff --git a/modin/backends/base/query_compiler.py b/modin/backends/base/query_compiler.py index 06b74d43e3a..e9cc7067796 100644 --- a/modin/backends/base/query_compiler.py +++ b/modin/backends/base/query_compiler.py @@ -1426,16 +1426,6 @@ def groupby_agg( drop=drop, ) - def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False): - return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)( - self, - by=by, - func_dict=func_dict, - groupby_args=groupby_args, - agg_args=agg_args, - drop=drop, - ) - # END Manual Partitioning methods def unstack(self, level, fill_value): diff --git a/modin/backends/pandas/query_compiler.py b/modin/backends/pandas/query_compiler.py index 99f9ed4445d..2567f33e026 100644 --- a/modin/backends/pandas/query_compiler.py +++ b/modin/backends/pandas/query_compiler.py @@ -2573,7 +2573,8 @@ def groupby_agg( groupby_kwargs, drop=False, ): - agg_func = wrap_udf_function(agg_func) + if callable(agg_func): + agg_func = wrap_udf_function(agg_func) if is_multi_by: return super().groupby_agg( @@ -2605,7 +2606,11 @@ def groupby_agg_builder(df): def compute_groupby(df): grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs) try: - result = agg_func(grouped_df, **agg_kwargs) + result = ( + grouped_df.agg(agg_func) + if isinstance(agg_func, dict) + else agg_func(grouped_df, **agg_kwargs) + ) # This happens when the partition is filled with non-numeric data and a # numeric operation is done. We need to build the index here to avoid # issues with extracting the index. @@ -2631,7 +2636,9 @@ def compute_groupby(df): # determening type of raised exception by applying `aggfunc` # to empty DataFrame try: - agg_func( + pandas.DataFrame(index=[1], columns=[1]).agg(agg_func) if isinstance( + agg_func, dict + ) else agg_func( pandas.DataFrame(index=[1], columns=[1]).groupby(level=0), **agg_kwargs, ) diff --git a/modin/data_management/functions/default_methods/groupby_default.py b/modin/data_management/functions/default_methods/groupby_default.py index b6ae497c75f..e6cd40675e7 100644 --- a/modin/data_management/functions/default_methods/groupby_default.py +++ b/modin/data_management/functions/default_methods/groupby_default.py @@ -80,7 +80,11 @@ def fn( grp = df.groupby(by, axis=axis, **groupby_args) agg_func = cls.get_func(grp, key, **kwargs) - result = agg_func(grp, **agg_args) + result = ( + grp.agg(agg_func, **agg_args) + if isinstance(agg_func, dict) + else agg_func(grp, **agg_args) + ) if not is_multi_by: if as_index: diff --git a/modin/experimental/backends/omnisci/query_compiler.py b/modin/experimental/backends/omnisci/query_compiler.py index eadee462d9f..35ce16e9917 100644 --- a/modin/experimental/backends/omnisci/query_compiler.py +++ b/modin/experimental/backends/omnisci/query_compiler.py @@ -279,33 +279,6 @@ def groupby_agg( ) return self.__constructor__(new_frame) - def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False): - """Apply aggregation functions to a grouped dataframe per-column. - - Parameters - ---------- - by : DFAlgQueryCompiler - The column to group by - func_dict : dict of str, callable/string - The dictionary mapping of column to function - groupby_args : dict - The dictionary of keyword arguments for the group by. - agg_args : dict - The dictionary of keyword arguments for the aggregation functions - drop : bool - Whether or not to drop the column from the data. - - Returns - ------- - DFAlgQueryCompiler - The result of the per-column aggregations on the grouped dataframe. - """ - # TODO: handle `drop` arg - new_frame = self._modin_frame.groupby_agg( - by, 0, func_dict, groupby_args, **agg_args - ) - return self.__constructor__(new_frame) - def count(self, **kwargs): return self._agg("count", **kwargs) diff --git a/modin/pandas/groupby.py b/modin/pandas/groupby.py index 5eedd42759c..3329a0412c1 100644 --- a/modin/pandas/groupby.py +++ b/modin/pandas/groupby.py @@ -357,6 +357,8 @@ def aggregate(self, func=None, *args, **kwargs): # This is not implemented in pandas, # so we throw a different message raise NotImplementedError("axis other than 0 is not supported") + + relabeling_required = False if isinstance(func, dict) or func is None: def _reconstruct_func(func, **kwargs): @@ -380,50 +382,32 @@ def _reconstruct_func(func, **kwargs): from pandas.core.base import SpecificationError raise SpecificationError("nested renamer is not supported") - if isinstance(self._by, type(self._query_compiler)): - by = list(self._by.columns) - else: - by = self._by - - subset_cols = list(func_dict.keys()) + ( - list(self._by.columns) - if isinstance(self._by, type(self._query_compiler)) - and all(c in self._df.columns for c in self._by.columns) - else [] - ) - result = type(self._df)( - query_compiler=self._df[subset_cols]._query_compiler.groupby_dict_agg( - by=by, - func_dict=func_dict, - groupby_args=self._kwargs, - agg_args=kwargs, - drop=self._drop, - ) - ) - - if relabeling_required: - result = result.iloc[:, order] - result.columns = new_columns - - return result - - if is_list_like(func): + func = func_dict + elif is_list_like(func): return self._default_to_pandas( lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs), *args, **kwargs, ) - if isinstance(func, str): - agg_func = getattr(self, func, None) + elif isinstance(func, str): + # Using "getattr" here masks possible AttributeError which we throw + # in __getattr__, so we should call __getattr__ directly instead. + agg_func = self.__getattr__(func) if callable(agg_func): return agg_func(*args, **kwargs) - return self._apply_agg_function( - lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs), + + result = self._apply_agg_function( + func, drop=self._as_index, *args, **kwargs, ) + if relabeling_required: + result = result.iloc[:, order] + result.columns = new_columns + return result + agg = aggregate def last(self, **kwargs): @@ -888,7 +872,9 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs): ------- A new combined DataFrame with the result of all groups. """ - assert callable(f), "'{0}' object is not callable".format(type(f)) + assert callable(f) or isinstance( + f, dict + ), "'{0}' object is not callable and not a dict".format(type(f)) # For aggregations, pandas behavior does this for the result. # For other operations it does not, so we wait until there is an aggregation to diff --git a/modin/pandas/test/test_groupby.py b/modin/pandas/test/test_groupby.py index b522e26f673..e75c223346a 100644 --- a/modin/pandas/test/test_groupby.py +++ b/modin/pandas/test/test_groupby.py @@ -149,7 +149,12 @@ def test_mixed_dtypes_groupby(as_index): eval_var(modin_groupby, pandas_groupby) eval_skew(modin_groupby, pandas_groupby) - agg_functions = ["min", "max"] + agg_functions = [ + "min", + "max", + {"col2": "sum"}, + {"col2": "max", "col4": "sum", "col5": "min"}, + ] for func in agg_functions: eval_agg(modin_groupby, pandas_groupby, func) eval_aggregate(modin_groupby, pandas_groupby, func) @@ -479,7 +484,12 @@ def test_single_group_row_groupby(): eval_prod(modin_groupby, pandas_groupby) eval_std(modin_groupby, pandas_groupby) - agg_functions = ["min", "max"] + agg_functions = [ + "min", + "max", + {"col2": "sum"}, + {"col2": "max", "col4": "sum", "col5": "min"}, + ] for func in agg_functions: eval_agg(modin_groupby, pandas_groupby, func) eval_aggregate(modin_groupby, pandas_groupby, func) @@ -595,7 +605,7 @@ def test_large_row_groupby(is_by_category): # eval_prod(modin_groupby, pandas_groupby) causes overflows eval_std(modin_groupby, pandas_groupby) - agg_functions = ["min", "max"] + agg_functions = ["min", "max", {"A": "sum"}, {"A": "max", "B": "sum", "C": "min"}] for func in agg_functions: eval_agg(modin_groupby, pandas_groupby, func) eval_aggregate(modin_groupby, pandas_groupby, func)