From 98319b178c0d9dae832cd168ccce2b14c76795b7 Mon Sep 17 00:00:00 2001 From: rjzamora Date: Tue, 21 May 2024 07:12:20 -0700 Subject: [PATCH] add temporary workaround to support sorting by categorical columns --- .../dask_cudf/dask_cudf/expr/_collection.py | 19 -------------- python/dask_cudf/dask_cudf/expr/_expr.py | 25 +++++++++++++++++++ python/dask_cudf/dask_cudf/tests/test_sort.py | 23 ++--------------- 3 files changed, 27 insertions(+), 40 deletions(-) diff --git a/python/dask_cudf/dask_cudf/expr/_collection.py b/python/dask_cudf/dask_cudf/expr/_collection.py index 926b7cfaf0e..d50dfb24256 100644 --- a/python/dask_cudf/dask_cudf/expr/_collection.py +++ b/python/dask_cudf/dask_cudf/expr/_collection.py @@ -15,7 +15,6 @@ from dask import config from dask.dataframe.core import is_dataframe_like -from dask.dataframe.dispatch import is_categorical_dtype import cudf @@ -82,24 +81,6 @@ def from_dict(cls, *args, **kwargs): with config.set({"dataframe.backend": "cudf"}): return DXDataFrame.from_dict(*args, **kwargs) - def sort_values( - self, - by, - **kwargs, - ): - # Raise if the first column is categorical, otherwise the - # upstream divisions logic may produce errors - # (See: https://github.com/rapidsai/cudf/issues/11795) - check_by = by[0] if isinstance(by, list) else by - if is_categorical_dtype(self.dtypes.get(check_by, None)): - raise NotImplementedError( - "Dask-cudf does not support sorting on categorical " - "columns when query-planning is enabled. Please use " - "the legacy API for now." - f"\n{_LEGACY_WORKAROUND}", - ) - return super().sort_values(by, **kwargs) - def groupby( self, by, diff --git a/python/dask_cudf/dask_cudf/expr/_expr.py b/python/dask_cudf/dask_cudf/expr/_expr.py index ff037b9520c..8fccaccb695 100644 --- a/python/dask_cudf/dask_cudf/expr/_expr.py +++ b/python/dask_cudf/dask_cudf/expr/_expr.py @@ -1,11 +1,14 @@ # Copyright (c) 2024, NVIDIA CORPORATION. import functools +import dask_expr._shuffle as _shuffle_module +from dask_expr import new_collection from dask_expr._cumulative import CumulativeBlockwise from dask_expr._expr import Expr, VarColumns from dask_expr._reductions import Reduction, Var from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty +from dask.dataframe.dispatch import is_categorical_dtype ## ## Custom expression patching @@ -121,3 +124,25 @@ def _patched_var( Expr.var = _patched_var + + +# Temporary work-around for missing cudf + categorical support +# See: https://github.com/rapidsai/cudf/issues/11795 +# TODO: Fix RepartitionQuantiles and remove this in cudf>24.06 + +_original_get_divisions = _shuffle_module._get_divisions + + +def _patched_get_divisions(frame, other, *args, **kwargs): + # NOTE: The following two lines contains the "patch" + # (we simply convert the partitioning column to pandas) + if is_categorical_dtype(other._meta.dtype) and hasattr( + other.frame._meta, "to_pandas" + ): + other = new_collection(other).to_backend("pandas")._expr + + # Call "original" function + return _original_get_divisions(frame, other, *args, **kwargs) + + +_shuffle_module._get_divisions = _patched_get_divisions diff --git a/python/dask_cudf/dask_cudf/tests/test_sort.py b/python/dask_cudf/dask_cudf/tests/test_sort.py index 9d9fe297248..9bbbbc79561 100644 --- a/python/dask_cudf/dask_cudf/tests/test_sort.py +++ b/python/dask_cudf/dask_cudf/tests/test_sort.py @@ -10,7 +10,7 @@ import cudf import dask_cudf -from dask_cudf.tests.utils import QUERY_PLANNING_ON, xfail_dask_expr +from dask_cudf.tests.utils import xfail_dask_expr @pytest.mark.parametrize("ascending", [True, False]) @@ -20,12 +20,7 @@ "a", "b", "c", - pytest.param( - "d", - marks=xfail_dask_expr( - "Possible segfault when sorting by categorical column.", - ), - ), + "d", ["a", "b"], ["c", "d"], ], @@ -47,20 +42,6 @@ def test_sort_values(nelem, nparts, by, ascending): dd.assert_eq(got, expect, check_index=False) -@pytest.mark.parametrize("by", ["b", ["b", "a"]]) -def test_sort_values_categorical_raises(by): - df = cudf.DataFrame() - df["a"] = np.ascontiguousarray(np.arange(10)[::-1]) - df["b"] = df["a"].astype("category") - ddf = dd.from_pandas(df, npartitions=10) - - if QUERY_PLANNING_ON: - with pytest.raises( - NotImplementedError, match="sorting on categorical" - ): - ddf.sort_values(by=by) - - @pytest.mark.parametrize("ascending", [True, False]) @pytest.mark.parametrize("by", ["a", "b", ["a", "b"]]) def test_sort_values_single_partition(by, ascending):