diff --git a/python/cudf/cudf/_lib/pylibcudf/groupby.pxd b/python/cudf/cudf/_lib/pylibcudf/groupby.pxd index c6c146b0445..eaa05c26986 100644 --- a/python/cudf/cudf/_lib/pylibcudf/groupby.pxd +++ b/python/cudf/cudf/_lib/pylibcudf/groupby.pxd @@ -16,6 +16,7 @@ from cudf._lib.pylibcudf.libcudf.groupby cimport ( scan_request, ) from cudf._lib.pylibcudf.libcudf.table.table cimport table +from cudf._lib.pylibcudf.libcudf.types cimport null_order, order from .column cimport Column from .table cimport Table @@ -38,6 +39,9 @@ cdef class GroupByRequest: cdef class GroupBy: cdef unique_ptr[groupby] c_obj cdef Table _keys + cdef unique_ptr[vector[order]] _column_order + cdef unique_ptr[vector[null_order]] _null_precedence + cpdef tuple aggregate(self, list requests) cpdef tuple scan(self, list requests) cpdef tuple shift(self, Table values, list offset, list fill_values) diff --git a/python/cudf/cudf/_lib/pylibcudf/groupby.pyx b/python/cudf/cudf/_lib/pylibcudf/groupby.pyx index 46fe61025ce..f5bb46ca6a2 100644 --- a/python/cudf/cudf/_lib/pylibcudf/groupby.pyx +++ b/python/cudf/cudf/_lib/pylibcudf/groupby.pyx @@ -2,7 +2,7 @@ from cython.operator cimport dereference from libcpp.functional cimport reference_wrapper -from libcpp.memory cimport unique_ptr +from libcpp.memory cimport make_unique, unique_ptr from libcpp.pair cimport pair from libcpp.utility cimport move from libcpp.vector cimport vector @@ -22,7 +22,7 @@ from cudf._lib.pylibcudf.libcudf.types cimport size_type from .aggregation cimport Aggregation from .column cimport Column from .table cimport Table -from .types cimport null_policy, sorted +from .types cimport null_order, null_policy, order, sorted from .utils cimport _as_vector @@ -87,17 +87,43 @@ cdef class GroupBy: keys : Table The columns to group by. null_handling : null_policy, optional - Whether or not to include null rows in ``keys``. Default is null_policy.EXCLUDE. + Whether or not to include null rows in `keys`. + Default is ``null_policy.EXCLUDE``. keys_are_sorted : sorted, optional - Whether the keys are already sorted. Default is sorted.NO. + Whether the keys are already sorted. Default is ``sorted.NO``. + column_order : list[order] + Indicates the order of each column. Default is ``order.ASCENDING``. + Ignored if `keys_are_sorted` is ``sorted.NO``. + null_precedence : list[null_order] + Indicates the ordering of null values in each column. + Default is ``null_order.AFTER``. Ignored if `keys_are_sorted` is ``sorted.NO``. """ def __init__( self, Table keys, null_policy null_handling=null_policy.EXCLUDE, - sorted keys_are_sorted=sorted.NO + sorted keys_are_sorted=sorted.NO, + list column_order=None, + list null_precedence=None, ): - self.c_obj.reset(new groupby(keys.view(), null_handling, keys_are_sorted)) + self._column_order = make_unique[vector[order]]() + self._null_precedence = make_unique[vector[null_order]]() + if column_order is not None: + for o in column_order: + dereference(self._column_order).push_back(o) + if null_precedence is not None: + for o in null_precedence: + dereference(self._null_precedence).push_back(o) + + self.c_obj.reset( + new groupby( + keys.view(), + null_handling, + keys_are_sorted, + dereference(self._column_order.get()), + dereference(self._null_precedence.get()), + ) + ) # keep a reference to the keys table so it doesn't get # deallocated from under us: self._keys = keys diff --git a/python/cudf_polars/cudf_polars/containers/column.py b/python/cudf_polars/cudf_polars/containers/column.py index af67059844e..42aba0fcdc0 100644 --- a/python/cudf_polars/cudf_polars/containers/column.py +++ b/python/cudf_polars/cudf_polars/containers/column.py @@ -13,6 +13,8 @@ if TYPE_CHECKING: from typing_extensions import Self + import polars as pl + __all__: list[str] = ["Column", "NamedColumn"] @@ -76,12 +78,49 @@ def sorted_like(self, like: Column, /) -> Self: See Also -------- - set_sorted + set_sorted, copy_metadata """ return self.set_sorted( is_sorted=like.is_sorted, order=like.order, null_order=like.null_order ) + def copy_metadata(self, from_: pl.Series, /) -> Self: + """ + Copy metadata from a host series onto self. + + Parameters + ---------- + from_ + Polars series to copy metadata from + + Returns + ------- + Self with metadata set. + + See Also + -------- + set_sorted, sorted_like + """ + if len(from_) <= 1: + return self + ascending = from_.flags["SORTED_ASC"] + descending = from_.flags["SORTED_DESC"] + if ascending or descending: + has_null_first = from_.item(0) is None + has_null_last = from_.item(-1) is None + order = ( + plc.types.Order.ASCENDING if ascending else plc.types.Order.DESCENDING + ) + null_order = plc.types.NullOrder.BEFORE + if (descending and has_null_first) or (ascending and has_null_last): + null_order = plc.types.NullOrder.AFTER + return self.set_sorted( + is_sorted=plc.types.Sorted.YES, + order=order, + null_order=null_order, + ) + return self + def set_sorted( self, *, diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index d86656578d7..cbeadf1426a 100644 --- a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -9,16 +9,18 @@ from functools import cached_property from typing import TYPE_CHECKING, cast +import pyarrow as pa + import polars as pl import cudf._lib.pylibcudf as plc from cudf_polars.containers.column import NamedColumn +from cudf_polars.utils import dtypes if TYPE_CHECKING: from collections.abc import Mapping, Sequence, Set - import pyarrow as pa from typing_extensions import Self import cudf @@ -50,8 +52,16 @@ def to_polars(self) -> pl.DataFrame: self.table, [plc.interop.ColumnMetadata(name=c.name) for c in self.columns], ) - - return cast(pl.DataFrame, pl.from_arrow(table)) + return cast(pl.DataFrame, pl.from_arrow(table)).with_columns( + *( + pl.col(c.name).set_sorted( + descending=c.order == plc.types.Order.DESCENDING + ) + if c.is_sorted + else pl.col(c.name) + for c in self.columns + ) + ) @cached_property def column_names_set(self) -> frozenset[str]: @@ -83,6 +93,35 @@ def from_cudf(cls, df: cudf.DataFrame) -> Self: ] ) + @classmethod + def from_polars(cls, df: pl.DataFrame) -> Self: + """ + Create from a polars dataframe. + + Parameters + ---------- + df + Polars dataframe to convert + + Returns + ------- + New dataframe representing the input. + """ + table = df.to_arrow() + schema = table.schema + for i, field in enumerate(schema): + schema = schema.set( + i, pa.field(field.name, dtypes.downcast_arrow_lists(field.type)) + ) + # No-op if the schema is unchanged. + d_table = plc.interop.from_arrow(table.cast(schema)) + return cls( + [ + NamedColumn(column, h_col.name).copy_metadata(h_col) + for column, h_col in zip(d_table.columns(), df.iter_columns()) + ] + ) + @classmethod def from_table(cls, table: plc.Table, names: Sequence[str]) -> Self: """ diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index adf266bab81..f37cb3f475c 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -867,7 +867,7 @@ def __init__( self.name = name self.options = options self.children = children - if self.name not in ("round", "unique", "mask_nans"): + if self.name not in ("mask_nans", "round", "setsorted", "unique"): raise NotImplementedError(f"Unary function {name=}") def do_evaluate( @@ -926,6 +926,33 @@ def do_evaluate( if maintain_order: return Column(column).sorted_like(values) return Column(column) + elif self.name == "setsorted": + (column,) = ( + child.evaluate(df, context=context, mapping=mapping) + for child in self.children + ) + (asc,) = self.options + order = ( + plc.types.Order.ASCENDING + if asc == "ascending" + else plc.types.Order.DESCENDING + ) + null_order = plc.types.NullOrder.BEFORE + if column.obj.null_count() > 0 and (n := column.obj.size()) > 1: + # PERF: This invokes four stream synchronisations! + has_nulls_first = not plc.copying.get_element(column.obj, 0).is_valid() + has_nulls_last = not plc.copying.get_element( + column.obj, n - 1 + ).is_valid() + if (order == plc.types.Order.DESCENDING and has_nulls_first) or ( + order == plc.types.Order.ASCENDING and has_nulls_last + ): + null_order = plc.types.NullOrder.AFTER + return column.set_sorted( + is_sorted=plc.types.Sorted.YES, + order=order, + null_order=null_order, + ) raise NotImplementedError( f"Unimplemented unary function {self.name=}" ) # pragma: no cover; init trips first diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b32fa9c273e..5e6544ef77c 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -30,7 +30,7 @@ import cudf_polars.dsl.expr as expr from cudf_polars.containers import DataFrame, NamedColumn -from cudf_polars.utils import dtypes, sorting +from cudf_polars.utils import sorting if TYPE_CHECKING: from collections.abc import MutableMapping @@ -385,17 +385,7 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: pdf = pl.DataFrame._from_pydf(self.df) if self.projection is not None: pdf = pdf.select(self.projection) - table = pdf.to_arrow() - schema = table.schema - for i, field in enumerate(schema): - schema = schema.set( - i, pa.field(field.name, dtypes.downcast_arrow_lists(field.type)) - ) - # No-op if the schema is unchanged. - table = table.cast(schema) - df = DataFrame.from_table( - plc.interop.from_arrow(table), list(self.schema.keys()) - ) + df = DataFrame.from_polars(pdf) assert all( c.obj.type() == dtype for c, dtype in zip(df.columns, self.schema.values()) ) @@ -542,16 +532,17 @@ def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: keys = broadcast( *(k.evaluate(df) for k in self.keys), target_length=df.num_rows ) - # TODO: use sorted information, need to expose column_order - # and null_precedence in pylibcudf groupby constructor - # sorted = ( - # plc.types.Sorted.YES - # if all(k.is_sorted for k in keys) - # else plc.types.Sorted.NO - # ) + sorted = ( + plc.types.Sorted.YES + if all(k.is_sorted for k in keys) + else plc.types.Sorted.NO + ) grouper = plc.groupby.GroupBy( plc.Table([k.obj for k in keys]), null_handling=plc.types.NullPolicy.INCLUDE, + keys_are_sorted=sorted, + column_order=[k.order for k in keys], + null_precedence=[k.null_order for k in keys], ) # TODO: uniquify requests = [] diff --git a/python/cudf_polars/tests/containers/test_dataframe.py b/python/cudf_polars/tests/containers/test_dataframe.py index 2e385e39eef..87508e17407 100644 --- a/python/cudf_polars/tests/containers/test_dataframe.py +++ b/python/cudf_polars/tests/containers/test_dataframe.py @@ -5,6 +5,8 @@ import pytest +import polars as pl + import cudf._lib.pylibcudf as plc from cudf_polars.containers import DataFrame, NamedColumn @@ -90,3 +92,52 @@ def test_shallow_copy(): ) assert df.columns[0].is_sorted == plc.types.Sorted.YES assert copy.columns[0].is_sorted == plc.types.Sorted.NO + + +def test_sorted_flags_preserved_empty(): + df = pl.DataFrame({"a": pl.Series([], dtype=pl.Int8())}) + df.select(pl.col("a").sort()) + + gf = DataFrame.from_polars(df) + + (a,) = gf.columns + + assert a.is_sorted == plc.types.Sorted.YES + + assert df.flags == gf.to_polars().flags + + +@pytest.mark.parametrize("nulls_last", [True, False]) +def test_sorted_flags_preserved(with_nulls, nulls_last): + values = [1, 2, -1, 2, 4, 5] + if with_nulls: + values[4] = None + df = pl.DataFrame({"a": values, "b": values, "c": values}) + + df = df.select( + pl.col("a").sort(descending=False, nulls_last=nulls_last), + pl.col("b").sort(descending=True, nulls_last=nulls_last), + pl.col("c"), + ) + + gf = DataFrame.from_polars(df) + + a_null_order = ( + plc.types.NullOrder.AFTER + if nulls_last and with_nulls + else plc.types.NullOrder.BEFORE + ) + b_null_order = ( + plc.types.NullOrder.AFTER + if not nulls_last and with_nulls + else plc.types.NullOrder.BEFORE + ) + a, b, c = gf.columns + assert a.is_sorted == plc.types.Sorted.YES + assert a.order == plc.types.Order.ASCENDING + assert a.null_order == a_null_order + assert b.is_sorted == plc.types.Sorted.YES + assert b.order == plc.types.Order.DESCENDING + assert b.null_order == b_null_order + assert c.is_sorted == plc.types.Sorted.NO + assert df.flags == gf.to_polars().flags diff --git a/python/cudf_polars/tests/expressions/test_agg.py b/python/cudf_polars/tests/expressions/test_agg.py index e53fd7f8615..245bde3acab 100644 --- a/python/cudf_polars/tests/expressions/test_agg.py +++ b/python/cudf_polars/tests/expressions/test_agg.py @@ -20,13 +20,7 @@ def dtype(request): return request.param -@pytest.fixture( - params=[ - False, - pytest.param(True, marks=pytest.mark.xfail(reason="No handler for set_sorted")), - ], - ids=["unsorted", "sorted"], -) +@pytest.fixture(params=[False, True], ids=["unsorted", "sorted"]) def is_sorted(request): return request.param diff --git a/python/cudf_polars/tests/expressions/test_sort.py b/python/cudf_polars/tests/expressions/test_sort.py index 0195266f5c6..d46df92db94 100644 --- a/python/cudf_polars/tests/expressions/test_sort.py +++ b/python/cudf_polars/tests/expressions/test_sort.py @@ -8,6 +8,9 @@ import polars as pl +import cudf._lib.pylibcudf as plc + +from cudf_polars import translate_ir from cudf_polars.testing.asserts import assert_gpu_result_equal @@ -51,3 +54,31 @@ def test_sort_by_expression(descending, nulls_last, maintain_order): ) ) assert_gpu_result_equal(query, check_row_order=maintain_order) + + +@pytest.mark.parametrize("descending", [False, True]) +@pytest.mark.parametrize("nulls_last", [False, True]) +def test_setsorted(descending, nulls_last, with_nulls): + values = sorted([1, 2, 3, 4, 5, 6, -2], reverse=descending) + if with_nulls: + values[-1 if nulls_last else 0] = None + df = pl.LazyFrame({"a": values}) + + q = df.set_sorted("a", descending=descending) + + assert_gpu_result_equal(q) + + df = translate_ir(q._ldf.visit()).evaluate(cache={}) + + (a,) = df.columns + + assert a.is_sorted == plc.types.Sorted.YES + null_order = ( + plc.types.NullOrder.AFTER + if (descending ^ nulls_last) and with_nulls + else plc.types.NullOrder.BEFORE + ) + assert a.null_order == null_order + assert a.order == ( + plc.types.Order.DESCENDING if descending else plc.types.Order.ASCENDING + ) diff --git a/python/cudf_polars/tests/test_groupby.py b/python/cudf_polars/tests/test_groupby.py index 81306397b9f..50adca01950 100644 --- a/python/cudf_polars/tests/test_groupby.py +++ b/python/cudf_polars/tests/test_groupby.py @@ -2,6 +2,8 @@ # SPDX-License-Identifier: Apache-2.0 from __future__ import annotations +import itertools + import pytest import polars as pl @@ -26,12 +28,12 @@ def df(): @pytest.fixture( params=[ - ["key1"], - ["key2"], + [pl.col("key1")], + [pl.col("key2")], [pl.col("key1") * pl.col("key2")], - ["key1", "key2"], + [pl.col("key1"), pl.col("key2")], [pl.col("key1") == pl.col("key2")], - ["key2", pl.col("key1") == pl.lit(1, dtype=pl.Int64)], + [pl.col("key2"), pl.col("key1") == pl.lit(1, dtype=pl.Int64)], ], ids=lambda keys: "-".join(map(str, keys)), ) @@ -82,6 +84,35 @@ def test_groupby(df: pl.LazyFrame, maintain_order, keys, exprs): assert_gpu_result_equal(q, check_exact=False) +def test_groupby_sorted_keys(df: pl.LazyFrame, keys, exprs): + sorted_keys = [ + key.sort(descending=descending) + for key, descending in zip(keys, itertools.cycle([False, True])) + ] + + q = df.group_by(*sorted_keys).agg(*exprs) + + schema = q.collect_schema() + sort_keys = list(schema.keys())[: len(keys)] + # Multiple keys don't do sorting + qsorted = q.sort(*sort_keys) + if len(keys) > 1: + with pytest.raises(AssertionError): + # https://github.com/pola-rs/polars/issues/17556 + assert_gpu_result_equal(q, check_exact=False) + if schema[sort_keys[1]] == pl.Boolean(): + # https://github.com/pola-rs/polars/issues/17557 + with pytest.raises(AssertionError): + assert_gpu_result_equal(qsorted, check_exact=False) + else: + assert_gpu_result_equal(qsorted, check_exact=False) + elif schema[sort_keys[0]] == pl.Boolean(): + # Boolean keys don't do sorting, so we get random order + assert_gpu_result_equal(qsorted, check_exact=False) + else: + assert_gpu_result_equal(q, check_exact=False) + + def test_groupby_len(df, keys): q = df.group_by(*keys).agg(pl.len())