Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Patch dask-expr var logic in dask-cudf #15347

Merged
merged 21 commits into from
Apr 8, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions ci/build_docs.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,6 @@ mkdir -p "${RAPIDS_DOCS_DIR}/libcudf/html"
mv html/* "${RAPIDS_DOCS_DIR}/libcudf/html"
popd

# TODO: Remove this once dask-expr works in the 10min notebook
export DASK_DATAFRAME__QUERY_PLANNING=False

rapids-logger "Build Python docs"
pushd docs/cudf
make dirhtml
Expand Down
10 changes: 5 additions & 5 deletions python/dask_cudf/dask_cudf/expr/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
QUERY_PLANNING_ON = config.get("dataframe.query-planning", None) is not False

# Register custom expressions and collections
try:
import dask_cudf.expr._collection
import dask_cudf.expr._expr
if QUERY_PLANNING_ON:
rjzamora marked this conversation as resolved.
Show resolved Hide resolved
try:
import dask_cudf.expr._collection
import dask_cudf.expr._expr

except ImportError as err:
if QUERY_PLANNING_ON:
except ImportError as err:
# Dask *should* raise an error before this.
# However, we can still raise here to be certain.
raise RuntimeError(
Expand Down
106 changes: 93 additions & 13 deletions python/dask_cudf/dask_cudf/expr/_expr.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
# Copyright (c) 2024, NVIDIA CORPORATION.
import functools

from dask_expr._cumulative import CumulativeBlockwise
from dask_expr._reductions import Var
from dask_expr._expr import Expr, VarColumns
from dask_expr._reductions import Reduction, Var

from dask.dataframe.core import is_dataframe_like, make_meta, meta_nonempty

##
## Custom expression patching
Expand All @@ -25,19 +29,95 @@ def _kwargs(self) -> dict:
CumulativeBlockwise._kwargs = PatchCumulativeBlockwise._kwargs


# This patch accounts for differences between
# numpy and cupy behavior. It may make sense
# to move this logic upstream.
_dx_reduction_aggregate = Var.reduction_aggregate
# The upstream Var code uses `Series.values`, and relies on numpy
# for most of the logic. Unfortunately, cudf -> cupy conversion
# is not supported for data containing null values. Therefore,
# we must implement our own version of Var for now. This logic
# is mostly copied from dask-cudf.


class VarCudf(Reduction):
# Uses the parallel version of Welford's online algorithm (Chan 79')
rjzamora marked this conversation as resolved.
Show resolved Hide resolved
# (http://i.stanford.edu/pub/cstr/reports/cs/tr/79/773/CS-TR-79-773.pdf)
_parameters = ["frame", "skipna", "ddof", "numeric_only", "split_every"]
_defaults = {
"skipna": True,
"ddof": 1,
"numeric_only": False,
"split_every": False,
}

@functools.cached_property
def _meta(self):
return make_meta(
meta_nonempty(self.frame._meta).var(
skipna=self.skipna, numeric_only=self.numeric_only
)
)

@property
def chunk_kwargs(self):
return dict(skipna=self.skipna, numeric_only=self.numeric_only)

@property
def combine_kwargs(self):
return {}

@property
def aggregate_kwargs(self):
return dict(ddof=self.ddof)

@classmethod
def reduction_chunk(cls, x, skipna=True, numeric_only=False):
kwargs = {"numeric_only": numeric_only} if is_dataframe_like(x) else {}
if skipna or numeric_only:
n = x.count(**kwargs)
kwargs["skipna"] = skipna
avg = x.mean(**kwargs)
else:
# Not skipping nulls, so might as well
# avoid the full `count` operation
n = len(x)
kwargs["skipna"] = skipna
avg = x.sum(**kwargs) / n
if numeric_only:
# Workaround for cudf bug
# (see: https://github.com/rapidsai/cudf/issues/13731)
x = x[n.index]
m2 = ((x - avg) ** 2).sum(**kwargs)
return n, avg, m2

@classmethod
def reduction_combine(cls, parts):
n, avg, m2 = parts[0]
for i in range(1, len(parts)):
n_a, avg_a, m2_a = n, avg, m2
n_b, avg_b, m2_b = parts[i]
n = n_a + n_b
avg = (n_a * avg_a + n_b * avg_b) / n
delta = avg_b - avg_a
m2 = m2_a + m2_b + delta**2 * n_a * n_b / n
return n, avg, m2

@classmethod
def reduction_aggregate(cls, vals, ddof=1):
vals = cls.reduction_combine(vals)
n, _, m2 = vals
return m2 / (n - ddof)


def _reduction_aggregate(*args, **kwargs):
result = _dx_reduction_aggregate(*args, **kwargs)
if result.ndim == 0:
# cupy will sometimes produce a 0d array, and
# we need to convert it to a scalar.
return result.item()
return result
def _patched_var(
self, axis=0, skipna=True, ddof=1, numeric_only=False, split_every=False
):
if axis == 0:
if hasattr(self._meta, "to_pandas"):
return VarCudf(self, skipna, ddof, numeric_only, split_every)
else:
return Var(self, skipna, ddof, numeric_only, split_every)
elif axis == 1:
return VarColumns(self, skipna, ddof, numeric_only)
else:
raise ValueError(f"axis={axis} not supported. Please specify 0 or 1")


Var.reduction_aggregate = staticmethod(_reduction_aggregate)
Expr.var = _patched_var
Loading