Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle nans in groupby-aggregations in polars executor #16233

Merged
merged 9 commits into from
Jul 12, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ This page provides API documentation for pylibcudf.
stream_compaction
table
traits
transform
types
unary

Expand Down
6 changes: 6 additions & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/transform.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
=========
transform
=========

.. automodule:: cudf._lib.pylibcudf.transform
:members:
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(cython_sources
sorting.pyx
table.pyx
traits.pyx
transform.pyx
types.pyx
unary.pyx
utils.pyx
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/__init__.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from . cimport (
stream_compaction,
strings,
traits,
transform,
types,
unary,
)
Expand Down Expand Up @@ -63,6 +64,7 @@ __all__ = [
"strings",
"sorting",
"traits",
"transform",
"types",
"unary",
]
2 changes: 2 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
stream_compaction,
strings,
traits,
transform,
types,
unary,
)
Expand Down Expand Up @@ -64,6 +65,7 @@
"strings",
"sorting",
"traits",
"transform",
"types",
"unary",
]
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/column.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ cdef class Column:
cpdef gpumemoryview null_mask(self)
cpdef list children(self)
cpdef Column copy(self)
cpdef Column with_mask(self, gpumemoryview, size_type)

cpdef ListColumnView list_view(self)

Expand Down
32 changes: 29 additions & 3 deletions python/cudf/cudf/_lib/pylibcudf/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,32 @@ cdef class Column:
children,
)

cpdef Column with_mask(self, gpumemoryview mask, size_type null_count):
"""Augment this column with a new null mask.

Parameters
----------
mask
New mask (or None to unset the mask)
null_count
New null count. If this is incorrect, bad things happen.

Returns
-------
New Column object sharing data with self (except for the mask which is new).
"""
if mask is None and null_count > 0:
raise ValueError("Empty mask must have null count of zero")
return Column(
self._data_type,
self._size,
self._data,
mask,
null_count,
self._offset,
self._children,
)

@staticmethod
cdef Column from_column_view(const column_view& cv, Column owner):
"""Create a Column from a libcudf column_view.
Expand Down Expand Up @@ -250,7 +276,7 @@ cdef class Column:
column is in use.
"""
data = gpumemoryview(obj)
iface = data.__cuda_array_interface__()
iface = data.__cuda_array_interface__
if iface.get('mask') is not None:
raise ValueError("mask not yet supported.")

Expand Down Expand Up @@ -400,8 +426,8 @@ def is_c_contiguous(
itemsize : int
Size of an element in bytes.

Return
------
Returns
-------
bool
The boolean answer.
"""
Expand Down
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/gpumemoryview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ cdef class gpumemoryview:
# TODO: Need to respect readonly
self.ptr = cai["data"][0]

@property
def __cuda_array_interface__(self):
return self.obj.__cuda_array_interface__
7 changes: 7 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/transform.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from .column cimport Column
from .gpumemoryview cimport gpumemoryview


cpdef tuple[gpumemoryview, int] nans_to_nulls(Column input)
35 changes: 35 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/transform.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move, pair

from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer

from cudf._lib.pylibcudf.libcudf cimport transform as cpp_transform
from cudf._lib.pylibcudf.libcudf.types cimport size_type

from .column cimport Column
from .gpumemoryview cimport gpumemoryview


cpdef tuple[gpumemoryview, int] nans_to_nulls(Column input):
"""Create a null mask preserving existing nulls and converting nans to null.

Parameters
----------
input
wence- marked this conversation as resolved.
Show resolved Hide resolved
Column to produce new mask from.

Returns
-------
Two-tuple of a gpumemoryview wrapping the null mask and the new null count.
"""
cdef pair[unique_ptr[device_buffer], size_type] c_result

with nogil:
c_result = move(cpp_transform.nans_to_nulls(input.view()))

return (
gpumemoryview(DeviceBuffer.c_from_unique_ptr(move(c_result.first))),
c_result.second
)
17 changes: 5 additions & 12 deletions python/cudf/cudf/_lib/transform.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer
cimport cudf._lib.pylibcudf.libcudf.transform as libcudf_transform
from cudf._lib.column cimport Column
from cudf._lib.expressions cimport Expression
from cudf._lib.pylibcudf cimport transform as plc_transform
from cudf._lib.pylibcudf.libcudf.column.column cimport column
from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view
from cudf._lib.pylibcudf.libcudf.expressions cimport expression
Expand Down Expand Up @@ -82,18 +83,10 @@ def mask_to_bools(object mask_buffer, size_type begin_bit, size_type end_bit):

@acquire_spill_lock()
def nans_to_nulls(Column input):
cdef column_view c_input = input.view()
cdef pair[unique_ptr[device_buffer], size_type] c_output
cdef unique_ptr[device_buffer] c_buffer

with nogil:
c_output = move(libcudf_transform.nans_to_nulls(c_input))
c_buffer = move(c_output.first)

if c_output.second == 0:
return None

return as_buffer(DeviceBuffer.c_from_unique_ptr(move(c_buffer)))
(mask, _) = plc_transform.nans_to_nulls(
input.to_pylibcudf(mode="read")
)
return as_buffer(mask)


@acquire_spill_lock()
Expand Down
45 changes: 32 additions & 13 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,29 @@ def copy(self) -> Self:
)

def mask_nans(self) -> Self:
"""Return a copy of self with nans masked out."""
if self.nan_count > 0:
raise NotImplementedError("Need to port transform.hpp to pylibcudf")
"""Return a shallow copy of self with nans masked out."""
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.obj.null_count()
mask, new_count = plc.transform.nans_to_nulls(self.obj)
result = type(self)(self.obj.with_mask(mask, new_count))
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()

@functools.cached_property
def nan_count(self) -> int:
"""Return the number of NaN values in the column."""
if self.obj.type().id() not in (plc.TypeId.FLOAT32, plc.TypeId.FLOAT64):
return 0
return plc.interop.to_arrow(
plc.reduce.reduce(
plc.unary.is_nan(self.obj),
plc.aggregation.sum(),
# TODO: pylibcudf needs to have a SizeType DataType singleton
plc.DataType(plc.TypeId.INT32),
)
).as_py()
if plc.traits.is_floating_point(self.obj.type()):
return plc.interop.to_arrow(
plc.reduce.reduce(
plc.unary.is_nan(self.obj),
plc.aggregation.sum(),
# TODO: pylibcudf needs to have a SizeType DataType singleton
plc.DataType(plc.TypeId.INT32),
)
).as_py()
return 0


class NamedColumn(Column):
Expand Down Expand Up @@ -187,3 +192,17 @@ def copy(self, *, new_name: str | None = None) -> Self:
order=self.order,
null_order=self.null_order,
)

def mask_nans(self) -> Self:
"""Return a shallow copy of self with nans masked out."""
# Annoying, the inheritance is not right (can't call the
# super-type mask_nans), but will sort that by refactoring
# later.
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.obj.null_count()
mask, new_count = plc.transform.nans_to_nulls(self.obj)
result = type(self)(self.obj.with_mask(mask, new_count), self.name)
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()
12 changes: 11 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ def __init__(
self.name = name
self.options = options
self.children = children
if self.name not in ("round", "unique"):
if self.name not in ("round", "unique", "mask_nans"):
raise NotImplementedError(f"Unary function {name=}")

def do_evaluate(
Expand All @@ -878,6 +878,9 @@ def do_evaluate(
mapping: Mapping[Expr, Column] | None = None,
) -> Column:
"""Evaluate this expression given a dataframe for context."""
if self.name == "mask_nans":
(child,) = self.children
return child.evaluate(df, context=context, mapping=mapping).mask_nans()
if self.name == "round":
(decimal_places,) = self.options
(values,) = (
Expand Down Expand Up @@ -1215,12 +1218,19 @@ def collect_agg(self, *, depth: int) -> AggInfo:
raise NotImplementedError(
"Nested aggregations in groupby"
) # pragma: no cover; check_agg trips first
if (isminmax := self.name in {"min", "max"}) and self.options:
raise NotImplementedError("Nan propagation in groupby for min/max")
(child,) = self.children
((expr, _, _),) = child.collect_agg(depth=depth + 1).requests
if self.request is None:
raise NotImplementedError(
f"Aggregation {self.name} in groupby"
) # pragma: no cover; __init__ trips first
if isminmax and plc.traits.is_floating_point(self.dtype):
assert expr is not None
# Ignore nans in these groupby aggs, do this by masking
# nans in the input
expr = UnaryFunction(self.dtype, "mask_nans", (), expr)
return AggInfo([(expr, self.request, self)])

def _reduce(
Expand Down
20 changes: 13 additions & 7 deletions python/cudf_polars/tests/containers/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

from __future__ import annotations

from functools import partial

import pyarrow
import pytest

import cudf._lib.pylibcudf as plc

from cudf_polars.containers import Column
from cudf_polars.containers import Column, NamedColumn


def test_non_scalar_access_raises():
Expand Down Expand Up @@ -54,17 +56,21 @@ def test_shallow_copy():


@pytest.mark.parametrize("typeid", [plc.TypeId.INT8, plc.TypeId.FLOAT32])
def test_mask_nans(typeid):
@pytest.mark.parametrize("constructor", [Column, partial(NamedColumn, name="name")])
def test_mask_nans(typeid, constructor):
dtype = plc.DataType(typeid)
values = pyarrow.array([0, 0, 0], type=plc.interop.to_arrow(dtype))
column = Column(plc.interop.from_arrow(values))
column = constructor(plc.interop.from_arrow(values))
masked = column.mask_nans()
assert column.obj is masked.obj
assert column.obj.null_count() == masked.obj.null_count()


def test_mask_nans_float_with_nan_notimplemented():
def test_mask_nans_float():
dtype = plc.DataType(plc.TypeId.FLOAT32)
values = pyarrow.array([0, 0, float("nan")], type=plc.interop.to_arrow(dtype))
column = Column(plc.interop.from_arrow(values))
with pytest.raises(NotImplementedError):
_ = column.mask_nans()
masked = column.mask_nans()
expect = pyarrow.array([0, 0, None], type=plc.interop.to_arrow(dtype))
got = pyarrow.array(plc.interop.to_arrow(masked.obj))

assert expect == got
25 changes: 18 additions & 7 deletions python/cudf_polars/tests/expressions/test_agg.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,14 +59,25 @@ def test_agg(df, agg):


@pytest.mark.parametrize(
"propagate_nans",
[pytest.param(False, marks=pytest.mark.xfail(reason="Need to mask nans")), True],
ids=["mask_nans", "propagate_nans"],
"op", [pl.Expr.min, pl.Expr.nan_min, pl.Expr.max, pl.Expr.nan_max]
)
@pytest.mark.parametrize("op", ["min", "max"])
def test_agg_float_with_nans(propagate_nans, op):
df = pl.LazyFrame({"a": pl.Series([1, 2, float("nan")], dtype=pl.Float64())})
op = getattr(pl.Expr, f"nan_{op}" if propagate_nans else op)
def test_agg_float_with_nans(op):
df = pl.LazyFrame(
{
"a": pl.Series([1, 2, float("nan")], dtype=pl.Float64()),
"b": pl.Series([1, 2, None], dtype=pl.Int8()),
}
)
q = df.select(op(pl.col("a")), op(pl.col("b")))

assert_gpu_result_equal(q)


@pytest.mark.xfail(reason="https://github.com/pola-rs/polars/issues/17513")
@pytest.mark.parametrize("op", [pl.Expr.max, pl.Expr.min])
def test_agg_singleton(op):
df = pl.LazyFrame({"a": pl.Series([float("nan")])})

q = df.select(op(pl.col("a")))

assert_gpu_result_equal(q)
Loading
Loading