From f8c437d9754693fc4cd0e0c291d3ff746d7c9341 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 2 May 2024 12:01:11 -0700 Subject: [PATCH 1/4] add custom RepartitionQuantiles graph --- python/dask_cudf/dask_cudf/expr/_expr.py | 52 +++++++++++++++++++ python/dask_cudf/dask_cudf/tests/test_sort.py | 1 - 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index ff037b9520c..c3cdb8766ae 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,8 +1,10 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import functools +import numpy as np from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Expr, VarColumns +from dask_expr._quantiles import RepartitionQuantiles from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty @@ -121,3 +123,53 @@ def _patched_var( Expr.var = _patched_var + + +# Add custom code path for RepartitionQuantiles +# (Upstream logic fails when null values are present) + + +def _quantile(a, q): + a = a.to_frame() if a.ndim == 1 else a + n = len(a) + if not len(a): + return None, n + return ( + a.quantile(q=q.tolist(), interpolation="nearest", method="table"), + n, + ) + + +def merge_quantiles(finalq, qs, vals): + from dask_cudf.sorting import merge_quantiles as mq + + result = mq(finalq, qs, vals) + return result.iloc[:, 0].to_pandas() + + +_original_layer = RepartitionQuantiles._layer + + +def _cudf_layer(self): + if hasattr(self._meta, "to_pandas"): + # pandas/cudf uses quantile in [0, 1] + # numpy / cupy uses [0, 100] + qs = np.linspace(0.0, 1.0, self.input_npartitions + 1) + val_dsk = { + (self._name, 0, i): (_quantile, key, qs) + for i, key in enumerate(self.frame.__dask_keys__()) + } + merge_dsk = { + (self._name, 0): ( + merge_quantiles, + qs, + [qs] * self.input_npartitions, + sorted(val_dsk), + ) + } + return {**val_dsk, **merge_dsk} + else: + return _original_layer(self) + + +RepartitionQuantiles._layer = _cudf_layer diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 9184ad996ad..c0222320ca0 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -72,7 +72,6 @@ def test_sort_repartition(): dd.assert_eq(len(new_ddf), len(ddf)) -@xfail_dask_expr("dask-expr code path fails with nulls") @pytest.mark.parametrize("na_position", ["first", "last"]) @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) From d9c75b2ae27a8b84467476d1611971c1ffc3e095 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 2 May 2024 12:07:23 -0700 Subject: [PATCH 2/4] remove some fluff --- python/dask_cudf/dask_cudf/expr/_expr.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index c3cdb8766ae..bf69f775bad 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -131,20 +131,16 @@ def _patched_var( def _quantile(a, q): a = a.to_frame() if a.ndim == 1 else a - n = len(a) - if not len(a): - return None, n return ( a.quantile(q=q.tolist(), interpolation="nearest", method="table"), - n, + len(a), ) def merge_quantiles(finalq, qs, vals): from dask_cudf.sorting import merge_quantiles as mq - result = mq(finalq, qs, vals) - return result.iloc[:, 0].to_pandas() + return mq(finalq, qs, vals).iloc[:, 0].to_pandas() _original_layer = RepartitionQuantiles._layer From 98cbbac79d2d00ca073612f789a01316061cfa3d Mon Sep 17 00:00:00 2001 From: rjzamora Date: Thu, 2 May 2024 15:17:32 -0700 Subject: [PATCH 3/4] handle empty data --- python/dask_cudf/dask_cudf/expr/_expr.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index bf69f775bad..5e5f8cb323a 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -125,11 +125,16 @@ def _patched_var( Expr.var = _patched_var -# Add custom code path for RepartitionQuantiles -# (Upstream logic fails when null values are present) +# Add custom code path for RepartitionQuantiles, because +# upstream logic fails when null values are present. Note +# that the cudf-specific code path can also be used for +# multi-column divisions in the future. def _quantile(a, q): + if a.empty: + # Avoid calling `quantile` on empty data + return None, 0 a = a.to_frame() if a.ndim == 1 else a return ( a.quantile(q=q.tolist(), interpolation="nearest", method="table"), From 99e1db52b15aa032633bdcd0a9bc8a89782c6137 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Mon, 6 May 2024 07:05:19 -0700 Subject: [PATCH 4/4] avoid RepartitionQuantiles patching --- python/dask_cudf/dask_cudf/backends.py | 2 +- python/dask_cudf/dask_cudf/expr/_expr.py | 53 ------------------- python/dask_cudf/dask_cudf/tests/test_sort.py | 1 + 3 files changed, 2 insertions(+), 54 deletions(-) diff --git a/python/dask_cudf/dask_cudf/backends.py b/python/dask_cudf/dask_cudf/backends.py index 94528325aea..d250589e389 100644 --- a/python/dask_cudf/dask_cudf/backends.py +++ b/python/dask_cudf/dask_cudf/backends.py @@ -307,7 +307,7 @@ def categorical_dtype_cudf(categories=None, ordered=False): @tolist_dispatch.register((cudf.Series, cudf.BaseIndex)) @_dask_cudf_nvtx_annotate def tolist_cudf(obj): - return obj.to_arrow().to_pylist() + return obj.to_pandas().tolist() @is_categorical_dtype_dispatch.register( diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index 5e5f8cb323a..ff037b9520c 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,10 +1,8 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import functools -import numpy as np from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Expr, VarColumns -from dask_expr._quantiles import RepartitionQuantiles from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty @@ -123,54 +121,3 @@ def _patched_var( Expr.var = _patched_var - - -# Add custom code path for RepartitionQuantiles, because -# upstream logic fails when null values are present. Note -# that the cudf-specific code path can also be used for -# multi-column divisions in the future. - - -def _quantile(a, q): - if a.empty: - # Avoid calling `quantile` on empty data - return None, 0 - a = a.to_frame() if a.ndim == 1 else a - return ( - a.quantile(q=q.tolist(), interpolation="nearest", method="table"), - len(a), - ) - - -def merge_quantiles(finalq, qs, vals): - from dask_cudf.sorting import merge_quantiles as mq - - return mq(finalq, qs, vals).iloc[:, 0].to_pandas() - - -_original_layer = RepartitionQuantiles._layer - - -def _cudf_layer(self): - if hasattr(self._meta, "to_pandas"): - # pandas/cudf uses quantile in [0, 1] - # numpy / cupy uses [0, 100] - qs = np.linspace(0.0, 1.0, self.input_npartitions + 1) - val_dsk = { - (self._name, 0, i): (_quantile, key, qs) - for i, key in enumerate(self.frame.__dask_keys__()) - } - merge_dsk = { - (self._name, 0): ( - merge_quantiles, - qs, - [qs] * self.input_npartitions, - sorted(val_dsk), - ) - } - return {**val_dsk, **merge_dsk} - else: - return _original_layer(self) - - -RepartitionQuantiles._layer = _cudf_layer diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index c0222320ca0..400600a1598 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -72,6 +72,7 @@ def test_sort_repartition(): dd.assert_eq(len(new_ddf), len(ddf)) +@xfail_dask_expr("missing null support", lt_version="2024.5.1") @pytest.mark.parametrize("na_position", ["first", "last"]) @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]])