Skip to content

Commit

Permalink
Handle nans in groupby-aggregations in polars executor (#16233)
Browse files Browse the repository at this point in the history
Polars `min` and `max` by default ignore nans (treating them as nulls), to mimic this behaviour we must mask out nans before performing a min/max aggregation.

Do this by exposing `nans_to_nulls` in pylibcudf and implementing a `with_mask` method on pylibcudf Columns.

Authors:
  - Lawrence Mitchell (https://github.com/wence-)

Approvers:
  - Thomas Li (https://github.com/lithomas1)

URL: #16233
  • Loading branch information
wence- authored Jul 12, 2024
1 parent 1ff7461 commit 4fc8e79
Show file tree
Hide file tree
Showing 18 changed files with 230 additions and 44 deletions.
1 change: 1 addition & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ This page provides API documentation for pylibcudf.
stream_compaction
table
traits
transform
types
unary

Expand Down
6 changes: 6 additions & 0 deletions docs/cudf/source/user_guide/api_docs/pylibcudf/transform.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
=========
transform
=========

.. automodule:: cudf._lib.pylibcudf.transform
:members:
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ set(cython_sources
sorting.pyx
table.pyx
traits.pyx
transform.pyx
types.pyx
unary.pyx
utils.pyx
Expand Down
2 changes: 2 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/__init__.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ from . cimport (
stream_compaction,
strings,
traits,
transform,
types,
unary,
)
Expand Down Expand Up @@ -63,6 +64,7 @@ __all__ = [
"strings",
"sorting",
"traits",
"transform",
"types",
"unary",
]
2 changes: 2 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
stream_compaction,
strings,
traits,
transform,
types,
unary,
)
Expand Down Expand Up @@ -64,6 +65,7 @@
"strings",
"sorting",
"traits",
"transform",
"types",
"unary",
]
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/column.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ cdef class Column:
cpdef gpumemoryview null_mask(self)
cpdef list children(self)
cpdef Column copy(self)
cpdef Column with_mask(self, gpumemoryview, size_type)

cpdef ListColumnView list_view(self)

Expand Down
32 changes: 29 additions & 3 deletions python/cudf/cudf/_lib/pylibcudf/column.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,32 @@ cdef class Column:
children,
)

cpdef Column with_mask(self, gpumemoryview mask, size_type null_count):
"""Augment this column with a new null mask.
Parameters
----------
mask : gpumemoryview
New mask (or None to unset the mask)
null_count : int
New null count. If this is incorrect, bad things happen.
Returns
-------
New Column object sharing data with self (except for the mask which is new).
"""
if mask is None and null_count > 0:
raise ValueError("Empty mask must have null count of zero")
return Column(
self._data_type,
self._size,
self._data,
mask,
null_count,
self._offset,
self._children,
)

@staticmethod
cdef Column from_column_view(const column_view& cv, Column owner):
"""Create a Column from a libcudf column_view.
Expand Down Expand Up @@ -250,7 +276,7 @@ cdef class Column:
column is in use.
"""
data = gpumemoryview(obj)
iface = data.__cuda_array_interface__()
iface = data.__cuda_array_interface__
if iface.get('mask') is not None:
raise ValueError("mask not yet supported.")

Expand Down Expand Up @@ -400,8 +426,8 @@ def is_c_contiguous(
itemsize : int
Size of an element in bytes.

Return
------
Returns
-------
bool
The boolean answer.
"""
Expand Down
1 change: 1 addition & 0 deletions python/cudf/cudf/_lib/pylibcudf/gpumemoryview.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ cdef class gpumemoryview:
# TODO: Need to respect readonly
self.ptr = cai["data"][0]

@property
def __cuda_array_interface__(self):
return self.obj.__cuda_array_interface__
7 changes: 7 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/transform.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from .column cimport Column
from .gpumemoryview cimport gpumemoryview


cpdef tuple[gpumemoryview, int] nans_to_nulls(Column input)
35 changes: 35 additions & 0 deletions python/cudf/cudf/_lib/pylibcudf/transform.pyx
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libcpp.memory cimport unique_ptr
from libcpp.utility cimport move, pair

from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer

from cudf._lib.pylibcudf.libcudf cimport transform as cpp_transform
from cudf._lib.pylibcudf.libcudf.types cimport size_type

from .column cimport Column
from .gpumemoryview cimport gpumemoryview


cpdef tuple[gpumemoryview, int] nans_to_nulls(Column input):
"""Create a null mask preserving existing nulls and converting nans to null.
Parameters
----------
input : Column
Column to produce new mask from.
Returns
-------
Two-tuple of a gpumemoryview wrapping the null mask and the new null count.
"""
cdef pair[unique_ptr[device_buffer], size_type] c_result

with nogil:
c_result = move(cpp_transform.nans_to_nulls(input.view()))

return (
gpumemoryview(DeviceBuffer.c_from_unique_ptr(move(c_result.first))),
c_result.second
)
17 changes: 5 additions & 12 deletions python/cudf/cudf/_lib/transform.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ from rmm._lib.device_buffer cimport DeviceBuffer, device_buffer
cimport cudf._lib.pylibcudf.libcudf.transform as libcudf_transform
from cudf._lib.column cimport Column
from cudf._lib.expressions cimport Expression
from cudf._lib.pylibcudf cimport transform as plc_transform
from cudf._lib.pylibcudf.libcudf.column.column cimport column
from cudf._lib.pylibcudf.libcudf.column.column_view cimport column_view
from cudf._lib.pylibcudf.libcudf.expressions cimport expression
Expand Down Expand Up @@ -82,18 +83,10 @@ def mask_to_bools(object mask_buffer, size_type begin_bit, size_type end_bit):

@acquire_spill_lock()
def nans_to_nulls(Column input):
cdef column_view c_input = input.view()
cdef pair[unique_ptr[device_buffer], size_type] c_output
cdef unique_ptr[device_buffer] c_buffer

with nogil:
c_output = move(libcudf_transform.nans_to_nulls(c_input))
c_buffer = move(c_output.first)

if c_output.second == 0:
return None

return as_buffer(DeviceBuffer.c_from_unique_ptr(move(c_buffer)))
(mask, _) = plc_transform.nans_to_nulls(
input.to_pylibcudf(mode="read")
)
return as_buffer(mask)


@acquire_spill_lock()
Expand Down
11 changes: 10 additions & 1 deletion python/cudf/cudf/pylibcudf_tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,15 @@ def sorted_opt(request):
return request.param


@pytest.fixture(scope="session", params=[False, True])
@pytest.fixture(
scope="session", params=[False, True], ids=["without_nulls", "with_nulls"]
)
def has_nulls(request):
return request.param


@pytest.fixture(
scope="session", params=[False, True], ids=["without_nans", "with_nans"]
)
def has_nans(request):
return request.param
32 changes: 32 additions & 0 deletions python/cudf/cudf/pylibcudf_tests/test_transform.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

import math

import pyarrow as pa
from utils import assert_column_eq

from cudf._lib import pylibcudf as plc


def test_nans_to_nulls(has_nans):
if has_nans:
values = [1, float("nan"), float("nan"), None, 3, None]
else:
values = [1, 4, 5, None, 3, None]

replaced = [
None if (v is None or (v is not None and math.isnan(v))) else v
for v in values
]

h_input = pa.array(values, type=pa.float32())
input = plc.interop.from_arrow(h_input)
assert input.null_count() == h_input.null_count
expect = pa.array(replaced, type=pa.float32())

mask, null_count = plc.transform.nans_to_nulls(input)

assert null_count == expect.null_count
got = input.with_mask(mask, null_count)

assert_column_eq(expect, got)
45 changes: 32 additions & 13 deletions python/cudf_polars/cudf_polars/containers/column.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,24 +128,29 @@ def copy(self) -> Self:
)

def mask_nans(self) -> Self:
"""Return a copy of self with nans masked out."""
if self.nan_count > 0:
raise NotImplementedError("Need to port transform.hpp to pylibcudf")
"""Return a shallow copy of self with nans masked out."""
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.obj.null_count()
mask, new_count = plc.transform.nans_to_nulls(self.obj)
result = type(self)(self.obj.with_mask(mask, new_count))
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()

@functools.cached_property
def nan_count(self) -> int:
"""Return the number of NaN values in the column."""
if self.obj.type().id() not in (plc.TypeId.FLOAT32, plc.TypeId.FLOAT64):
return 0
return plc.interop.to_arrow(
plc.reduce.reduce(
plc.unary.is_nan(self.obj),
plc.aggregation.sum(),
# TODO: pylibcudf needs to have a SizeType DataType singleton
plc.DataType(plc.TypeId.INT32),
)
).as_py()
if plc.traits.is_floating_point(self.obj.type()):
return plc.interop.to_arrow(
plc.reduce.reduce(
plc.unary.is_nan(self.obj),
plc.aggregation.sum(),
# TODO: pylibcudf needs to have a SizeType DataType singleton
plc.DataType(plc.TypeId.INT32),
)
).as_py()
return 0


class NamedColumn(Column):
Expand Down Expand Up @@ -187,3 +192,17 @@ def copy(self, *, new_name: str | None = None) -> Self:
order=self.order,
null_order=self.null_order,
)

def mask_nans(self) -> Self:
"""Return a shallow copy of self with nans masked out."""
# Annoying, the inheritance is not right (can't call the
# super-type mask_nans), but will sort that by refactoring
# later.
if plc.traits.is_floating_point(self.obj.type()):
old_count = self.obj.null_count()
mask, new_count = plc.transform.nans_to_nulls(self.obj)
result = type(self)(self.obj.with_mask(mask, new_count), self.name)
if old_count == new_count:
return result.sorted_like(self)
return result
return self.copy()
12 changes: 11 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/expr.py
Original file line number Diff line number Diff line change
Expand Up @@ -867,7 +867,7 @@ def __init__(
self.name = name
self.options = options
self.children = children
if self.name not in ("round", "unique"):
if self.name not in ("round", "unique", "mask_nans"):
raise NotImplementedError(f"Unary function {name=}")

def do_evaluate(
Expand All @@ -878,6 +878,9 @@ def do_evaluate(
mapping: Mapping[Expr, Column] | None = None,
) -> Column:
"""Evaluate this expression given a dataframe for context."""
if self.name == "mask_nans":
(child,) = self.children
return child.evaluate(df, context=context, mapping=mapping).mask_nans()
if self.name == "round":
(decimal_places,) = self.options
(values,) = (
Expand Down Expand Up @@ -1215,12 +1218,19 @@ def collect_agg(self, *, depth: int) -> AggInfo:
raise NotImplementedError(
"Nested aggregations in groupby"
) # pragma: no cover; check_agg trips first
if (isminmax := self.name in {"min", "max"}) and self.options:
raise NotImplementedError("Nan propagation in groupby for min/max")
(child,) = self.children
((expr, _, _),) = child.collect_agg(depth=depth + 1).requests
if self.request is None:
raise NotImplementedError(
f"Aggregation {self.name} in groupby"
) # pragma: no cover; __init__ trips first
if isminmax and plc.traits.is_floating_point(self.dtype):
assert expr is not None
# Ignore nans in these groupby aggs, do this by masking
# nans in the input
expr = UnaryFunction(self.dtype, "mask_nans", (), expr)
return AggInfo([(expr, self.request, self)])

def _reduce(
Expand Down
20 changes: 13 additions & 7 deletions python/cudf_polars/tests/containers/test_column.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@

from __future__ import annotations

from functools import partial

import pyarrow
import pytest

import cudf._lib.pylibcudf as plc

from cudf_polars.containers import Column
from cudf_polars.containers import Column, NamedColumn


def test_non_scalar_access_raises():
Expand Down Expand Up @@ -54,17 +56,21 @@ def test_shallow_copy():


@pytest.mark.parametrize("typeid", [plc.TypeId.INT8, plc.TypeId.FLOAT32])
def test_mask_nans(typeid):
@pytest.mark.parametrize("constructor", [Column, partial(NamedColumn, name="name")])
def test_mask_nans(typeid, constructor):
dtype = plc.DataType(typeid)
values = pyarrow.array([0, 0, 0], type=plc.interop.to_arrow(dtype))
column = Column(plc.interop.from_arrow(values))
column = constructor(plc.interop.from_arrow(values))
masked = column.mask_nans()
assert column.obj is masked.obj
assert column.obj.null_count() == masked.obj.null_count()


def test_mask_nans_float_with_nan_notimplemented():
def test_mask_nans_float():
dtype = plc.DataType(plc.TypeId.FLOAT32)
values = pyarrow.array([0, 0, float("nan")], type=plc.interop.to_arrow(dtype))
column = Column(plc.interop.from_arrow(values))
with pytest.raises(NotImplementedError):
_ = column.mask_nans()
masked = column.mask_nans()
expect = pyarrow.array([0, 0, None], type=plc.interop.to_arrow(dtype))
got = pyarrow.array(plc.interop.to_arrow(masked.obj))

assert expect == got
Loading

0 comments on commit 4fc8e79

Please sign in to comment.