diff --git a/python/cudf_polars/cudf_polars/__init__.py b/python/cudf_polars/cudf_polars/__init__.py index b19a282129a..41d06f8631b 100644 --- a/python/cudf_polars/cudf_polars/__init__.py +++ b/python/cudf_polars/cudf_polars/__init__.py @@ -10,7 +10,13 @@ from __future__ import annotations +from cudf_polars._version import __git_commit__, __version__ from cudf_polars.callback import execute_with_cudf from cudf_polars.dsl.translate import translate_ir -__all__: list[str] = ["execute_with_cudf", "translate_ir"] +__all__: list[str] = [ + "execute_with_cudf", + "translate_ir", + "__git_commit__", + "__version__", +] diff --git a/python/cudf_polars/cudf_polars/containers/__init__.py b/python/cudf_polars/cudf_polars/containers/__init__.py index ee69e748eb5..06bb08953f1 100644 --- a/python/cudf_polars/cudf_polars/containers/__init__.py +++ b/python/cudf_polars/cudf_polars/containers/__init__.py @@ -5,8 +5,7 @@ from __future__ import annotations -__all__: list[str] = ["DataFrame", "Column", "NamedColumn", "Scalar"] +__all__: list[str] = ["DataFrame", "Column", "NamedColumn"] from cudf_polars.containers.column import Column, NamedColumn from cudf_polars.containers.dataframe import DataFrame -from cudf_polars.containers.scalar import Scalar diff --git a/python/cudf_polars/cudf_polars/containers/column.py b/python/cudf_polars/cudf_polars/containers/column.py index 575d15d3ece..156dd395d64 100644 --- a/python/cudf_polars/cudf_polars/containers/column.py +++ b/python/cudf_polars/cudf_polars/containers/column.py @@ -17,12 +17,13 @@ class Column: - """A column with sortedness metadata.""" + """An immutable column with sortedness metadata.""" obj: plc.Column is_sorted: plc.types.Sorted order: plc.types.Order null_order: plc.types.NullOrder + is_scalar: bool def __init__( self, @@ -33,10 +34,33 @@ def __init__( null_order: plc.types.NullOrder = plc.types.NullOrder.BEFORE, ): self.obj = column + self.is_scalar = self.obj.size() == 1 + if self.obj.size() <= 1: + is_sorted = 
plc.types.Sorted.YES self.is_sorted = is_sorted self.order = order self.null_order = null_order + @functools.cached_property + def obj_scalar(self) -> plc.Scalar: + """ + A copy of the column object as a pylibcudf Scalar. + + Returns + ------- + pylibcudf Scalar object. + + Raises + ------ + ValueError + If the column is not length-1. + """ + if not self.is_scalar: + raise ValueError( + f"Cannot convert a column of length {self.obj.size()} to scalar" + ) + return plc.copying.get_element(self.obj, 0) + def sorted_like(self, like: Column, /) -> Self: """ Copy sortedness properties from a column onto self. @@ -81,6 +105,8 @@ def set_sorted( ------- Self with metadata set. """ + if self.obj.size() <= 1: + is_sorted = plc.types.Sorted.YES self.is_sorted = is_sorted self.order = order self.null_order = null_order diff --git a/python/cudf_polars/cudf_polars/containers/dataframe.py b/python/cudf_polars/cudf_polars/containers/dataframe.py index ac7e748095e..7039fcaf077 100644 --- a/python/cudf_polars/cudf_polars/containers/dataframe.py +++ b/python/cudf_polars/cudf_polars/containers/dataframe.py @@ -32,7 +32,7 @@ class DataFrame: """A representation of a dataframe.""" columns: list[NamedColumn] - table: plc.Table | None + table: plc.Table def __init__(self, columns: Sequence[NamedColumn]) -> None: self.columns = list(columns) @@ -41,7 +41,7 @@ def __init__(self, columns: Sequence[NamedColumn]) -> None: def copy(self) -> Self: """Return a shallow copy of self.""" - return type(self)(self.columns) + return type(self)([c.copy() for c in self.columns]) def to_polars(self) -> pl.DataFrame: """Convert to a polars DataFrame.""" @@ -70,8 +70,6 @@ def num_columns(self) -> int: @cached_property def num_rows(self) -> int: """Number of rows.""" - if self.table is None: - raise ValueError("Number of rows of frame with scalars makes no sense") return self.table.num_rows() @classmethod diff --git a/python/cudf_polars/cudf_polars/containers/scalar.py 
b/python/cudf_polars/cudf_polars/containers/scalar.py deleted file mode 100644 index fc97d0fd9c2..00000000000 --- a/python/cudf_polars/cudf_polars/containers/scalar.py +++ /dev/null @@ -1,23 +0,0 @@ -# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. -# SPDX-License-Identifier: Apache-2.0 - -"""A scalar, with some properties.""" - -from __future__ import annotations - -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - import cudf._lib.pylibcudf as plc - -__all__: list[str] = ["Scalar"] - - -class Scalar: - """A scalar, and a name.""" - - __slots__ = ("obj", "name") - obj: plc.Scalar - - def __init__(self, scalar: plc.Scalar): - self.obj = scalar diff --git a/python/cudf_polars/cudf_polars/dsl/expr.py b/python/cudf_polars/cudf_polars/dsl/expr.py index 6d9435ce373..a81cdcbf0c3 100644 --- a/python/cudf_polars/cudf_polars/dsl/expr.py +++ b/python/cudf_polars/cudf_polars/dsl/expr.py @@ -5,7 +5,7 @@ """ DSL nodes for the polars expression language. -An expression node is a function, `DataFrame -> Column` or `DataFrame -> Scalar`. +An expression node is a function, `DataFrame -> Column`. The evaluation context is provided by a LogicalPlan node, and can affect the evaluation rule as well as providing the dataframe input. @@ -26,7 +26,7 @@ import cudf._lib.pylibcudf as plc -from cudf_polars.containers import Column, NamedColumn, Scalar +from cudf_polars.containers import Column, NamedColumn from cudf_polars.utils import sorting if TYPE_CHECKING: @@ -165,7 +165,7 @@ def do_evaluate( *, context: ExecutionContext = ExecutionContext.FRAME, mapping: Mapping[Expr, Column] | None = None, - ) -> Column: # TODO: return type is a lie for Literal + ) -> Column: """ Evaluate this expression given a dataframe for context. @@ -187,8 +187,7 @@ def do_evaluate( Returns ------- - Column representing the evaluation of the expression (or maybe - a scalar). + Column representing the evaluation of the expression. 
Raises ------ @@ -205,7 +204,7 @@ def evaluate( *, context: ExecutionContext = ExecutionContext.FRAME, mapping: Mapping[Expr, Column] | None = None, - ) -> Column: # TODO: return type is a lie for Literal + ) -> Column: """ Evaluate this expression given a dataframe for context. @@ -222,23 +221,13 @@ def evaluate( Notes ----- - Individual subclasses should implement :meth:`do_allocate`, + Individual subclasses should implement :meth:`do_evaluate`, this method provides logic to handle lookups in the substitution mapping. - The typed return value of :class:`Column` is not true when - evaluating :class:`Literal` nodes (which instead produce - :class:`Scalar` objects). However, these duck-type to having a - pylibcudf container object inside them, and usually they end - up appearing in binary expressions which pylibcudf handles - appropriately since there are overloads for (column, scalar) - pairs. We don't have to handle (scalar, scalar) in binops - since the polars optimizer has a constant-folding pass. - Returns ------- - Column representing the evaluation of the expression (or maybe - a scalar). + Column representing the evaluation of the expression. Raises ------ @@ -319,24 +308,35 @@ def evaluate( context: ExecutionContext = ExecutionContext.FRAME, mapping: Mapping[Expr, Column] | None = None, ) -> NamedColumn: - """Evaluate this expression given a dataframe for context.""" + """ + Evaluate this expression given a dataframe for context. + + Parameters + ---------- + df + DataFrame providing context + context + Execution context + mapping + Substitution mapping + + Returns + ------- + NamedColumn attaching a name to an evaluated Column + + See Also + -------- + :meth:`Expr.evaluate` for details, this function just adds the + name to a column produced from an expression. 
+ """ obj = self.value.evaluate(df, context=context, mapping=mapping) - if isinstance(obj, Scalar): - return NamedColumn( - plc.Column.from_scalar(obj.obj, 1), - self.name, - is_sorted=plc.types.Sorted.YES, - order=plc.types.Order.ASCENDING, - null_order=plc.types.NullOrder.BEFORE, - ) - else: - return NamedColumn( - obj.obj, - self.name, - is_sorted=obj.is_sorted, - order=obj.order, - null_order=obj.null_order, - ) + return NamedColumn( + obj.obj, + self.name, + is_sorted=obj.is_sorted, + order=obj.order, + null_order=obj.null_order, + ) def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" @@ -363,7 +363,7 @@ def do_evaluate( ) -> Column: """Evaluate this expression given a dataframe for context.""" # datatype of pyarrow scalar is correct by construction. - return Scalar(plc.interop.from_arrow(self.value)) # type: ignore + return Column(plc.Column.from_scalar(plc.interop.from_arrow(self.value), 1)) class Col(Expr): @@ -402,8 +402,14 @@ def do_evaluate( mapping: Mapping[Expr, Column] | None = None, ) -> Column: """Evaluate this expression given a dataframe for context.""" - # TODO: type is wrong, and dtype - return df.num_rows # type: ignore + return Column( + plc.Column.from_scalar( + plc.interop.from_arrow( + pa.scalar(df.num_rows, type=plc.interop.to_arrow(self.dtype)) + ), + 1, + ) + ) def collect_agg(self, *, depth: int) -> AggInfo: """Collect information about aggregations in groupbys.""" @@ -664,10 +670,24 @@ def do_evaluate( return Column(plc.strings.case.to_upper(column.obj)) elif self.name == pl_expr.StringFunction.EndsWith: column, suffix = columns - return Column(plc.strings.find.ends_with(column.obj, suffix.obj)) + return Column( + plc.strings.find.ends_with( + column.obj, + suffix.obj_scalar + if column.obj.size() != suffix.obj.size() and suffix.is_scalar + else suffix.obj, + ) + ) elif self.name == pl_expr.StringFunction.StartsWith: - column, suffix = columns - return 
Column(plc.strings.find.starts_with(column.obj, suffix.obj)) + column, prefix = columns + return Column( + plc.strings.find.starts_with( + column.obj, + prefix.obj_scalar + if column.obj.size() != prefix.obj.size() and prefix.is_scalar + else prefix.obj, + ) + ) else: raise NotImplementedError(f"StringFunction {self.name}") @@ -875,9 +895,6 @@ def __init__( self, dtype: plc.DataType, name: str, options: Any, value: Expr ) -> None: super().__init__(dtype) - # TODO: fix polars name - if name == "nunique": - name = "n_unique" self.name = name self.options = options self.children = (value,) @@ -1092,8 +1109,15 @@ def do_evaluate( child.evaluate(df, context=context, mapping=mapping) for child in self.children ) + lop = left.obj + rop = right.obj + if left.obj.size() != right.obj.size(): + if left.is_scalar: + lop = left.obj_scalar + elif right.is_scalar: + rop = right.obj_scalar return Column( - plc.binaryop.binary_operation(left.obj, right.obj, self.op, self.dtype), + plc.binaryop.binary_operation(lop, rop, self.op, self.dtype), ) def collect_agg(self, *, depth: int) -> AggInfo: diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index 665bbe5be41..0a6deb5698c 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -63,26 +63,58 @@ def broadcast( *columns: NamedColumn, target_length: int | None = None ) -> list[NamedColumn]: - lengths = {column.obj.size() for column in columns} - if len(lengths - {1}) > 1: - raise RuntimeError("Mismatching column lengths") + """ + Broadcast a sequence of columns to a common length. + + Parameters + ---------- + columns + Columns to broadcast. + target_length + Optional length to broadcast to. If not provided, uses the + non-unit length of existing columns. + + Returns + ------- + List of broadcasted columns all of the same length. + + Raises + ------ + RuntimeError + If broadcasting is not possible. 
+ + Notes + ----- + In evaluation of a set of expressions, polars type-puns length-1 + columns with scalars. When we insert these into a DataFrame + object, we need to ensure they are of equal length. This function + takes some columns, some of which may be length-1 and ensures that + all length-1 columns are broadcast to the length of the others. + + Broadcasting is only possible if the set of lengths of the input + columns is a subset of ``{1, n}`` for some (fixed) ``n``. If + ``target_length`` is provided and not all columns are length-1 + (i.e. ``n != 1``), then ``target_length`` must be equal to ``n``. + """ + lengths: set[int] = {column.obj.size() for column in columns} if lengths == {1}: if target_length is None: return list(columns) nrows = target_length - elif len(lengths) == 1: - if target_length is not None: - assert target_length in lengths - return list(columns) else: - (nrows,) = lengths - {1} - if target_length is not None: - assert target_length == nrows + try: + (nrows,) = lengths.difference([1]) + except ValueError as e: + raise RuntimeError("Mismatching column lengths") from e + if target_length is not None and nrows != target_length: + raise RuntimeError( + f"Cannot broadcast columns of length {nrows=} to {target_length=}" + ) return [ column if column.obj.size() != 1 else NamedColumn( - plc.Column.from_scalar(plc.copying.get_element(column.obj, 0), nrows), + plc.Column.from_scalar(column.obj_scalar, nrows), column.name, is_sorted=plc.types.Sorted.YES, order=plc.types.Order.ASCENDING, @@ -279,12 +311,16 @@ class Select(IR): """Input dataframe.""" expr: list[expr.NamedExpr] """List of expressions to evaluate to form the new dataframe.""" + should_broadcast: bool + """Should columns be broadcast?""" def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) # Handle any broadcasting - columns = broadcast(*(e.evaluate(df) for e in self.expr)) + columns = 
[e.evaluate(df) for e in self.expr] + if self.should_broadcast: + columns = broadcast(*columns) return DataFrame(columns) @@ -587,15 +623,24 @@ class HStack(IR): """Input dataframe.""" columns: list[expr.NamedExpr] """List of expressions to produce new columns.""" + should_broadcast: bool + """Should columns be broadcast?""" def evaluate(self, *, cache: MutableMapping[int, DataFrame]) -> DataFrame: """Evaluate and return a dataframe.""" df = self.df.evaluate(cache=cache) columns = [c.evaluate(df) for c in self.columns] - # TODO: a bit of a hack, should inherit the should_broadcast - # property of polars' ProjectionOptions on the hstack node. - if not any(e.name.startswith("__POLARS_CSER_0x") for e in self.columns): + if self.should_broadcast: columns = broadcast(*columns, target_length=df.num_rows) + else: + # Polars ensures this is true, but let's make sure nothing + # went wrong. In this case, the parent node is + # guaranteed to be a Select which will take care of making + # sure that everything is the same length. The result + # table that might have mismatching column lengths will + # never be turned into a pylibcudf Table with all columns + # by the Select, which is why this is safe. 
+ assert all(e.name.startswith("__POLARS_CSER_0x") for e in self.columns) return df.with_columns(columns) diff --git a/python/cudf_polars/cudf_polars/dsl/translate.py b/python/cudf_polars/cudf_polars/dsl/translate.py index 38107023365..adde3b1a9dc 100644 --- a/python/cudf_polars/cudf_polars/dsl/translate.py +++ b/python/cudf_polars/cudf_polars/dsl/translate.py @@ -122,7 +122,7 @@ def _( with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) exprs = [translate_named_expr(visitor, n=e) for e in node.expr] - return ir.Select(schema, inp, exprs) + return ir.Select(schema, inp, exprs, node.should_broadcast) @_translate_ir.register @@ -166,7 +166,7 @@ def _( with set_node(visitor, node.input): inp = translate_ir(visitor, n=None) exprs = [translate_named_expr(visitor, n=e) for e in node.exprs] - return ir.HStack(schema, inp, exprs) + return ir.HStack(schema, inp, exprs, node.should_broadcast) @_translate_ir.register diff --git a/python/cudf_polars/cudf_polars/utils/sorting.py b/python/cudf_polars/cudf_polars/utils/sorting.py index d35459db20d..24fd449dd88 100644 --- a/python/cudf_polars/cudf_polars/utils/sorting.py +++ b/python/cudf_polars/cudf_polars/utils/sorting.py @@ -30,7 +30,7 @@ def sort_order( Returns ------- - tuple of column_order and null_precendence + tuple of column_order and null_precedence suitable for passing to sort routines """ # Mimicking polars broadcast handling of descending diff --git a/python/cudf_polars/pyproject.toml b/python/cudf_polars/pyproject.toml index 2faf8c3193f..11178a3be74 100644 --- a/python/cudf_polars/pyproject.toml +++ b/python/cudf_polars/pyproject.toml @@ -49,9 +49,6 @@ license-files = ["LICENSE"] [tool.setuptools.dynamic] version = {file = "cudf_polars/VERSION"} -[tool.setuptools.packages.find] -exclude = ["*tests*"] - [tool.pytest.ini_options] xfail_strict = true diff --git a/python/cudf_polars/tests/utils/__init__.py b/python/cudf_polars/tests/utils/__init__.py new file mode 100644 index 
00000000000..4611d642f14 --- /dev/null +++ b/python/cudf_polars/tests/utils/__init__.py @@ -0,0 +1,6 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +__all__: list[str] = [] diff --git a/python/cudf_polars/tests/utils/test_broadcast.py b/python/cudf_polars/tests/utils/test_broadcast.py new file mode 100644 index 00000000000..69ad1e519e2 --- /dev/null +++ b/python/cudf_polars/tests/utils/test_broadcast.py @@ -0,0 +1,74 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import cudf._lib.pylibcudf as plc + +from cudf_polars.containers import NamedColumn +from cudf_polars.dsl.ir import broadcast + + +@pytest.mark.parametrize("target", [4, None]) +def test_broadcast_all_scalar(target): + columns = [ + NamedColumn( + plc.column_factories.make_numeric_column( + plc.DataType(plc.TypeId.INT8), 1, plc.MaskState.ALL_VALID + ), + f"col{i}", + ) + for i in range(3) + ] + result = broadcast(*columns, target_length=target) + expected = 1 if target is None else target + + assert all(column.obj.size() == expected for column in result) + + +def test_invalid_target_length(): + columns = [ + NamedColumn( + plc.column_factories.make_numeric_column( + plc.DataType(plc.TypeId.INT8), 4, plc.MaskState.ALL_VALID + ), + f"col{i}", + ) + for i in range(3) + ] + with pytest.raises(RuntimeError): + _ = broadcast(*columns, target_length=8) + + +def test_broadcast_mismatching_column_lengths(): + columns = [ + NamedColumn( + plc.column_factories.make_numeric_column( + plc.DataType(plc.TypeId.INT8), i + 1, plc.MaskState.ALL_VALID + ), + f"col{i}", + ) + for i in range(3) + ] + with pytest.raises(RuntimeError): + _ = broadcast(*columns) + + +@pytest.mark.parametrize("nrows", [0, 5]) +def test_broadcast_with_scalars(nrows): + columns = [ + NamedColumn( + 
plc.column_factories.make_numeric_column( + plc.DataType(plc.TypeId.INT8), + nrows if i == 0 else 1, + plc.MaskState.ALL_VALID, + ), + f"col{i}", + ) + for i in range(3) + ] + + result = broadcast(*columns) + assert all(column.obj.size() == nrows for column in result)