FEAT-modin-project#2491: optimized dictionary reduce aggregation
Signed-off-by: Dmitry Chigarev <[email protected]>
dchigarev committed Dec 17, 2020
1 parent 126f2a5 commit e5f7318
Showing 7 changed files with 369 additions and 207 deletions.
94 changes: 70 additions & 24 deletions modin/backends/pandas/query_compiler.py
@@ -23,6 +23,7 @@
)
from pandas.core.base import DataError
from typing import Type, Callable
from collections.abc import Container
import warnings


@@ -37,6 +38,7 @@
    ReductionFunction,
    BinaryFunction,
    GroupbyReduceFunction,
    groupby_reduce_functions,
)


@@ -2443,33 +2445,63 @@ def _callable_func(self, func, axis, *args, **kwargs):
    # nature. They require certain data to exist on the same partition, and
    # after the shuffle, there should be only a local map required.

    groupby_count = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.count(**kwargs), lambda df, **kwargs: df.sum(**kwargs)
    )
    groupby_any = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.any(**kwargs), lambda df, **kwargs: df.any(**kwargs)
    )
    groupby_min = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.min(**kwargs), lambda df, **kwargs: df.min(**kwargs)
    )
    groupby_prod = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.prod(**kwargs), lambda df, **kwargs: df.prod(**kwargs)
    )
    groupby_max = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.max(**kwargs), lambda df, **kwargs: df.max(**kwargs)
    )
    groupby_all = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.all(**kwargs), lambda df, **kwargs: df.all(**kwargs)
    )
    groupby_sum = GroupbyReduceFunction.register(
        lambda df, **kwargs: df.sum(**kwargs), lambda df, **kwargs: df.sum(**kwargs)
    )
    groupby_count = GroupbyReduceFunction.register(*groupby_reduce_functions["count"])
    groupby_any = GroupbyReduceFunction.register(*groupby_reduce_functions["any"])
    groupby_min = GroupbyReduceFunction.register(*groupby_reduce_functions["min"])
    groupby_prod = GroupbyReduceFunction.register(*groupby_reduce_functions["prod"])
    groupby_max = GroupbyReduceFunction.register(*groupby_reduce_functions["max"])
    groupby_all = GroupbyReduceFunction.register(*groupby_reduce_functions["all"])
    groupby_sum = GroupbyReduceFunction.register(*groupby_reduce_functions["sum"])
    groupby_size = GroupbyReduceFunction.register(
        lambda df, **kwargs: pandas.DataFrame(df.size()),
        lambda df, **kwargs: df.sum(),
        method="size",
        *groupby_reduce_functions["size"], method="size"
    )
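
    # A minimal sketch of the assumed shape of groupby_reduce_functions (it is
    # defined in modin/data_management/functions/groupby_function.py, one of the
    # changed files not expanded on this page). Each aggregation name maps to a
    # (map_fn, reduce_fn) pair, which is why the entries can be star-unpacked
    # into GroupbyReduceFunction.register above and unpacked pairwise in
    # _groupby_dict_reduce below:
    #
    #     groupby_reduce_functions = {
    #         "count": (
    #             lambda df, **kwargs: df.count(**kwargs),
    #             lambda df, **kwargs: df.sum(**kwargs),
    #         ),
    #         "sum": (
    #             lambda df, **kwargs: df.sum(**kwargs),
    #             lambda df, **kwargs: df.sum(**kwargs),
    #         ),
    #         # ... one entry per reduction: "any", "min", "prod", "max",
    #         # "all", "size"
    #     }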

    def _groupby_dict_reduce(
        self, by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop=False
    ):
        map_dict = {}
        reduce_dict = {}
        rename_columns = any(
            not isinstance(fn, str) and isinstance(fn, Container)
            for fn in agg_func.values()
        )
        for col, col_funcs in agg_func.items():
            # A single function per column and no renaming requested: reuse the
            # pre-registered map/reduce pair for that function as-is.
            if not rename_columns:
                map_dict[col], reduce_dict[col] = groupby_reduce_functions[col_funcs]
                continue

            if isinstance(col_funcs, str):
                col_funcs = [col_funcs]

            map_fns = []
            for fn in col_funcs:
                if not isinstance(fn, str) and isinstance(fn, Container):
                    assert (
                        len(fn) == 2
                    ), f"Incorrect number of values to unpack (got {len(fn)}, expected 2)."
                    future_col_name, func = fn[0], fn[1]
                elif isinstance(fn, str):
                    future_col_name, func = fn, fn
                else:
                    raise TypeError(
                        f"Aggregation functions must be strings or (new_name, func) pairs, got {type(fn)}."
                    )
                map_fns.append((future_col_name, groupby_reduce_functions[func][0]))
                reduce_dict[(col, future_col_name)] = groupby_reduce_functions[func][1]
            map_dict[col] = map_fns
        return GroupbyReduceFunction.register(map_dict, reduce_dict)(
            query_compiler=self,
            by=by,
            axis=axis,
            groupby_args=groupby_kwargs,
            map_args=agg_kwargs,
            reduce_args=agg_kwargs,
            numeric_only=False,
            drop=drop,
        )
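
    # Worked example (hypothetical input) of what _groupby_dict_reduce builds:
    # for agg_func = {"b": ["sum", ("b_max", "max")]} it assembles
    #
    #     map_dict    == {"b": [("sum", <map sum>), ("b_max", <map max>)]}
    #     reduce_dict == {("b", "sum"): <reduce sum>,
    #                     ("b", "b_max"): <reduce max>}
    #
    # so the Map phase computes each partial aggregate per partition and the
    # Reduce phase combines the partials under the requested column names.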

    def groupby_agg(
        self,
        by,
@@ -2481,6 +2513,20 @@ def groupby_agg(
        groupby_kwargs,
        drop=False,
    ):
        def is_reduce_fn(o):
            if callable(o):
                return False
            if isinstance(o, str):
                o = [o]
            return all(x in groupby_reduce_functions for x in o)

        if isinstance(agg_func, dict) and all(
            is_reduce_fn(x) for x in agg_func.values()
        ):
            return self._groupby_dict_reduce(
                by, axis, agg_func, agg_args, agg_kwargs, groupby_kwargs, drop
            )

        if callable(agg_func):
            agg_func = wrap_udf_function(agg_func)

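With these changes, groupby_agg routes dictionary aggregations built only from pre-registered reductions through the MapReduce implementation and leaves everything else on the generic path. A hedged usage sketch (the frame and column names are illustrative, not taken from this commit):

import modin.pandas as pd

df = pd.DataFrame({"a": [1, 1, 2], "b": [3, 4, 5], "c": [6, 7, 8]})

# Every value is a key of groupby_reduce_functions, so is_reduce_fn returns
# True for all of them and _groupby_dict_reduce handles the call.
reduced = df.groupby("a").agg({"b": "sum", "c": ["min", "max"]})

# A callable is never treated as a registered reduction, so this call falls
# back to the generic groupby aggregation path.
generic = df.groupby("a").agg({"b": lambda grp: grp.sum()})
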
3 changes: 2 additions & 1 deletion modin/data_management/functions/__init__.py
@@ -17,7 +17,7 @@
from .reductionfunction import ReductionFunction
from .foldfunction import FoldFunction
from .binary_function import BinaryFunction
from .groupby_function import GroupbyReduceFunction
from .groupby_function import GroupbyReduceFunction, groupby_reduce_functions

__all__ = [
"Function",
@@ -27,4 +27,5 @@
"FoldFunction",
"BinaryFunction",
"GroupbyReduceFunction",
"groupby_reduce_functions",
]