From 49d911a938079b3722692e4114221b67dbf09e81 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 19 Mar 2024 20:12:27 -0700 Subject: [PATCH 1/7] fix var --- python/dask_cudf/dask_cudf/expr/__init__.py | 16 ++- python/dask_cudf/dask_cudf/expr/_expr.py | 106 +++++++++++++++++--- 2 files changed, 104 insertions(+), 18 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py index c36dd0abcb9..dfc86cccc12 100644 --- a/python/dask_cudf/dask_cudf/expr/__init__.py +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -6,13 +6,19 @@ # For dask>=2024.3.0, a null value will default to True QUERY_PLANNING_ON = config.get("dataframe.query-planning", None) is not False + +# Set shuffle default to "tasks" +if config.get("dataframe.shuffle.method", None) is None: + config.set({"dataframe.shuffle.method", "tasks"}) + + # Register custom expressions and collections -try: - import dask_cudf.expr._collection - import dask_cudf.expr._expr +if QUERY_PLANNING_ON: + try: + import dask_cudf.expr._collection + import dask_cudf.expr._expr -except ImportError as err: - if QUERY_PLANNING_ON: + except ImportError as err: # Dask *should* raise an error before this. # However, we can still raise here to be certain. raise RuntimeError( diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 6def6e23b12..e99c89d914e 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,7 +1,11 @@ # Copyright (c) 2024, NVIDIA CORPORATION. +import functools from dask_expr._cumulative import CumulativeBlockwise -from dask_expr._reductions import Var +from dask_expr._expr import Expr, VarColumns +from dask_expr._reductions import Reduction, Var + +from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty ## ## Custom expression patching @@ -25,19 +29,95 @@ def _kwargs(self) -> dict: CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs -# This patch accounts for differences between -# numpy and cupy behavior. It may make sense -# to move this logic upstream. -_dx_reduction_aggregate = Var.reduction_aggregate +# The upstream Var code uses `Series.values`, and relies on numpy +# for most of the logic. Unfortunately, cudf -> cupy conversion +# is not supported for data containing null values. Therefore, +# we must implement our own version of Var for now. This logic +# is mostly copied from dask-cudf. + + +class VarCudf(Reduction): + # Uses the parallel version of Welford's online algorithm (Chan 79') + # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf) + _parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"] + _defaults = { + "skipna": True, + "ddof": 1, + "numeric_only": False, + "split_every": False, + } + + @functools.cached_property + def _meta(self): + return make_meta( + meta_nonempty(self.frame._meta).var( + skipna=self.skipna, numeric_only=self.numeric_only + ) + ) + + @property + def chunk_kwargs(self): + return dict(skipna=self.skipna, numeric_only=self.numeric_only) + + @property + def combine_kwargs(self): + return {} + + @property + def aggregate_kwargs(self): + return dict(ddof=self.ddof) + + @classmethod + def reduction_chunk(cls, x, skipna=True, numeric_only=False): + kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {} + if skipna or numeric_only: + n = x.count(**kwargs) + kwargs["skipna"] = skipna + avg = x.mean(**kwargs) + else: + # Not skipping nulls, so might as well + # avoid the full `count` operation + n = len(x) + kwargs["skipna"] = skipna + avg = x.sum(**kwargs) / n + if numeric_only: + # Workaround for cudf bug + # (see: https://github.com/rapidsai/cudf/issues/13731) + x = x[n.index] + m2 = ((x - avg) ** 2).sum(**kwargs) + return n, avg, m2 + + @classmethod + def reduction_combine(cls, parts): + n, avg, m2 = parts[0] + for i in range(1, len(parts)): + n_a, avg_a, m2_a = n, avg, m2 + n_b, avg_b, m2_b = parts[i] + n = n_a + n_b + avg = (n_a * avg_a + n_b * avg_b) / n + delta = avg_b - avg_a + m2 = m2_a + m2_b + delta**2 * n_a * n_b / n + return n, avg, m2 + + @classmethod + def reduction_aggregate(cls, vals, ddof=1): + vals = cls.reduction_combine(vals) + n, _, m2 = vals + return m2 / (n - ddof) -def _reduction_aggregate(*args, **kwargs): - result = _dx_reduction_aggregate(*args, **kwargs) - if result.ndim == 0: - # cupy will sometimes produce a 0d array, and - # we need to convert it to a scalar. - return result.item() - return result +def _patched_var( + self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False +): + if axis == 0: + if hasattr(self._meta, "to_pandas"): + return VarCudf(self, skipna, ddof, numeric_only, split_every) + else: + return Var(self, skipna, ddof, numeric_only, split_every) + elif axis == 1: + return VarColumns(self, skipna, ddof, numeric_only) + else: + raise ValueError(f"axis={axis} not supported. Please specify 0 or 1") -Var.reduction_aggregate = staticmethod(_reduction_aggregate) +Expr.var = _patched_var From 95f433b10c0caf38aa6f384db4101675df54de75 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 19 Mar 2024 20:13:26 -0700 Subject: [PATCH 2/7] revert doc-test change --- ci/build_docs.sh | 3 --- 1 file changed, 3 deletions(-) diff --git a/ci/build_docs.sh b/ci/build_docs.sh index fc02fe7548c..8e22f02b484 100755 --- a/ci/build_docs.sh +++ b/ci/build_docs.sh @@ -41,9 +41,6 @@ mkdir -p "${RAPIDS_DOCS_DIR}/libcudf/html" mv html/* "${RAPIDS_DOCS_DIR}/libcudf/html" popd -# TODO: Remove this once dask-expr works in the 10min notebook -export DASK_DATAFRAME__QUERY_PLANNING=False - rapids-logger "Build Python docs" pushd docs/cudf make dirhtml From 2e506fa4eee78576ba4c25f9e1cda25464c811f4 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 19 Mar 2024 20:16:29 -0700 Subject: [PATCH 3/7] roll back shuffle change for now --- python/dask_cudf/dask_cudf/expr/__init__.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/__init__.py b/python/dask_cudf/dask_cudf/expr/__init__.py index dfc86cccc12..826f514a674 100644 --- a/python/dask_cudf/dask_cudf/expr/__init__.py +++ b/python/dask_cudf/dask_cudf/expr/__init__.py @@ -6,12 +6,6 @@ # For dask>=2024.3.0, a null value will default to True QUERY_PLANNING_ON = config.get("dataframe.query-planning", None) is not False - -# Set shuffle default to "tasks" -if config.get("dataframe.shuffle.method", None) is None: - config.set({"dataframe.shuffle.method", "tasks"}) - - # Register custom expressions and collections if QUERY_PLANNING_ON: try: From 302b8dc2fd2ead7f3b9f4afa94c673a920c4104e Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 20 Mar 2024 11:15:17 -0700 Subject: [PATCH 4/7] workaround graph-materialization bug --- python/dask_cudf/dask_cudf/expr/_expr.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index e99c89d914e..858a3e2d33b 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -87,6 +87,12 @@ def reduction_chunk(cls, x, skipna=True, numeric_only=False): m2 = ((x - avg) ** 2).sum(**kwargs) return n, avg, m2 + def _layer(self): + # Workaround for https://github.com/dask/dask/pull/11013 + # This is technically an abstract expression - We should + # not need to define a `_layer` method. + return {} + @classmethod def reduction_combine(cls, parts): n, avg, m2 = parts[0] From 117e070caecadb6d4d49d5fc5d839820ec0fd66d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Wed, 20 Mar 2024 11:27:54 -0700 Subject: [PATCH 5/7] update test --- python/dask_cudf/dask_cudf/tests/test_reductions.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/python/dask_cudf/dask_cudf/tests/test_reductions.py b/python/dask_cudf/dask_cudf/tests/test_reductions.py index c3056f2607c..88b15718382 100644 --- a/python/dask_cudf/dask_cudf/tests/test_reductions.py +++ b/python/dask_cudf/dask_cudf/tests/test_reductions.py @@ -84,3 +84,13 @@ def test_rowwise_reductions(data, op): check_exact=False, check_dtype=op not in ("var", "std"), ) + + +@pytest.mark.parametrize("skipna", [True, False]) +def test_var_nulls(skipna): + # Copied from 10min example notebook + # See: https://github.com/rapidsai/cudf/pull/15347 + s = cudf.Series([1, 2, 3, None, 4]) + ds = dask_cudf.from_cudf(s, npartitions=2) + dd.assert_eq(s.var(skipna=skipna), ds.var(skipna=skipna)) + dd.assert_eq(s.std(skipna=skipna), ds.std(skipna=skipna)) From 25b6a8d1864165cceb352a0d35eb9649252fc516 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 2 Apr 2024 10:34:42 -0700 Subject: [PATCH 6/7] remove _layer workaround --- python/dask_cudf/dask_cudf/expr/_expr.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 858a3e2d33b..e99c89d914e 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -87,12 +87,6 @@ def reduction_chunk(cls, x, skipna=True, numeric_only=False): m2 = ((x - avg) ** 2).sum(**kwargs) return n, avg, m2 - def _layer(self): - # Workaround for https://github.com/dask/dask/pull/11013 - # This is technically an abstract expression - We should - # not need to define a `_layer` method. - return {} - @classmethod def reduction_combine(cls, parts): n, avg, m2 = parts[0] From cf1206c55aede6c4534a2928c2d42a19b5bfe27a Mon Sep 17 00:00:00 2001 From: "Richard (Rick) Zamora" Date: Wed, 3 Apr 2024 18:07:17 -0500 Subject: [PATCH 7/7] Update python/dask_cudf/dask_cudf/expr/_expr.py Co-authored-by: Bradley Dice --- python/dask_cudf/dask_cudf/expr/_expr.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index e99c89d914e..ff037b9520c 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -37,7 +37,7 @@ def _kwargs(self) -> dict: class VarCudf(Reduction): - # Uses the parallel version of Welford's online algorithm (Chan 79') + # Uses the parallel version of Welford's online algorithm (Chan '79) # (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf) _parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"] _defaults = {