Skip to content

Commit

Permalink
Merged groupby_agg and groupby_dict_agg to implement dictionary functions aggregations (#2317)
Browse files Browse the repository at this point in the history

* FIX-#2254: Added dictionary functions to groupby aggregate tests

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Initial implementation of dictionary functions aggregation

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Remove lambda wrapper to allow dictionary to go to backend

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Fixed AttributeError not being thrown from getattr

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Lint fixes

Signed-off-by: Gregory Shimansky <[email protected]>

* FEAT-#2363: fix index name setter in OmniSci backend

Signed-off-by: ienkovich <[email protected]>

* FIX-#2254: Removed obsolete groupby_dict_agg API function

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Fixed dict aggregate for base backend

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Address reformatting comments

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Remove whitespace

Signed-off-by: Gregory Shimansky <[email protected]>

* FIX-#2254: Removed redundant argument conversion

because it is already done inside of base backend.

Signed-off-by: Gregory Shimansky <[email protected]>

Co-authored-by: ienkovich <[email protected]>
  • Loading branch information
gshimansky and ienkovich authored Nov 13, 2020
1 parent 2b0b755 commit 5c9398c
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 77 deletions.
10 changes: 0 additions & 10 deletions modin/backends/base/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -1426,16 +1426,6 @@ def groupby_agg(
drop=drop,
)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
return GroupByDefault.register(pandas.core.groupby.DataFrameGroupBy.aggregate)(
self,
by=by,
func_dict=func_dict,
groupby_args=groupby_args,
agg_args=agg_args,
drop=drop,
)

# END Manual Partitioning methods

def unstack(self, level, fill_value):
Expand Down
13 changes: 10 additions & 3 deletions modin/backends/pandas/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -2573,7 +2573,8 @@ def groupby_agg(
groupby_kwargs,
drop=False,
):
agg_func = wrap_udf_function(agg_func)
if callable(agg_func):
agg_func = wrap_udf_function(agg_func)

if is_multi_by:
return super().groupby_agg(
Expand Down Expand Up @@ -2605,7 +2606,11 @@ def groupby_agg_builder(df):
def compute_groupby(df):
grouped_df = df.groupby(by=by, axis=axis, **groupby_kwargs)
try:
result = agg_func(grouped_df, **agg_kwargs)
result = (
grouped_df.agg(agg_func)
if isinstance(agg_func, dict)
else agg_func(grouped_df, **agg_kwargs)
)
# This happens when the partition is filled with non-numeric data and a
# numeric operation is done. We need to build the index here to avoid
# issues with extracting the index.
Expand All @@ -2631,7 +2636,9 @@ def compute_groupby(df):
# determining type of raised exception by applying `agg_func`
# to empty DataFrame
try:
agg_func(
pandas.DataFrame(index=[1], columns=[1]).agg(agg_func) if isinstance(
agg_func, dict
) else agg_func(
pandas.DataFrame(index=[1], columns=[1]).groupby(level=0),
**agg_kwargs,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,11 @@ def fn(

grp = df.groupby(by, axis=axis, **groupby_args)
agg_func = cls.get_func(grp, key, **kwargs)
result = agg_func(grp, **agg_args)
result = (
grp.agg(agg_func, **agg_args)
if isinstance(agg_func, dict)
else agg_func(grp, **agg_args)
)

if not is_multi_by:
if as_index:
Expand Down
27 changes: 0 additions & 27 deletions modin/experimental/backends/omnisci/query_compiler.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,33 +279,6 @@ def groupby_agg(
)
return self.__constructor__(new_frame)

def groupby_dict_agg(self, by, func_dict, groupby_args, agg_args, drop=False):
"""Apply aggregation functions to a grouped dataframe per-column.
Parameters
----------
by : DFAlgQueryCompiler
The column to group by
func_dict : dict of str, callable/string
The dictionary mapping of column to function
groupby_args : dict
The dictionary of keyword arguments for the group by.
agg_args : dict
The dictionary of keyword arguments for the aggregation functions
drop : bool
Whether or not to drop the column from the data.
Returns
-------
DFAlgQueryCompiler
The result of the per-column aggregations on the grouped dataframe.
"""
# TODO: handle `drop` arg
new_frame = self._modin_frame.groupby_agg(
by, 0, func_dict, groupby_args, **agg_args
)
return self.__constructor__(new_frame)

def count(self, **kwargs):
return self._agg("count", **kwargs)

Expand Down
52 changes: 19 additions & 33 deletions modin/pandas/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,8 @@ def aggregate(self, func=None, *args, **kwargs):
# This is not implemented in pandas,
# so we throw a different message
raise NotImplementedError("axis other than 0 is not supported")

relabeling_required = False
if isinstance(func, dict) or func is None:

def _reconstruct_func(func, **kwargs):
Expand All @@ -380,50 +382,32 @@ def _reconstruct_func(func, **kwargs):
from pandas.core.base import SpecificationError

raise SpecificationError("nested renamer is not supported")
if isinstance(self._by, type(self._query_compiler)):
by = list(self._by.columns)
else:
by = self._by

subset_cols = list(func_dict.keys()) + (
list(self._by.columns)
if isinstance(self._by, type(self._query_compiler))
and all(c in self._df.columns for c in self._by.columns)
else []
)
result = type(self._df)(
query_compiler=self._df[subset_cols]._query_compiler.groupby_dict_agg(
by=by,
func_dict=func_dict,
groupby_args=self._kwargs,
agg_args=kwargs,
drop=self._drop,
)
)

if relabeling_required:
result = result.iloc[:, order]
result.columns = new_columns

return result

if is_list_like(func):
func = func_dict
elif is_list_like(func):
return self._default_to_pandas(
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),
*args,
**kwargs,
)
if isinstance(func, str):
agg_func = getattr(self, func, None)
elif isinstance(func, str):
# Using "getattr" here masks possible AttributeError which we throw
# in __getattr__, so we should call __getattr__ directly instead.
agg_func = self.__getattr__(func)
if callable(agg_func):
return agg_func(*args, **kwargs)
return self._apply_agg_function(
lambda df, *args, **kwargs: df.aggregate(func, *args, **kwargs),

result = self._apply_agg_function(
func,
drop=self._as_index,
*args,
**kwargs,
)

if relabeling_required:
result = result.iloc[:, order]
result.columns = new_columns
return result

agg = aggregate

def last(self, **kwargs):
Expand Down Expand Up @@ -888,7 +872,9 @@ def _apply_agg_function(self, f, drop=True, *args, **kwargs):
-------
A new combined DataFrame with the result of all groups.
"""
assert callable(f), "'{0}' object is not callable".format(type(f))
assert callable(f) or isinstance(
f, dict
), "'{0}' object is not callable and not a dict".format(type(f))

# For aggregations, pandas behavior does this for the result.
# For other operations it does not, so we wait until there is an aggregation to
Expand Down
16 changes: 13 additions & 3 deletions modin/pandas/test/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,12 @@ def test_mixed_dtypes_groupby(as_index):
eval_var(modin_groupby, pandas_groupby)
eval_skew(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = [
"min",
"max",
{"col2": "sum"},
{"col2": "max", "col4": "sum", "col5": "min"},
]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -479,7 +484,12 @@ def test_single_group_row_groupby():
eval_prod(modin_groupby, pandas_groupby)
eval_std(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = [
"min",
"max",
{"col2": "sum"},
{"col2": "max", "col4": "sum", "col5": "min"},
]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down Expand Up @@ -595,7 +605,7 @@ def test_large_row_groupby(is_by_category):
# eval_prod(modin_groupby, pandas_groupby) causes overflows
eval_std(modin_groupby, pandas_groupby)

agg_functions = ["min", "max"]
agg_functions = ["min", "max", {"A": "sum"}, {"A": "max", "B": "sum", "C": "min"}]
for func in agg_functions:
eval_agg(modin_groupby, pandas_groupby, func)
eval_aggregate(modin_groupby, pandas_groupby, func)
Expand Down

0 comments on commit 5c9398c

Please sign in to comment.