Skip to content

Commit

Permalink
Fix dask-cudf groupby handling when grouping by all columns (#10866)
Browse files Browse the repository at this point in the history
Modifies `_make_name` and its various call sites throughout the groupby code to handle the fact that a groupby result's columns can sometimes be a plain index (i.e. a list of strings) rather than a multi-index (i.e. a list of tuples); currently, the only known case where this occurs is computing `GroupBy.agg` with no aggregations, which happens when a groupby agg groups by all of the columns.

Closes #10863

Authors:
  - Charles Blackmon-Luca (https://github.com/charlesbluca)

Approvers:
  - https://github.com/jakirkham
  - Mike McCarty (https://github.com/mmccarty)
  - Richard (Rick) Zamora (https://github.com/rjzamora)

URL: #10866
  • Loading branch information
charlesbluca authored May 17, 2022
1 parent 6352b4e commit 80e4262
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 14 deletions.
34 changes: 20 additions & 14 deletions python/dask_cudf/dask_cudf/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -681,10 +681,13 @@ def _groupby_supported(gb):
)


def _make_name(*args, sep="_"):
"""Combine elements of `args` into a new string"""
_args = (arg for arg in args if arg != "")
return sep.join(_args)
def _make_name(col_name, sep="_"):
"""Combine elements of `col_name` into a single string, or no-op if
`col_name` is already a string
"""
if isinstance(col_name, str):
return col_name
return sep.join(name for name in col_name if name != "")


@_dask_cudf_nvtx_annotate
Expand Down Expand Up @@ -714,14 +717,14 @@ def _groupby_partition_agg(
_agg_dict[col].add(agg)
_agg_dict[col] = list(_agg_dict[col])
if set(agg_list).intersection({"std", "var"}):
pow2_name = _make_name(col, "pow2", sep=sep)
pow2_name = _make_name((col, "pow2"), sep=sep)
df[pow2_name] = df[col].astype("float64").pow(2)
_agg_dict[pow2_name] = ["sum"]

gb = df.groupby(gb_cols, dropna=dropna, as_index=False, sort=sort).agg(
_agg_dict
)
gb.columns = [_make_name(*name, sep=sep) for name in gb.columns]
gb.columns = [_make_name(name, sep=sep) for name in gb.columns]

if split_out == 1:
output = {0: gb.copy(deep=False)}
Expand Down Expand Up @@ -776,7 +779,10 @@ def _tree_node_agg(dfs, gb_cols, split_out, dropna, sort, sep):
)

# Don't include the last aggregation in the column names
gb.columns = [_make_name(*name[:-1], sep=sep) for name in gb.columns]
gb.columns = [
_make_name(name[:-1] if isinstance(name, tuple) else name, sep=sep)
for name in gb.columns
]
return gb


Expand Down Expand Up @@ -829,27 +835,27 @@ def _finalize_gb_agg(
agg_list = aggs.get(col, [])
agg_set = set(agg_list)
if agg_set.intersection({"mean", "std", "var"}):
count_name = _make_name(col, "count", sep=sep)
sum_name = _make_name(col, "sum", sep=sep)
count_name = _make_name((col, "count"), sep=sep)
sum_name = _make_name((col, "sum"), sep=sep)
if agg_set.intersection({"std", "var"}):
pow2_sum_name = _make_name(col, "pow2", "sum", sep=sep)
pow2_sum_name = _make_name((col, "pow2", "sum"), sep=sep)
var = _var_agg(gb, col, count_name, sum_name, pow2_sum_name)
if "var" in agg_list:
name_var = _make_name(col, "var", sep=sep)
name_var = _make_name((col, "var"), sep=sep)
gb[name_var] = var
if "std" in agg_list:
name_std = _make_name(col, "std", sep=sep)
name_std = _make_name((col, "std"), sep=sep)
gb[name_std] = np.sqrt(var)
gb.drop(columns=[pow2_sum_name], inplace=True)
if "mean" in agg_list:
mean_name = _make_name(col, "mean", sep=sep)
mean_name = _make_name((col, "mean"), sep=sep)
gb[mean_name] = gb[sum_name] / gb[count_name]
if "sum" not in agg_list:
gb.drop(columns=[sum_name], inplace=True)
if "count" not in agg_list:
gb.drop(columns=[count_name], inplace=True)
if "collect" in agg_list:
collect_name = _make_name(col, "collect", sep=sep)
collect_name = _make_name((col, "collect"), sep=sep)
gb[collect_name] = gb[collect_name].list.concat()

# Ensure sorted keys if `sort=True`
Expand Down
31 changes: 31 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -775,3 +775,34 @@ def test_groupby_nested_dict(func):
b.name = None

dd.assert_eq(a, b)


@pytest.mark.parametrize(
    "func",
    [
        lambda df: df.groupby(["x", "y"]).min(),
        pytest.param(
            lambda df: df.groupby(["x", "y"]).agg("min"),
            marks=pytest.mark.skip(
                reason="https://github.com/dask/dask/issues/9093"
            ),
        ),
        lambda df: df.groupby(["x", "y"]).y.min(),
        lambda df: df.groupby(["x", "y"]).y.agg("min"),
    ],
)
def test_groupby_all_columns(func):
    """Groupby aggregations should work when grouping by every column."""
    nrows = 10000
    pdf = pd.DataFrame(
        {
            "x": np.random.randint(0, 5, size=nrows),
            "y": np.random.normal(size=nrows),
        }
    )

    ddf = dd.from_pandas(pdf, npartitions=5)
    gddf = ddf.map_partitions(cudf.from_pandas)

    # Compare the dask (CPU) result against the dask-cudf (GPU) result.
    dd.assert_eq(func(ddf), func(gddf))

0 comments on commit 80e4262

Please sign in to comment.