From f42d117621cb73d09a9c0e2b7d95d6fe00a92cfb Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Fri, 19 Aug 2022 05:24:56 -0500 Subject: [PATCH] Fix groupby failures in dask_cudf CI (#11561) Dask-cudf groupby tests *should* be failing as a result of https://github.com/dask/dask/pull/9302 (see the [failures](https://gpuci.gpuopenanalytics.com/job/rapidsai/job/gpuci/job/cudf/job/prb/job/cudf-gpu-test/CUDA=11.5,GPU_LABEL=driver-495,LINUX_VER=ubuntu20.04,PYTHON=3.9/9946/) in #11565, where dask/main is being installed correctly). This PR updates the dask_cudf groupby code to fix these failures. Authors: - Richard (Rick) Zamora (https://github.com/rjzamora) Approvers: - GALI PREM SAGAR (https://github.com/galipremsagar) URL: https://github.com/rapidsai/cudf/pull/11561 --- python/dask_cudf/dask_cudf/groupby.py | 18 ++++++++++++++---- .../dask_cudf/dask_cudf/tests/test_groupby.py | 4 ++-- 2 files changed, 16 insertions(+), 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/groupby.py b/python/dask_cudf/dask_cudf/groupby.py index 58024e1b71a..3a1557e77f4 100644 --- a/python/dask_cudf/dask_cudf/groupby.py +++ b/python/dask_cudf/dask_cudf/groupby.py @@ -249,7 +249,7 @@ def last(self, split_every=None, split_out=1): ) @_dask_cudf_nvtx_annotate - def aggregate(self, arg, split_every=None, split_out=1): + def aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": return self.size() @@ -274,7 +274,12 @@ def aggregate(self, arg, split_every=None, split_out=1): ) return super().aggregate( - arg, split_every=split_every, split_out=split_out + arg, + split_every=split_every, + split_out=split_out, + # TODO: Change following line to `shuffle=shuffle,` + # when dask_cudf is pinned to dask>2022.8.0 + **({} if shuffle is None else {"shuffle": shuffle}), ) @@ -436,7 +441,7 @@ def last(self, split_every=None, split_out=1): )[self._slice] @_dask_cudf_nvtx_annotate - def aggregate(self, arg, split_every=None, split_out=1): + def
aggregate(self, arg, split_every=None, split_out=1, shuffle=None): if arg == "size": return self.size() @@ -459,7 +464,12 @@ def aggregate(self, arg, split_every=None, split_out=1): )[self._slice] return super().aggregate( - arg, split_every=split_every, split_out=split_out + arg, + split_every=split_every, + split_out=split_out, + # TODO: Change following line to `shuffle=shuffle,` + # when dask_cudf is pinned to dask>2022.8.0 + **({} if shuffle is None else {"shuffle": shuffle}), ) diff --git a/python/dask_cudf/dask_cudf/tests/test_groupby.py b/python/dask_cudf/dask_cudf/tests/test_groupby.py index 9d2fc5196e8..e6c23992c4e 100644 --- a/python/dask_cudf/dask_cudf/tests/test_groupby.py +++ b/python/dask_cudf/dask_cudf/tests/test_groupby.py @@ -575,12 +575,12 @@ def test_groupby_categorical_key(): ddf = gddf.to_dask_dataframe() got = ( - gddf.groupby("name") + gddf.groupby("name", sort=True) .agg({"x": ["mean", "max"], "y": ["mean", "count"]}) .compute() ) expect = ( - ddf.groupby("name") + ddf.groupby("name", sort=True) .agg({"x": ["mean", "max"], "y": ["mean", "count"]}) .compute() )