Add collect list to dask-cudf groupby aggregations #8045

Merged · 10 commits · Jul 6, 2021
python/dask_cudf/dask_cudf/groupby.py · 47 additions, 7 deletions
@@ -62,7 +62,16 @@ def aggregate(self, arg, split_every=None, split_out=1):
             return self.size()
         arg = _redirect_aggs(arg)
 
-        _supported = {"count", "mean", "std", "var", "sum", "min", "max"}
+        _supported = {
+            "count",
+            "mean",
+            "std",
+            "var",
+            "sum",
+            "min",
+            "max",
+            "collect",
+        }
         if (
             isinstance(self.obj, DaskDataFrame)
             and isinstance(self.index, (str, list))
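
For context, a small usage sketch of what the new `"collect"` option enables (hypothetical data and output; exact result formatting may differ by version):

```python
import cudf
import dask_cudf

df = cudf.DataFrame({"key": [0, 0, 1, 1], "val": [1, 2, 3, 4]})
ddf = dask_cudf.from_cudf(df, npartitions=2)

# With this PR, "collect" is handled by dask_cudf's optimized groupby path,
# gathering each group's values into a list column.
print(ddf.groupby("key").agg({"val": "collect"}).compute())
# expected shape of the result: key 0 -> [1, 2], key 1 -> [3, 4]
```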
@@ -109,7 +118,16 @@ def aggregate(self, arg, split_every=None, split_out=1):
             return self.size()
         arg = _redirect_aggs(arg)
 
-        _supported = {"count", "mean", "std", "var", "sum", "min", "max"}
+        _supported = {
+            "count",
+            "mean",
+            "std",
+            "var",
+            "sum",
+            "min",
+            "max",
+            "collect",
+        }
         if (
             isinstance(self.obj, DaskDataFrame)
             and isinstance(self.index, (str, list))
@@ -147,7 +165,7 @@ def groupby_agg(
 
     This aggregation algorithm only supports the following options:
 
-    {"count", "mean", "std", "var", "sum", "min", "max"}
+    {"count", "mean", "std", "var", "sum", "min", "max", "collect"}
 
     This "optimized" approach is more performant than the algorithm
     in `dask.dataframe`, because it allows the cudf backend to
@@ -173,15 +191,24 @@ def groupby_agg(
     # strings (no lists)
     str_cols_out = True
     for col in aggs:
-        if isinstance(aggs[col], str):
+        if isinstance(aggs[col], str) or callable(aggs[col]):
             aggs[col] = [aggs[col]]
         else:
             str_cols_out = False
         if col in gb_cols:
             columns.append(col)
 
     # Assert that aggregations are supported
-    _supported = {"count", "mean", "std", "var", "sum", "min", "max"}
+    _supported = {
+        "count",
+        "mean",
+        "std",
+        "var",
+        "sum",
+        "min",
+        "max",
+        "collect",
+    }
     if not _is_supported(aggs, _supported):
         raise ValueError(
             f"Supported aggs include {_supported} for groupby_agg API. "
@@ -257,7 +284,12 @@ def groupby_agg(
     # be str, rather than tuples).
     for col in aggs:
         _aggs[col] = _aggs[col][0]
-    _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
+    try:
+        _meta = ddf._meta.groupby(gb_cols, as_index=as_index).agg(_aggs)
+    except NotImplementedError:
+        _meta = ddf._meta_nonempty.groupby(gb_cols, as_index=as_index).agg(
+            _aggs
+        )
     for s in range(split_out):
         dsk[(gb_agg_name, s)] = (
             _finalize_gb_agg,
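
The try/except replaces the old single-line meta inference: presumably cudf could not (at the time) produce a list-typed result from the zero-row `_meta` frame, so the output schema falls back to dask's non-empty dummy frame (`_meta_nonempty`). A hypothetical illustration of the failure mode (the exact error may differ by cudf version):

```python
import cudf

empty = cudf.DataFrame({"key": [], "val": []})
try:
    # Zero rows give the backend nothing to infer a list dtype from;
    # at the time of this PR this raised NotImplementedError.
    empty.groupby("key").agg({"val": "collect"})
except NotImplementedError:
    print("fall back to a non-empty dummy frame for schema inference")
```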
@@ -282,7 +314,13 @@ def groupby_agg(
 def _redirect_aggs(arg):
     """ Redirect aggregations to their corresponding name in cuDF
     """
-    redirects = {sum: "sum", max: "max", min: "min"}
+    redirects = {
+        sum: "sum",
+        max: "max",
+        min: "min",
+        list: "collect",
+        "list": "collect",
[Review comment on lines +316 to +317 — Member: "I like this approach."]
+    }
     if isinstance(arg, dict):
         new_arg = dict()
         for col in arg:
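
The two new entries mean both the Python builtin `list` and the string `"list"` resolve to cuDF's `"collect"`. The collapsed body below presumably applies a per-column lookup along these lines (a sketch, not the actual implementation):

```python
redirects = {sum: "sum", max: "max", min: "min", list: "collect", "list": "collect"}

arg = {"a": list, "b": "list", "c": "mean"}  # hypothetical user spec
new_arg = {col: redirects.get(spec, spec) for col, spec in arg.items()}
print(new_arg)  # {'a': 'collect', 'b': 'collect', 'c': 'mean'}
```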
@@ -400,6 +438,8 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep):
             agg_dict[col] = ["sum"]
         elif agg in ("min", "max"):
             agg_dict[col] = [agg]
+        elif agg == "collect":
+            agg_dict[col] = ["collect"]
         else:
             raise ValueError(f"Unexpected aggregation: {agg}")

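The new branch slots `"collect"` into the tree-reduction combine step: partial per-partition lists are collected again at each tree node, producing nested lists that the finalize step would then need to flatten. A sketch of the dispatch (the condition on the first branch is collapsed in the diff, so treating it as `count`/`sum` is an assumption):

```python
def combine_agg(agg):
    # Pick the aggregation used to combine partial results at a tree node.
    if agg in ("count", "sum"):   # assumed; this condition is collapsed above
        return ["sum"]            # partial counts and sums are summed
    elif agg in ("min", "max"):
        return [agg]              # min of mins, max of maxes
    elif agg == "collect":
        return ["collect"]        # partial lists are collected again
    else:
        raise ValueError(f"Unexpected aggregation: {agg}")

print(combine_agg("collect"))  # ['collect']
```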