From ed267b05761263505119b7e84c4322fb95624ce4 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Tue, 25 Jun 2024 22:03:13 +0000 Subject: [PATCH 1/4] Support quantile in cudf_polars [skip ci] --- python/cudf_polars/cudf_polars/dsl/expr.py | 35 +++++++++++++++++-- .../cudf_polars/tests/expressions/test_agg.py | 16 +++++++++ 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 9318934cb78..ca51dc24175 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -1438,6 +1438,9 @@ def __init__( req = plc.aggregation.variance(ddof=options) elif name == "count": req = plc.aggregation.count(null_handling=plc.types.NullPolicy.EXCLUDE) + elif name == "quantile": + # req = plc.aggregation.quantile(interp=interp_mapping[interp]) + req = None else: raise NotImplementedError( f"Unreachable, {name=} is incorrectly listed in _SUPPORTED" @@ -1446,6 +1449,8 @@ def __init__( op = getattr(self, f"_{name}", None) if op is None: op = partial(self._reduce, request=req) + elif name == "quantile": + op = partial(op, interp=options) elif name in {"min", "max"}: op = partial(op, propagate_nans=options) elif name in {"count", "first", "last"}: @@ -1469,6 +1474,7 @@ def __init__( "count", "std", "var", + "quantile", ] ) @@ -1551,6 +1557,27 @@ def _last(self, column: Column) -> Column: n = column.obj.size() return Column(plc.copying.slice(column.obj, [n - 1, n])[0]) + def _quantile(self, column: Column, quantile: Column, *, interp: str) -> Column: + interp_mapping = { + "nearest": plc.types.Interpolation.NEAREST, + "higher": plc.types.Interpolation.HIGHER, + "lower": plc.types.Interpolation.LOWER, + "midpoint": plc.types.Interpolation.MIDPOINT, + "linear": plc.types.Interpolation.LINEAR, + } + if not quantile.is_scalar: + raise ValueError( + "cudf-polars only supports expressions that evaluate to a scalar as the quantile argument" + ) + return self._reduce( + column, + request=plc.aggregation.quantile( + # TODO: eww! accept pylibcudf Scalar in quantiles? + quantiles=[plc.interop.to_arrow(quantile.obj_scalar).as_py()], + interp=interp_mapping[interp], + ), + ) + def do_evaluate( self, df: DataFrame, @@ -1563,8 +1590,12 @@ def do_evaluate( raise NotImplementedError( f"Agg in context {context}" ) # pragma: no cover; unreachable - (child,) = self.children - return self.op(child.evaluate(df, context=context, mapping=mapping)) + return self.op( + *( + child.evaluate(df, context=context, mapping=mapping) + for child in self.children + ) + ) class Ternary(Expr): diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index f7b32e52082..38a796d8eb7 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -69,6 +69,8 @@ def df(dtype, with_nulls, is_sorted): def test_agg(df, agg): + if agg == "quantile": + pytest.skip("quantile takes in an extra arg and is tested separately") expr = getattr(pl.col("a"), agg)() q = df.select(expr) @@ -105,6 +107,20 @@ def test_cum_agg_reverse_unsupported(cum_agg): assert_ir_translation_raises(q, NotImplementedError) +@pytest.mark.parametrize("q", [0.5, pl.lit(0.5)]) +@pytest.mark.parametrize("interp", ["nearest", "higher", "lower", "midpoint", "linear"]) +def test_quantile(df, q, interp): + expr = pl.col("a").quantile(q, interp) + q = df.select(expr) + + # https://github.com/rapidsai/cudf/issues/15852 + check_dtypes = q.collect_schema()["a"] == pl.Float64 + if not check_dtypes: + with pytest.raises(AssertionError): + assert_gpu_result_equal(q) + assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False) + + @pytest.mark.parametrize( "op", [pl.Expr.min, pl.Expr.nan_min, pl.Expr.max, pl.Expr.nan_max] ) From a48d98f72e4c94a20b3f5cef728ad0e69bfcba46 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Mon, 1 Jul 2024 23:56:15 +0000 Subject: [PATCH 2/4] wip --- python/cudf_polars/cudf_polars/dsl/expr.py | 1 - .../cudf_polars/tests/expressions/test_agg.py | 23 +++++++++++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index ca51dc24175..d0639885f75 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -1439,7 +1439,6 @@ def __init__( elif name == "count": req = plc.aggregation.count(null_handling=plc.types.NullPolicy.EXCLUDE) elif name == "quantile": - # req = plc.aggregation.quantile(interp=interp_mapping[interp]) req = None else: raise NotImplementedError( diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index 38a796d8eb7..2845389aa32 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -11,6 +11,19 @@ assert_gpu_result_equal, assert_ir_translation_raises, ) +from cudf_polars.callback import execute_with_cudf + + +# Note: quantile is tested separately (since it takes another argument) +@pytest.fixture(params=sorted(expr.Agg._SUPPORTED - {"quantile"})) +def agg(request): + return request.param + + +@pytest.fixture(params=[pl.Int32, pl.Float32, pl.Int16]) +def dtype(request): + return request.param + @pytest.fixture( @@ -69,8 +82,6 @@ def df(dtype, with_nulls, is_sorted): def test_agg(df, agg): - if agg == "quantile": - pytest.skip("quantile takes in an extra arg and is tested separately") expr = getattr(pl.col("a"), agg)() q = df.select(expr) @@ -121,6 +132,14 @@ def test_quantile(df, q, interp): assert_gpu_result_equal(q, check_dtypes=check_dtypes, check_exact=False) +def test_quantile_invalid_q(df): + expr = pl.col("a").quantile(pl.col("a")) + q = df.select(expr) + with pytest.raises(pl.exceptions.ComputeError, match="cudf-polars only supports expressions that evaluate to a scalar as the quantile argument"): + q.collect(post_opt_callback=execute_with_cudf) + + + @pytest.mark.parametrize( "op", [pl.Expr.min, pl.Expr.nan_min, pl.Expr.max, pl.Expr.nan_max] ) From 820190d9a9f1c15d29449d7be1b842062f823b03 Mon Sep 17 00:00:00 2001 From: Thomas Li Date: Wed, 3 Jul 2024 23:43:12 +0000 Subject: [PATCH 3/4] address more feedback --- python/cudf_polars/cudf_polars/dsl/expr.py | 50 ++++++++++++------- .../cudf_polars/tests/expressions/test_agg.py | 7 ++- 2 files changed, 38 insertions(+), 19 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index d0639885f75..4f138375040 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -1477,6 +1477,14 @@ def __init__( ] ) + interp_mapping: ClassVar[dict[str, plc.types.Interpolation]] = { + "nearest": plc.types.Interpolation.NEAREST, + "higher": plc.types.Interpolation.HIGHER, + "lower": plc.types.Interpolation.LOWER, + "midpoint": plc.types.Interpolation.MIDPOINT, + "linear": plc.types.Interpolation.LINEAR, + } + def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" if depth >= 1: @@ -1556,24 +1564,19 @@ def _last(self, column: Column) -> Column: n = column.obj.size() return Column(plc.copying.slice(column.obj, [n - 1, n])[0]) - def _quantile(self, column: Column, quantile: Column, *, interp: str) -> Column: - interp_mapping = { - "nearest": plc.types.Interpolation.NEAREST, - "higher": plc.types.Interpolation.HIGHER, - "lower": plc.types.Interpolation.LOWER, - "midpoint": plc.types.Interpolation.MIDPOINT, - "linear": plc.types.Interpolation.LINEAR, - } - if not quantile.is_scalar: - raise ValueError( + def _quantile( + self, column: Column, quantile: Column | pa.Scalar, *, interp: str + ) -> Column: + # do_evaluate should have evaluated our quantile Column to a pyarrow scalar + if not isinstance(quantile, pa.Scalar): + raise ValueError( # noqa: TRY004 "cudf-polars only supports expressions that evaluate to a scalar as the quantile argument" ) return self._reduce( column, request=plc.aggregation.quantile( - # TODO: eww! accept pylibcudf Scalar in quantiles? - quantiles=[plc.interop.to_arrow(quantile.obj_scalar).as_py()], - interp=interp_mapping[interp], + quantiles=[quantile.as_py()], + interp=Agg.interp_mapping[interp], ), ) @@ -1589,12 +1592,25 @@ def do_evaluate( raise NotImplementedError( f"Agg in context {context}" ) # pragma: no cover; unreachable - return self.op( - *( + + # Don't convert scalar literals to pylibcudf column + # for quantile (AggInfo takes in Python scalars not pylibcudf ones) + if self.name == "quantile": + evaled_children = [] + for child in self.children: + if isinstance(child, Literal): + evaled_children.append(child.value) + else: + evaled_children.append( + child.evaluate(df, context=context, mapping=mapping) + ) + else: + evaled_children = [ child.evaluate(df, context=context, mapping=mapping) for child in self.children - ) - ) + ] + + return self.op(*evaled_children) class Ternary(Expr): diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index 2845389aa32..337ef952ebf 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -6,6 +6,7 @@ import polars as pl +from cudf_polars.callback import execute_with_cudf from cudf_polars.dsl import expr from cudf_polars.testing.asserts import ( assert_gpu_result_equal, @@ -135,11 +136,13 @@ def test_quantile(df, q, interp): def test_quantile_invalid_q(df): expr = pl.col("a").quantile(pl.col("a")) q = df.select(expr) - with pytest.raises(pl.exceptions.ComputeError, match="cudf-polars only supports expressions that evaluate to a scalar as the quantile argument"): + with pytest.raises( + pl.exceptions.ComputeError, + match="cudf-polars only supports expressions that evaluate to a scalar as the quantile argument", + ): q.collect(post_opt_callback=execute_with_cudf) - @pytest.mark.parametrize( "op", [pl.Expr.min, pl.Expr.nan_min, pl.Expr.max, pl.Expr.nan_max] ) From 3526ba431561f754d92ebb5b196d1c8c44335357 Mon Sep 17 00:00:00 2001 From: Vyas Ramasubramani Date: Wed, 28 Aug 2024 18:16:20 +0000 Subject: [PATCH 4/4] Address PR feedback --- python/cudf_polars/cudf_polars/dsl/expr.py | 47 ++++--------------- .../cudf_polars/tests/expressions/test_agg.py | 20 +------- 2 files changed, 11 insertions(+), 56 deletions(-) diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 4f138375040..4803d0e8a1c 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -1439,7 +1439,12 @@ def __init__( elif name == "count": req = plc.aggregation.count(null_handling=plc.types.NullPolicy.EXCLUDE) elif name == "quantile": - req = None + _, quantile = self.children + if not isinstance(quantile, Literal): + raise NotImplementedError("Only support literal quantile values") + req = plc.aggregation.quantile( + quantiles=[quantile.value.as_py()], interp=Agg.interp_mapping[options] + ) else: raise NotImplementedError( f"Unreachable, {name=} is incorrectly listed in _SUPPORTED" @@ -1448,8 +1453,6 @@ def __init__( op = getattr(self, f"_{name}", None) if op is None: op = partial(self._reduce, request=req) - elif name == "quantile": - op = partial(op, interp=options) elif name in {"min", "max"}: op = partial(op, propagate_nans=options) elif name in {"count", "first", "last"}: @@ -1564,22 +1567,6 @@ def _last(self, column: Column) -> Column: n = column.obj.size() return Column(plc.copying.slice(column.obj, [n - 1, n])[0]) - def _quantile( - self, column: Column, quantile: Column | pa.Scalar, *, interp: str - ) -> Column: - # do_evaluate should have evaluated our quantile Column to a pyarrow scalar - if not isinstance(quantile, pa.Scalar): - raise ValueError( # noqa: TRY004 - "cudf-polars only supports expressions that evaluate to a scalar as the quantile argument" - ) - return self._reduce( - column, - request=plc.aggregation.quantile( - quantiles=[quantile.as_py()], - interp=Agg.interp_mapping[interp], - ), - ) - def do_evaluate( self, df: DataFrame, @@ -1593,24 +1580,10 @@ def do_evaluate( f"Agg in context {context}" ) # pragma: no cover; unreachable - # Don't convert scalar literals to pylibcudf column - # for quantile (AggInfo takes in Python scalars not pylibcudf ones) - if self.name == "quantile": - evaled_children = [] - for child in self.children: - if isinstance(child, Literal): - evaled_children.append(child.value) - else: - evaled_children.append( - child.evaluate(df, context=context, mapping=mapping) - ) - else: - evaled_children = [ - child.evaluate(df, context=context, mapping=mapping) - for child in self.children - ] - - return self.op(*evaled_children) + # Aggregations like quantiles may have additional children that were + # preprocessed into pylibcudf requests. + child = self.children[0] + return self.op(child.evaluate(df, context=context, mapping=mapping)) class Ternary(Expr): diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index 337ef952ebf..56055f4c6c2 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -6,25 +6,11 @@ import polars as pl -from cudf_polars.callback import execute_with_cudf from cudf_polars.dsl import expr from cudf_polars.testing.asserts import ( assert_gpu_result_equal, assert_ir_translation_raises, ) -from cudf_polars.callback import execute_with_cudf - - -# Note: quantile is tested separately (since it takes another argument) -@pytest.fixture(params=sorted(expr.Agg._SUPPORTED - {"quantile"})) -def agg(request): - return request.param - - -@pytest.fixture(params=[pl.Int32, pl.Float32, pl.Int16]) -def dtype(request): - return request.param - @pytest.fixture( @@ -136,11 +122,7 @@ def test_quantile(df, q, interp): def test_quantile_invalid_q(df): expr = pl.col("a").quantile(pl.col("a")) q = df.select(expr) - with pytest.raises( - pl.exceptions.ComputeError, - match="cudf-polars only supports expressions that evaluate to a scalar as the quantile argument", - ): - q.collect(post_opt_callback=execute_with_cudf) + assert_ir_translation_raises(q, NotImplementedError) @pytest.mark.parametrize(